`
tjuxiaoqiang
  • 浏览: 25176 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

kestrel 源码分析之一 综述

阅读更多
Kestrel 是Twitter内部使用的,用scala语言实现的一个消息队列中间件。而且核心代码只用了两千多行,能够支持事务,并且支持消息的持久化,Kestrel底层的通信是通过netty实现的,支持memcached协议。下面我们来分析一下kestrel的几个核心实现类,并讨论一下它是如何支持事务和消息的持久化。而Kestrel的消息的持久化是通过Java NIO实现。

下面是Kestrel的几个核心实现类:

Kestrel.scala -- 核心启动类

QueueCollection.scala -- 通过Map管理所有消息队列

PersistentQueue.scala -- 用于维护一个消息队列

Journal.scala -- 用于管理对应的消息队列持久化

KestrelHandler.scala -- Kestrel消息队列核心操作管理类

QItem.scala -- 用于存储一个消息实体

journalSyncTimer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)

    timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS)


    queueCollection = new QueueCollection(queuePath, new NettyTimer(timer), new NettyTimer(journalSyncTimer), defaultQueueConfig, builders)

    queueCollection.loadQueues()


Kestrel启动的时候首先实例化一个QueueCollection,用于管理一组消息队列,然后通过queueCollection.loadQueues()方法把之前持久化的消息load到内存中。

executor = Executors.newCachedThreadPool()
    channelFactory = new NioServerSocketChannelFactory(executor, executor)

    val memcachePipelineFactory = new ChannelPipelineFactory() {
      def getPipeline() = {
        val protocolCodec = protocol match {
          case Protocol.Ascii => MemcacheCodec.asciiCodec(bytesRead, bytesWritten)
          case Protocol.Binary => throw new Exception("Binary protocol not supported yet.")
        }
        val handler = new MemcacheHandler(channelGroup, queueCollection, maxOpenTransactions, clientTimeout)
        Channels.pipeline(protocolCodec, handler)
      }
    }
    memcacheAcceptor = memcacheListenPort.map { port =>
      val address = new InetSocketAddress(listenAddress, port)
      makeAcceptor(channelFactory, memcachePipelineFactory, address)
    }


定义一个MemcacheHandler,用于接收外面的消息,并且把MemcacheHandler绑定到配置文件中配置的端口上。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics