上文我们看到PersistentQueue类的实现,它就代表每个消息队列在服务其中的实现,另外我们会看到每个PersistentQueue类包含一个Journal对象,该对象主要是负责队列的持久化操作。对于文件的读写我们用Java NIO来实现。下面我们来看一看主要的几个方法
def fillReadBehind(gotItem: QItem => Unit)(gotCheckpoint: Checkpoint => Unit): Unit = {
val pos = if (replayer.isDefined) replayer.get.position else writer.position
val filename = if (replayerFilename.isDefined) replayerFilename.get else queueName
reader.foreach { rj =>
if (rj.position == pos && readerFilename.get == filename) {
// we've caught up.
rj.close()
reader = None
readerFilename = None
} else {
readJournalEntry(rj) match {
case (JournalItem.Add(item), _) =>
gotItem(item)
case (JournalItem.Remove, _) =>
removesSinceReadBehind -= 1
case (JournalItem.ConfirmRemove(_), _) =>
removesSinceReadBehind -= 1
case (JournalItem.Continue(item, xid), _) =>
removesSinceReadBehind -= 1
gotItem(item)
case (JournalItem.EndOfFile, _) =>
// move to next file and try again.
val oldFilename = readerFilename.get
rj.close()
readerFilename = Journal.journalAfter(queuePath, queueName, readerFilename.get)
reader = Some(new FileInputStream(new File(queuePath, readerFilename.get)).getChannel)
log.info("Read-behind on '%s' moving from file %s to %s", queueName, oldFilename, readerFilename.get)
if (checkpoint.isDefined && checkpoint.get.filename == oldFilename) {
gotCheckpoint(checkpoint.get)
}
fillReadBehind(gotItem)(gotCheckpoint)
case (_, _) =>
}
}
}
}
Kestrel对于过期数据的处理很巧妙,他通过两种方式来触发对过期数据的检查,一种是定时任务,我们在Kestrel.scala看到的定时器,就是设置定期去检查数据是否过期,第二种是当消费队列中的数据时会检查数据是否过期,这两种方法检查的都是队列头的数据。
当你需要对队列中的数据进行消费或者检查过期数据时,都需要调用fillReadBehind方法,因为你的内存是有一定空间的,它不一定能够把之前所有持久化的数据都导入内存中,所以当内存中一旦有空闲空间,都会从文件中继续读取数据到内存。
另外一个有意思的地方是,Journal对象会记录一个removesSinceReadBehind对象,这个对象记录从读文件开始到现在所有 ReadFile 模式下接受的remove操作的个数,这样每在文件中读到一个remove操作,就对其进行减一,就收一个pop操作就对该变量加一。这个方法能够保证服务在重启时replay这些日志文件时,消息没有丢失,但是对于消息的重复发送,不能进行保证。
对于事物,Kestrel会在PersistentQueue中保存一个Map中,用于存储该事物<事物ID,Item>, 当确认消费时,从map中删除这个事物,如果不提交,则把该消息插入到队列的首位,并从Map中删除该数据。
Kestrel中如果对消息队列选择了持久化,那么,客户的每一个操作都会记录为日志。这个日志正如Mysql一样,通过回放能够恢复之前的数据。
分享到:
相关推荐
NULL 博文链接:https://vanadiumlin.iteye.com/blog/1461152
NULL 博文链接:https://snowolf.iteye.com/blog/1604531
kestrel项目源文件包
在.NET 6.0上使用Kestrel配置和自定义HTTPS.doc
资源分类:Python库 所属语言:Python 资源全名:kestrel-lang-1.0.5.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
NULL 博文链接:https://snowolf.iteye.com/blog/1605229
addlog-kestrel
NULL 博文链接:https://snowolf.iteye.com/blog/1612207
本示例可直接运行,方便快速了解Kestrel框架. Kestrel 是包含在 ASP.NET Core 项目模板中的 Web 服务器, .NET Core 支持的所有平台和版本均支持 Kestrel。
红隼节点Node.js 的 Kestrel 客户端安装 npm install kestrel.node用法 var Kestrel = require ( 'kestrel.node' ) ;var client = new Kestrel ( 'localhost:22133' ) ;// get can optionally take a timeout in ...
Kestrel是不是Unix或Windows的内核。
这篇文章主要是记录如何将Kestrel的服务封装在WindowService中 关于WindowsServer 请参考如下这篇文章 .netcore worker service (辅助角色服务) 的上手入门,包含linux和windows服务部署 开发服务 之前做过.net5...
在ASP.NET Core中,如果在Kestrel中想使用HTTPS对站点进行加密传输,可以按照如下方式 申请证书 这一步就不详细说了,有免费的和收费的,申请完成之后会给你一个*.pfx结尾的文件。 添加NuGet包 nuget中...
介绍和背景Kestrel项目涉及使用全自制设计的计算和自我教育的自由,直至从原理图和寄存器传输逻辑一直到OS API和用户教程的各个级别公开记录的硬件和软件。 根据我的经验,它的设计来自多种来源: 硬件工程卓越奖...
Kestrel(Kotlin 事件溯源) 用于在 Kotlin 中构建基于事件的 CQRS 应用程序的框架。 概括 事件溯源是一种架构范式,其中应用程序状态被建模并存储为在您的应用程序域中有意义的语义事件的不可变序列。 CQRS,命令...
python库。 资源全名:kestrel_lang-1.1.0-py3-none-any.whl
jar包,亲测可用