kafra技术内幕:消费者初始化(scala)

Posted 架构师

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafra技术内幕:消费者初始化(scala)相关的知识,希望对你有一定的参考价值。

架构师(JiaGouX)
我们都是架构师!



high-level consumer


有时候应用程序从Kafka读取数据,并不太关心消息offset的处理. 所以Hight Level Consumer提供了一个从Kafka消费数据的高层抽象.


High Level Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中. 这个offset基于Consumer Group的名称.


Consumer Group是整个Kafka集群全局的. 所以要特别小心在新的逻辑启动之前要关闭所有的旧的逻辑(消费者进程).


当新的消费者加入同一个消费组时,Kafka会添加这个消费者的线程到要消费的topic的可用线程集合中,并且触发re-balance.


在re-balance时,kafka会分配可用的partition给可用的线程,可能移动一个partition到其他的线程中.


如果你的消费者逻辑混合了新的和旧的处理逻辑,很可能有些消息会被分配到旧的处理逻辑中.


High Level Consumer可以(应该)是多线程的应用程序.线程模型是以topic的partitions数量为中心的,不过有些规则:


  • ●如果线程数量多于partition的数量,有部分线程无法消费该topic下任何一条消息

  • ●如果线程数量少于partition的数量,有一些线程会消费多个partition的数据

  • ●如果线程数量等于partition的数量,则正好一个线程消费一个partition的数据

  • ●当添加更多的消费者进程/线程会触发re-balance,导致partition的分配发生了变化


如果一个线程消费多个partitions,并不会保证收到的消息的有序性,不过在同一个partition里的offset则是有序的.


比如某个消费者线程从partition-10接收5条消息,然后从partition-11接收了6条消息,接着再从partition-10接收了5条消息,


然后还是从partition-10又接收了5条消息,即使这个时候partition-11也有数据. 但是并不保证partition之间的顺序.

High-level的实现依赖于Consumer Group.Kafka保证同一consumer group中只有一个consumer会消费某条消息.


即每一个consumer实例只会消费某一个或多个特定partition的数据,而一个partition的数据只会被一个特定的consumer所消费.


  • ●同一条消息会被多个ConsumerGroup消费,所以有多个ConsumerGroup,每个ConsumerGroup只有一个Consumer,实现了广播.
    通常不同的ConsumerGroup的消费处理逻辑是不同的,这样同一份数据源(消息)交给不同的处理逻辑.

  • ●一个ConsumerGroup有多个Consumer,一条消息只会被这个ConsumerGroup的一个消费者所消费.实现了单播.


不过通常的设计如下图,有多个ConsumerGroup, 并且每个ConsumerGroup中也有多个Consumer.

kafra技术内幕:消费者初始化(scala)(一)


high-level Consumer Example


消费者示例, 指定要消费的topic和线程数,返回每个topic对应的KafkaStream列表,每个线程对应一个KafkaStream.


下面的示例中只使用了一个线程,所以通过streams.get(0)获取到该线程对应的KafkaStream.然后从流中读取出消息.


topicCountMap表示客户端可以同时消费多个topic,那为什么要设置线程数呢? 因为一个topic有多个partition分布在
多个broker节点上.即使是同一个broker,也可能有这个topic的多个partition. 用不同的线程来隔离不同的partition.


  • ●ConsumerConnector: Consumer的连接器,这里基于ZK实现,是ZookeeperConsumerConnector

  • ●KafkaStream: 消息流,每个消费者线程都对应了一个消息流,消息会放入消息流的阻塞队列中

  • ●ConsumerIterator: 消费者迭代器,只有迭代器开始迭代获取数据时,才会返回给消费者

ConsumerConfig conf = new ConsumerConfig(props);

ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMap.put(topic, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

KafkaStream<byte[], byte[]> stream = streams.get(0); 


ConsumerIterator<byte[], byte[]> it = stream.iterator();

while (it.hasNext()){

    System.out.println("message: " + new String(it.next().message()));

}


多线程版本可以参考: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example


ConsumerConnector


Consumer定义在ConsumerConnector接口同一个文件中.默认创建的ConsumerConnector是ZookeeperConsumerConnector

object Consumer extends Logging {

  def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {

    val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)

    consumerConnect

  }

}


ConsumerConnector主要有创建消息流(createMessageStreams)和提交offset(commitOffsets)两种方法.


Consumer会根据消息流消费数据, 并且定时提交offset.由客户端自己保存offset是kafka采用pull拉取消息的一个附带工作.

trait ConsumerConnector {

  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]

  def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean)

  def setConsumerRebalanceListener(listener: ConsumerRebalanceListener)

  def shutdown()

}


ZookeeperConsumerConnector


一个Consumer会创建一个ZookeeperConsumerConnector,代表一个消费者进程.


  • ●fetcher: 消费者获取数据, 使用ConsumerFetcherManager fetcher线程抓取数据

  • ●zkUtils: 消费者要和ZK通信, 除了注册自己,还有其他信息也会写到ZK中

  • ●topicThreadIdAndQueues: 消费者会指定自己消费哪些topic,并指定线程数, 所以topicThreadId都对应一个队列

  • ●messageStreamCreated: 消费者会创建消息流, 每个队列都对应一个消息流

  • ●offsetsChannel: offset可以存储在ZK或者kafka中,如果存在kafka里,像其他请求一样,需要和Broker通信

  • ●还有其他几个Listener监听器,分别用于topicPartition的更新,负载均衡,消费者重新负载等


虽然high level consumer不需要在应用程序中自己管理offset等,但kafka内部还是会帮你管理offset的.


为什么ConsumerConnector是ZookeeperConsumerConnector,因为消费者的offset是保存在ZooKeeper中的.

private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean) 

        extends ConsumerConnector with Logging with KafkaMetricsGroup {

  private var fetcher: Option[ConsumerFetcherManager] = None

  private var zkUtils: ZkUtils = null

  private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]

  private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]

  private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]

  private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")

  private val messageStreamCreated = new AtomicBoolean(false)

  private var offsetsChannel: BlockingChannel = null

  private var sessionExpirationListener: ZKSessionExpireListener = null

  private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null

  private var loadBalancerListener: ZKRebalancerListener = null

  private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null

  private var consumerRebalanceListener: ConsumerRebalanceListener = null


  connectZk()                       // ① 创建ZkUtils,会创建对应的ZkConnection和ZkClient

  createFetcher()                   // ② 创建ConsumerFetcherManager,消费者fetcher线程

  ensureOffsetManagerConnected()    // ③ 确保连接上OffsetManager.

  if (config.autoCommitEnable) {    // ④ 启动定时提交offset线程

    scheduler.startup              

    scheduler.schedule("kafka-consumer-autocommit", autoCommit, ...)

  }

}


zk and broker

  • ●① /brokers ->> topicsids: 集群中所有的topics,以及所有的brokers.

[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 1] ls /brokers

[topics, ids]


[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 2] ls /brokers/ids

[3, 5, 4]

[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 4] get /brokers/ids/3

{"jmx_port":10055,"timestamp":"1453380999577","host":"192.168.48.153","version":1,"port":9092}

  • ●③ /brokers/topics/topic_name -> topic的每个partition,以及分配的replicas(AR)

  • ●④ /brokers/topics/topic_name/partitions/partition_id/state -> 这个partition的leader,isr

[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 17] get /brokers/topics/topic1

{"version":1,"partitions":{"2":[5,4],"1":[4,3],"0":[3,5]}}   ⬅️


[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 12] ls /brokers/topics/topic1/partitions

[2, 1, 0]

[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 13] ls /brokers/topics/topic1/partitions/0

[state]

[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 15] get /brokers/topics/topic1/partitions/0/state

{"controller_epoch":1775,"leader":3,"version":1,"leader_epoch":145,"isr":[3,5]}

kafra技术内幕:消费者初始化(scala)(一)

上图是kafka manager中某个topic的PartitionInfo, 集群只有3个节点,这个topic有3个partition,2个副本.


② Broker node registry

/brokers/ids/0 --> { "host" : "host:port", "topics" : 

  {"topic1": ["partition1" ... "partitionN"], ..., "topicN": ["partition1" ... "partitionN"] } }


每个Broker节点在自己启动的时候,会在/brokers下创建一个逻辑节点. 内容包括了Broker的主机和端口, Broker服务的所有topic,
以及分配到当前Broker的这个topic的partition列表(并不是topic的全部partition,会将所有partition分布在不同的brokers).


A consumer subscribes to event changes of the broker node registry.
当Broker挂掉的时候,在这个Broker上的所有Partition都丢失了,而Partition是给消费者服务的.
所以Broker挂掉后在做迁移的时候,会将其上的Partition转移到其他Broker上,因此消费者要消费的Partition也跟着变化.


③ Broker topic registry

/brokers/topics/topic1 -> {"version":1,"partitions":{"2":[5,4],"1":[4,3],"0":[3,5]}}


虽然topic是在/brokers下,但是这个topic的信息是全局的.在创建topic的时候,这个topic的每个partition的编号以及replicas.


具体每个partition的Leader以及isr信息则是在/brokers/topics/topic_name/partitions/partition_id/state


zk and consumer

kafra技术内幕:消费者初始化(scala)(一)

Consumer id registry: /consumers/[group_id]/ids/[consumer_id] -> topic1,...topicN


每个消费者会将它的id注册为临时znode并且将它所消费的topic设置为znode的值,当客户端(消费者)退出时,znode(consumer_id)会被删除.


A consumer subscribes to event changes of the consumer id registry within its group.
每个consumer会订阅它所在的消费组中关于consumer_id注册的更新事件. 为什么要注册呢,因为Kafka只会将一条消息发送到一个消费组中唯一的一个消费者.
如果某个消费者挂了,它要把本来发给挂的消费者的消费转给这个消费组中其他的消费者.同理,有新消费者加入消费组时,也会进行负载均衡.


Partition owner registry: /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id


在消费时,每个topic的partition只能被一个消费者组中的唯一的一个消费者消费.在每次重新负载的时候,这个映射策略就会重新构建.


Consumer offset tracking: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value


每个消费者都要跟踪自己消费的每个Partition最近的offset.表示自己读取到Partition的最新位置.


由于一个Partition只能被消费组中的一个消费者消费,所以offset是以消费组为级别的,而不是消费者.


因为如果原来的消费者挂了后,应当将这个Partition交给同一个消费组中别的消费者,而此时offset是没有变化的.


一个partition可以被不同的消费者组中的不同消费者消费,所以不同的消费者组必须维护他们各自对该partition消费的最新的offset


init


在创建ZookeeperConsumerConnector时,有几个初始化方法需要事先执行.


  • ●因为消费者要和ZK通信,所以connectZk会确保连接上ZooKeeper

  • ●消费者要消费数据,需要有抓取线程,所有的抓取线程交给ConsumerFetcherManager统一管理

  • ●由消费者客户端自己保存offset,而消费者会消费多个topic的多个partition.

  • ●类似多个数据抓取线程有管理类,多个partition的offset管理类OffsetManager是一个GroupCoordinator

  • ●定时提交线程会使用OffsetManager建立的通道定时提交offset到zk或者kafka.


createMessageStreams


由ConsumerConnector创建消息流,需要指定解码器,因为要将日志反序列化(生产者写消息时对消息序列化到日志文件).


consume并不真正的消费数据,只是初始化存放数据的queue.真正消费数据的是对该queue进行shallow iterator.


在kafka的运行过程中,会有其他的线程将数据放入partition对应的queue中. 而queue是用于KafkaStream的.


一旦数据添加到queue后,KafkaStream的阻塞队列就有数据了,消费者就可以从队列中消费消息.

def createMessageStreams[K,V](topicCountMap: Map[String,Int], 

  keyDecoder: Decoder[K], valueDecoder: Decoder[V]) : Map[String, List[KafkaStream[K,V]]] = {

  consume(topicCountMap, keyDecoder, valueDecoder)

}

def consume[K, V](topicCountMap: scala.collection.Map[String,Int], 

  keyDecoder: Decoder[K], valueDecoder: Decoder[V]) : Map[String,List[KafkaStream[K,V]]] = {

  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)

  val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic


  // make a list of (queue,stream) pairs, one pair for each threadId 只是准备了队列和流,数据什么时候填充呢?

  val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>

    threadIdSet.map(_ => {

      val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)

      val stream = new KafkaStream[K,V](queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)

      (queue, stream)

    })

  ).flatten.toList  //threadIdSet是个集合,外层的topicThreadIds.values也是集合,所以用flatten压扁为queue-stream对


  val dirs = new ZKGroupDirs(config.groupId)                  // /consumers/[group_id]

  registerConsumerInZK(dirs, consumerIdString, topicCount)    // /consumers/[group_id]/ids/[consumer_id]

  reinitializeConsumer(topicCount, queuesAndStreams)          // 重新初始化消费者 ⬅️


  // 返回KafkaStream, 每个Topic都对应了多个KafkaStream. 数量和topicCount中的count一样

  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]

}


consumerIdString会返回当前Consumer在哪个ConsumerGroup的编号.每个consumer在消费组中的编号都是唯一的.


一个消费者,对一个topic可以使用多个线程一起消费(一个进程可以有多个线程). 当然一个消费者也可以消费多个topic.

def makeConsumerThreadIdsPerTopic(consumerIdString: String, topicCountMap: Map[String,  Int]) = {

  val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]()

  for ((topic, nConsumers) <- topicCountMap) {                // 每个topic有几个消费者线程

    val consumerSet = new mutable.HashSet[ConsumerThreadId]   // 一个消费者线程对应一个ConsumerThreadId

    for (i <- 0 until nConsumers)

      consumerSet += ConsumerThreadId(consumerIdString, i)

    consumerThreadIdsPerTopicMap.put(topic, consumerSet)      // 每个topic都有多个Consumer线程,但是只有一个消费者进程

  }

  consumerThreadIdsPerTopicMap                                // topic到消费者线程集合的映射

}


假设消费者C1声明了topic1:2, topic2:3. topicThreadIds=consumerThreadIdsPerTopicMap.


topicThreadIds.values = [ (C1_1,C1_2), (C1_1,C1_2,C1_3)]一共有5个线程,queuesAndStreams也有5个元素.

consumerThreadIdsPerTopicMap = {

    topic1: [C1_1, C1_2],

    topic2: [C1_1, C1_2, C1_3]

}

topicThreadIds.values = [

    [C1_1, C1_2],

    [C1_1, C1_2, C1_3]

]

threadIdSet循环[C1_1, C1_2]时, 生成两个queue->stream pair. 

threadIdSet循环[C1_1, C1_2, C1_3]时, 生成三个queue->stream pair. 

queuesAndStreams = [

    (LinkedBlockingQueue_1,KafkaStream_1),      //topic1:C1_1

    (LinkedBlockingQueue_2,KafkaStream_2),      //topic1:C1_2

    (LinkedBlockingQueue_3,KafkaStream_3),      //topic2:C1_1

    (LinkedBlockingQueue_4,KafkaStream_4),      //topic2:C1_2

    (LinkedBlockingQueue_5,KafkaStream_5),      //topic2:C1_3

]

对于消费者而言,它只要指定要消费的topic和线程数量就可以了,其他具体这个topic分成多少个partition,
以及topic-partition是分布是哪个broker上,对于客户端而言都是透明的.


客户端关注的是我的每个线程都对应了一个队列,每个队列都是一个消息流就可以了.


在Producer以及前面分析的Fetcher,都是以Broker-Topic-Partition为级别的.


AbstractFetcherManager的fetcherThreadMap就是以brokerAndFetcherId来创建拉取线程的.


而消费者是通过拉取线程才有数据可以消费的,所以客户端的每个线程实际上也是针对Partition级别的.


registerConsumerInZK

消费者需要向ZK注册一个临时节点,路径为:/consumers/[group_id]/ids/[consumer_id],内容为订阅的topic.

private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {

  val consumerRegistrationInfo = Json.encode(Map("version" -> 1, 

    "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern, "timestamp" -> timestamp))

  val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.consumerRegistryDir + "/" + consumerIdString, 

    consumerRegistrationInfo, zkUtils.zkConnection.getZookeeper, false)

  zkWatchedEphemeral.create()

}


问题:什么时候这个节点会被删除掉呢? Consumer进程挂掉时,或者Session失效时删除临时节点. 重连时会重新创建.


由于是临时节点,一旦创建节点的这个进程挂掉了,临时节点就会自动被删除掉. 这是由zk机制决定的,不是由消费者完成的.


reinitializeConsumer listener

当前Consumer在ZK注册之后,需要重新初始化Consumer. 对于全新的消费者,注册多个监听器,在zk的对应节点的注册事件发生时,会回调监听器的方法.


  • ●将topic对应的消费者线程id及对应的LinkedBlockingQueue放入topicThreadIdAndQueues中,LinkedBlockingQueue是真正存放数据的queue

  • ●① 注册sessionExpirationListener,监听状态变化事件.在session失效重新创建session时调用

  • ●② 向/consumers/[group_id]/ids注册Child变更事件的loadBalancerListener,当消费组下的消费者发生变化时调用

  • ●③ 向/brokers/topics/[topic]注册Data变更事件的topicPartitionChangeListener,在topic数据发生变化时调用

  • ●显式调用loadBalancerListener.syncedRebalance(), 会调用reblance方法进行consumer的初始化工作


private def reinitializeConsumer[K,V](topicCount: TopicCount, 

  queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {

  val dirs = new ZKGroupDirs(config.groupId)

  // ② listener to consumer and partition changes

  if (loadBalancerListener == null) {

    val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]

    loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString, 

      topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])

  }

  // ① create listener for session expired event if not exist yet

  if (sessionExpirationListener == null) sessionExpirationListener = 

    new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener)

  // ③ create listener for topic partition change event if not exist yet

  if (topicPartitionChangeListener == null) 

    topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)


  // listener to consumer and partition changes

  zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)

  zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)

  // register on broker partition path changes.

  topicStreamsMap.foreach { topicAndStreams => 

    zkUtils.zkClient.subscribeDataChanges(BrokerTopicsPath+"/"+topicAndStreams._1, topicPartitionChangeListener)

  }


  // explicitly trigger load balancing for this consumer

  loadBalancerListener.syncedRebalance()

}


ZKRebalancerListener传入ZKSessionExpireListener和ZKTopicPartitionChangeListener.它们都会使用ZKRebalancerListener完成自己的工作.


ZKSessionExpireListener

当Session失效时,新的会话建立时,立即进行rebalance操作.


class ZKSessionExpireListener(val dirs: ZKGroupDirs, val consumerIdString: String, 

  val topicCount: TopicCount, val loadBalancerListener: ZKRebalancerListener) extends IZkStateListener {

  def handleNewSession() {

    loadBalancerListener.resetState()

    registerConsumerInZK(dirs, consumerIdString, topicCount)

    loadBalancerListener.syncedRebalance()

  }

}


ZKTopicPartitionChangeListener

当topic的数据变化时,通过触发的方式启动rebalance操作.

class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener) 

  extends IZkDataListener {

  def handleDataChange(dataPath : String, data: Object) {

    loadBalancerListener.rebalanceEventTriggered()

  }

}



来源:zqhxuyuan.github.io

原文:http://zqhxuyuan.github.io/2016/01/19/2016-01-19-Kafka-Consumer-scala/

如有侵权或不周之处,敬请劳烦联系若飞(微信:1321113940)马上删除,谢谢!

·END·





架构师

我们都是架构师!


架构师订阅号,关注获取更多技术分享


现已开通多个微信群,有兴趣交流学习的同学

可加若飞微信:13511421494进群

合作邮箱:admin@137x.com


以上是关于kafra技术内幕:消费者初始化(scala)的主要内容,如果未能解决你的问题,请参考以下文章

Scala链式编程内幕

spring技术内幕读书笔记之IoC容器的学习

Zookeeper技术内幕二

Scala零基础教学61-80

COM技术内幕(笔记)

Spark技术内幕: Task向Executor提交的源代码解析