`
tjuxiaoqiang
  • 浏览: 25154 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

kestrel 源码分析之二 QueueCollection消息队列管理

阅读更多
前一篇已经对Kestrel的启动类进行了说明,我们看到在启动类中定义了一个QueueCollection,这个类主要用在内存中维护用户定义的队列。
def loadQueues() {
    Journal.getQueueNamesFromFolder(path) map { queue(_) }
  }

def queue(name: String): Option[PersistentQueue] = synchronized {
    if (shuttingDown) {
      None
    } else {
      Some(queues.get(name) getOrElse {
        // only happens when creating a queue for the first time.
        val q = if (name contains '+') {
          val master = name.split('+')(0)
          fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name
          log.info("Fanout queue %s added to %s", name, master)
          buildQueue(master, name, path.getPath)
        } else {
          buildQueue(name, name, path.getPath)
        }
        q.setup
        queues(name) = q
        q
      })
    }
  }

val config = queueConfigMap.getOrElse(name, defaultQueueConfig)
    log.info("Setting up queue %s: %s", realName, config)
    new PersistentQueue(realName, path, config, timer, journalSyncTimer, Some(this.apply))


上文中说到Kestrel启动的时候首先实例化一个对象,然后调用QueueCollection.loadQueues();函数,根据上面的代码我们可以看到,这个函数主要是从文件系统中找到所有的队列持久化文件,并调用queue函数。我们发现这个类为每个队列创建创建了一个PersistentQueue类,并调用PersistentQueue类的setup方法,这个方法主要是回放这个队列对应的持久化文件,具体的操作下节介绍。

另外该类中还有一些对队列的操作方法,如add,remove等操作方法。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics