Kafka2.0服务端启动源码【黑马java培训】
				更新时间:2019年07月26日 10时48分10秒  来源:黑马程序员论坛
				
					
| Kafka2.0服务端启动源码 
   Kafka 服务端通过Kafka.scala的主函数main方法启动。KafkaServerStartable类提供读取配置文件、启动/停止服务的方法。而启动/停止服务最终调用的是KafkaServer的startup/shutdown方法。启动流程 请求处理流程详细源码分析Acceptor 线程def run() {  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 注册接收事件  startupComplete() // 通知 Acceptor 线程  var currentProcessor = 0  while (isRunning) {    val ready = nioSelector.select(500) // 轮询事件    if (ready > 0) {      val keys = nioSelector.selectedKeys()      val iter = keys.iterator()      while (iter.hasNext && isRunning) {        val key = iter.next        iter.remove()        if (key.isAcceptable) { // 有可接受事件          val processor = synchronized {            currentProcessor = currentProcessor % processors.size            processors(currentProcessor) // 缓存 Processor           }          accept(key, processor) // 将 SocketChannel 缓存到队列        }      }    }  }}Processor 线程override def run() {  startupComplete() // CountDownLatch.countDown 唤醒 Acceptor 线程。  while (isRunning) {    configureNewConnections() // 从缓存队列取出 SocketChannel,绑定到 KafkaChannel    processNewResponses() // 处理返回客户端的响应    poll() // Kafka.Selector 轮询读取/写入事件    processCompletedReceives() // 处理客户端的请求,放到阻塞队列    processCompletedSends() // 处理返回客户端响应后的回调    processDisconnected() // 断开连接后的处理  }}KafkaRequestHandler 线程阻塞队列def run() {  while (!stopped) {    val startSelectTime = time.nanoseconds    // 从阻塞队列拉取请求    val req = requestChannel.receiveRequest(300)     req match {      case request: RequestChannel.Request =>        try {          apis.handle(request) // 调用`KafkaApis.handle`方法,进行集中处理请求。        }    }  }}KSelector启动 zk 客户端。启动动态配置。启动调度线程池。启动日志管理器的后台线程,包括日志清理、日志刷盘、日志删除、日志压缩。启动 NIO Socket 服务。初始化一个接收器Acceptor,即启动 NIO Socket。添加num.network.threads个接收器到请求通道RequestChannel的处理器缓存ConcurrentHashMap,key 为递增编号,value 为处理器Processor。Acceptor执行CountDownLatch.await等待通知启动。缓存Acceptor到ConcurrentHashMap,key 为EndPoint,value 为Acceptor。
启动副本管理器。在 zk 注册 broker。启动控制器。启动组协调器。启动事务协调器。初始化KafkaApis。初始化处理器线程缓存池。启动num.io.threads个请求处理器线程KafkaRequestHandler。从阻塞队列ArrayBlockingQueue获取请求,调用KafkaApis.handle方法,进行集中处理请求。
启动处理器线程。首先CountDownLatch.countDown通知唤醒Acceptor线程。使用NIO.select轮询。如果有可接收就绪的事件,则将当前的SocketChannel加入缓存队列ConcurrentLinkedQueue
从上述缓存队列取出SocketChannel,绑定到KafkaChannel。将接收到的请求缓存到限长阻塞队列ArrayBlockingQueue
 
   参考客户端源码分析。 
 
 
 
 
 
 
 
 
 
 | 
        
 
    
    
推荐了解热门学科
传智播客是一家致力于培养高素质软件开发人才的科技公司,“黑马程序员”是传智播客旗下高端IT教育品牌。自“黑马程序员”成立以来,教学研发团队一直致力于打造精品课程资源,不断在产、学、研3个层面创新自己的执教理念与教学方针,并集中“黑马程序员”的优势力量,针对性地出版了计算机系列教材50多册,制作教学视频数+套,发表各类技术文章数百篇。
传智播客从未停止思考 
传智播客副总裁毕向东在2019IT培训行业变革大会提到,“传智播客意识到企业的用人需求已经从初级程序员升级到中高级程序员,具备多领域、多行业项目经验的人才成为企业用人的首选。”
中级程序员和初级程序员的差别在哪里?
项目经验。毕向东表示,“中级程序员和初级程序员最大的差别在于中级程序员比初级程序员多了三四年的工作经验,从而多出了更多的项目经验。“为此,传智播客研究院引进曾在知名IT企业如阿里、IBM就职的高级技术专家,集中研发面向中高级程序员的课程,用以满足企业用人需求,尽快补全IT行业所需的人才缺口。
何为中高级程序员课程? 
传智播客进行了定义。中高级程序员课程,是在当前主流的初级程序员课程的基础上,增加多领域多行业的含金量项目,从技术的广度和深度上进行拓展。“我们希望用5年的时间,打造上百个高含金量的项目,覆盖主流的32个行业。”传智播客课程研发总监于洋表示。
黑马程序员热门视频教程【点击播放】