`
tjuxiaoqiang
  • 浏览: 25040 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

kestrel 源码分析之四 Journal消息持久化类

阅读更多
上文我们看到PersistentQueue类的实现,它就代表每个消息队列在服务其中的实现,另外我们会看到每个PersistentQueue类包含一个Journal对象,该对象主要是负责队列的持久化操作。对于文件的读写我们用Java NIO来实现。下面我们来看一看主要的几个方法
def fillReadBehind(gotItem: QItem => Unit)(gotCheckpoint: Checkpoint => Unit): Unit = {
    val pos = if (replayer.isDefined) replayer.get.position else writer.position
    val filename = if (replayerFilename.isDefined) replayerFilename.get else queueName

    reader.foreach { rj =>
      if (rj.position == pos && readerFilename.get == filename) {
        // we've caught up.
        rj.close()
        reader = None
        readerFilename = None
      } else {
        readJournalEntry(rj) match {
          case (JournalItem.Add(item), _) =>
            gotItem(item)
          case (JournalItem.Remove, _) =>
            removesSinceReadBehind -= 1
          case (JournalItem.ConfirmRemove(_), _) =>
            removesSinceReadBehind -= 1
          case (JournalItem.Continue(item, xid), _) =>
            removesSinceReadBehind -= 1
            gotItem(item)
          case (JournalItem.EndOfFile, _) =>
            // move to next file and try again.
            val oldFilename = readerFilename.get
            rj.close()
            readerFilename = Journal.journalAfter(queuePath, queueName, readerFilename.get)
            reader = Some(new FileInputStream(new File(queuePath, readerFilename.get)).getChannel)
            log.info("Read-behind on '%s' moving from file %s to %s", queueName, oldFilename, readerFilename.get)
            if (checkpoint.isDefined && checkpoint.get.filename == oldFilename) {
              gotCheckpoint(checkpoint.get)
            }
            fillReadBehind(gotItem)(gotCheckpoint)
          case (_, _) =>
        }
      }
    }
  }


Kestrel对于过期数据的处理很巧妙,他通过两种方式来触发对过期数据的检查,一种是定时任务,我们在Kestrel.scala看到的定时器,就是设置定期去检查数据是否过期,第二种是当消费队列中的数据时会检查数据是否过期,这两种方法检查的都是队列头的数据。

当你需要对队列中的数据进行消费或者检查过期数据时,都需要调用fillReadBehind方法,因为你的内存是有一定空间的,它不一定能够把之前所有持久化的数据都导入内存中,所以当内存中一旦有空闲空间,都会从文件中继续读取数据到内存。

另外一个有意思的地方是,Journal对象会记录一个removesSinceReadBehind对象,这个对象记录从读文件开始到现在所有 ReadFile 模式下接受的remove操作的个数,这样每在文件中读到一个remove操作,就对其进行减一,就收一个pop操作就对该变量加一。这个方法能够保证服务在重启时replay这些日志文件时,消息没有丢失,但是对于消息的重复发送,不能进行保证。

对于事物,Kestrel会在PersistentQueue中保存一个Map中,用于存储该事物<事物ID,Item>, 当确认消费时,从map中删除这个事物,如果不提交,则把该消息插入到队列的首位,并从Map中删除该数据。

Kestrel中如果对消息队列选择了持久化,那么,客户的每一个操作都会记录为日志。这个日志正如Mysql一样,通过回放能够恢复之前的数据。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics