kafra技术内幕:消费者初始化(scala)
Posted 架构师
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafra技术内幕:消费者初始化(scala)相关的知识,希望对你有一定的参考价值。
high-level consumer
有时候应用程序从Kafka读取数据,并不太关心消息offset的处理. 所以Hight Level Consumer提供了一个从Kafka消费数据的高层抽象.
High Level Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中. 这个offset基于Consumer Group的名称.
Consumer Group是整个Kafka集群全局的. 所以要特别小心在新的逻辑启动之前要关闭所有的旧的逻辑(消费者进程).
当新的消费者加入同一个消费组时,Kafka会添加这个消费者的线程到要消费的topic的可用线程集合中,并且触发re-balance.
在re-balance时,kafka会分配可用的partition给可用的线程,可能移动一个partition到其他的线程中.
如果你的消费者逻辑混合了新的和旧的处理逻辑,很可能有些消息会被分配到旧的处理逻辑中.
High Level Consumer可以(应该)是多线程的应用程序.线程模型是以topic的partitions数量为中心的,不过有些规则:
●如果线程数量多于partition的数量,有部分线程无法消费该topic下任何一条消息
●如果线程数量少于partition的数量,有一些线程会消费多个partition的数据
●如果线程数量等于partition的数量,则正好一个线程消费一个partition的数据
●当添加更多的消费者进程/线程会触发re-balance,导致partition的分配发生了变化
如果一个线程消费多个partitions,并不会保证收到的消息的有序性,不过在同一个partition里的offset则是有序的.
比如某个消费者线程从partition-10接收5条消息,然后从partition-11接收了6条消息,接着再从partition-10接收了5条消息,
然后还是从partition-10又接收了5条消息,即使这个时候partition-11也有数据. 但是并不保证partition之间的顺序.
High-level的实现依赖于Consumer Group.Kafka保证同一consumer group中只有一个consumer会消费某条消息.
即每一个consumer实例只会消费某一个或多个特定partition的数据,而一个partition的数据只会被一个特定的consumer所消费.
●同一条消息会被多个ConsumerGroup消费,所以有多个ConsumerGroup,每个ConsumerGroup只有一个Consumer,实现了广播.
通常不同的ConsumerGroup的消费处理逻辑是不同的,这样同一份数据源(消息)交给不同的处理逻辑.●一个ConsumerGroup有多个Consumer,一条消息只会被这个ConsumerGroup的一个消费者所消费.实现了单播.
不过通常的设计如下图,有多个ConsumerGroup, 并且每个ConsumerGroup中也有多个Consumer.
high-level Consumer Example
消费者示例, 指定要消费的topic和线程数,返回每个topic对应的KafkaStream列表,每个线程对应一个KafkaStream.
下面的示例中只使用了一个线程,所以通过streams.get(0)获取到该线程对应的KafkaStream.然后从流中读取出消息.
topicCountMap表示客户端可以同时消费多个topic,那为什么要设置线程数呢? 因为一个topic有多个partition分布在
多个broker节点上.即使是同一个broker,也可能有这个topic的多个partition. 用不同的线程来隔离不同的partition.
●ConsumerConnector: Consumer的连接器,这里基于ZK实现,是ZookeeperConsumerConnector
●KafkaStream: 消息流,每个消费者线程都对应了一个消息流,消息会放入消息流的阻塞队列中
●ConsumerIterator: 消费者迭代器,只有迭代器开始迭代获取数据时,才会返回给消费者
ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
KafkaStream<byte[], byte[]> stream = streams.get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()){
System.out.println("message: " + new String(it.next().message()));
}
多线程版本可以参考: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
ConsumerConnector
Consumer定义在ConsumerConnector接口同一个文件中.默认创建的ConsumerConnector是ZookeeperConsumerConnector
object Consumer extends Logging {
def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
consumerConnect
}
}
ConsumerConnector主要有创建消息流(createMessageStreams)和提交offset(commitOffsets)两种方法.
Consumer会根据消息流消费数据, 并且定时提交offset.由客户端自己保存offset是kafka采用pull拉取消息的一个附带工作.
trait ConsumerConnector {
def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean)
def setConsumerRebalanceListener(listener: ConsumerRebalanceListener)
def shutdown()
}
ZookeeperConsumerConnector
一个Consumer会创建一个ZookeeperConsumerConnector,代表一个消费者进程.
●fetcher: 消费者获取数据, 使用ConsumerFetcherManager fetcher线程抓取数据
●zkUtils: 消费者要和ZK通信, 除了注册自己,还有其他信息也会写到ZK中
●topicThreadIdAndQueues: 消费者会指定自己消费哪些topic,并指定线程数, 所以topicThreadId都对应一个队列
●messageStreamCreated: 消费者会创建消息流, 每个队列都对应一个消息流
●offsetsChannel: offset可以存储在ZK或者kafka中,如果存在kafka里,像其他请求一样,需要和Broker通信
●还有其他几个Listener监听器,分别用于topicPartition的更新,负载均衡,消费者重新负载等
虽然high level consumer不需要在应用程序中自己管理offset等,但kafka内部还是会帮你管理offset的.
为什么ConsumerConnector是ZookeeperConsumerConnector,因为消费者的offset是保存在ZooKeeper中的.
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean)
extends ConsumerConnector with Logging with KafkaMetricsGroup {
private var fetcher: Option[ConsumerFetcherManager] = None
private var zkUtils: ZkUtils = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
private val messageStreamCreated = new AtomicBoolean(false)
private var offsetsChannel: BlockingChannel = null
private var sessionExpirationListener: ZKSessionExpireListener = null
private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
private var loadBalancerListener: ZKRebalancerListener = null
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
private var consumerRebalanceListener: ConsumerRebalanceListener = null
connectZk() // ① 创建ZkUtils,会创建对应的ZkConnection和ZkClient
createFetcher() // ② 创建ConsumerFetcherManager,消费者fetcher线程
ensureOffsetManagerConnected() // ③ 确保连接上OffsetManager.
if (config.autoCommitEnable) { // ④ 启动定时提交offset线程
scheduler.startup
scheduler.schedule("kafka-consumer-autocommit", autoCommit, ...)
}
}
zk and broker
●①
/brokers
->>topics
和ids
: 集群中所有的topics,以及所有的brokers.
[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 1] ls /brokers
[topics, ids]
[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 2] ls /brokers/ids
[3, 5, 4]
[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 4] get /brokers/ids/3
{"jmx_port":10055,"timestamp":"1453380999577","host":"192.168.48.153","version":1,"port":9092}
●③
/brokers/topics/topic_name
-> topic的每个partition,以及分配的replicas(AR)●④
/brokers/topics/topic_name/partitions/partition_id/state
-> 这个partition的leader,isr
[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 17] get /brokers/topics/topic1
{"version":1,"partitions":{"2":[5,4],"1":[4,3],"0":[3,5]}} ⬅️
[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 12] ls /brokers/topics/topic1/partitions
[2, 1, 0]
[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 13] ls /brokers/topics/topic1/partitions/0
[state]
[zk: 192.168.47.83:2181,192.168.47.84:2181,192.168.47.86:2181(CONNECTED) 15] get /brokers/topics/topic1/partitions/0/state
{"controller_epoch":1775,"leader":3,"version":1,"leader_epoch":145,"isr":[3,5]}
上图是kafka manager中某个topic的PartitionInfo, 集群只有3个节点,这个topic有3个partition,2个副本.
② Broker node registry
/brokers/ids/0 --> { "host" : "host:port", "topics" :
{"topic1": ["partition1" ... "partitionN"], ..., "topicN": ["partition1" ... "partitionN"] } }
每个Broker节点在自己启动的时候,会在/brokers下创建一个逻辑节点. 内容包括了Broker的主机和端口, Broker服务的所有topic,
以及分配到当前Broker的这个topic的partition列表(并不是topic的全部partition,会将所有partition分布在不同的brokers).
A consumer subscribes to event changes of the broker node registry.
当Broker挂掉的时候,在这个Broker上的所有Partition都丢失了,而Partition是给消费者服务的.
所以Broker挂掉后在做迁移的时候,会将其上的Partition转移到其他Broker上,因此消费者要消费的Partition也跟着变化.
③ Broker topic registry
/brokers/topics/topic1 -> {"version":1,"partitions":{"2":[5,4],"1":[4,3],"0":[3,5]}}
虽然topic是在/brokers下,但是这个topic的信息是全局的.在创建topic的时候,这个topic的每个partition的编号以及replicas.
具体每个partition的Leader以及isr信息则是在/brokers/topics/topic_name/partitions/partition_id/state
zk and consumer
Consumer id registry: /consumers/[group_id]/ids/[consumer_id] -> topic1,...topicN
每个消费者会将它的id注册为临时znode并且将它所消费的topic设置为znode的值,当客户端(消费者)退出时,znode(consumer_id)会被删除.
A consumer subscribes to event changes of the consumer id registry within its group.
每个consumer会订阅它所在的消费组中关于consumer_id注册的更新事件. 为什么要注册呢,因为Kafka只会将一条消息发送到一个消费组中唯一的一个消费者.
如果某个消费者挂了,它要把本来发给挂的消费者的消费转给这个消费组中其他的消费者.同理,有新消费者加入消费组时,也会进行负载均衡.
Partition owner registry: /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id
在消费时,每个topic的partition只能被一个消费者组中的唯一的一个消费者消费.在每次重新负载的时候,这个映射策略就会重新构建.
Consumer offset tracking: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
每个消费者都要跟踪自己消费的每个Partition最近的offset.表示自己读取到Partition的最新位置.
由于一个Partition只能被消费组中的一个消费者消费,所以offset是以消费组
为级别的,而不是消费者.
因为如果原来的消费者挂了后,应当将这个Partition交给同一个消费组中别的消费者,而此时offset是没有变化的.
一个partition可以被不同的消费者组
中的不同消费者消费,所以不同的消费者组必须维护他们各自对该partition消费的最新的offset
init
在创建ZookeeperConsumerConnector时,有几个初始化方法需要事先执行.
●因为消费者要和ZK通信,所以connectZk会确保连接上ZooKeeper
●消费者要消费数据,需要有抓取线程,所有的抓取线程交给ConsumerFetcherManager统一管理
●由消费者客户端自己保存offset,而消费者会消费多个topic的多个partition.
●类似多个数据抓取线程有管理类,多个partition的offset管理类OffsetManager是一个GroupCoordinator
●定时提交线程会使用OffsetManager建立的通道定时提交offset到zk或者kafka.
createMessageStreams
由ConsumerConnector创建消息流,需要指定解码器,因为要将日志反序列化(生产者写消息时对消息序列化到日志文件).
consume并不真正的消费数据,只是初始化存放数据的queue.真正消费数据的是对该queue进行shallow iterator.
在kafka的运行过程中,会有其他的线程将数据放入partition对应的queue中. 而queue是用于KafkaStream的.
一旦数据添加到queue后,KafkaStream的阻塞队列就有数据了,消费者就可以从队列中消费消息.
def createMessageStreams[K,V](topicCountMap: Map[String,Int],
keyDecoder: Decoder[K], valueDecoder: Decoder[V]) : Map[String, List[KafkaStream[K,V]]] = {
consume(topicCountMap, keyDecoder, valueDecoder)
}
def consume[K, V](topicCountMap: scala.collection.Map[String,Int],
keyDecoder: Decoder[K], valueDecoder: Decoder[V]) : Map[String,List[KafkaStream[K,V]]] = {
val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
// make a list of (queue,stream) pairs, one pair for each threadId 只是准备了队列和流,数据什么时候填充呢?
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
(queue, stream)
})
).flatten.toList //threadIdSet是个集合,外层的topicThreadIds.values也是集合,所以用flatten压扁为queue-stream对
val dirs = new ZKGroupDirs(config.groupId) // /consumers/[group_id]
registerConsumerInZK(dirs, consumerIdString, topicCount) // /consumers/[group_id]/ids/[consumer_id]
reinitializeConsumer(topicCount, queuesAndStreams) // 重新初始化消费者 ⬅️
// 返回KafkaStream, 每个Topic都对应了多个KafkaStream. 数量和topicCount中的count一样
loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
consumerIdString
会返回当前Consumer在哪个ConsumerGroup的编号.每个consumer在消费组中的编号都是唯一的.
一个消费者,对一个topic可以使用多个线程一起消费(一个进程可以有多个线程). 当然一个消费者也可以消费多个topic.
def makeConsumerThreadIdsPerTopic(consumerIdString: String, topicCountMap: Map[String, Int]) = {
val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]()
for ((topic, nConsumers) <- topicCountMap) { // 每个topic有几个消费者线程
val consumerSet = new mutable.HashSet[ConsumerThreadId] // 一个消费者线程对应一个ConsumerThreadId
for (i <- 0 until nConsumers)
consumerSet += ConsumerThreadId(consumerIdString, i)
consumerThreadIdsPerTopicMap.put(topic, consumerSet) // 每个topic都有多个Consumer线程,但是只有一个消费者进程
}
consumerThreadIdsPerTopicMap // topic到消费者线程集合的映射
}
假设消费者C1声明了topic1:2, topic2:3. topicThreadIds=consumerThreadIdsPerTopicMap.
topicThreadIds.values = [ (C1_1,C1_2), (C1_1,C1_2,C1_3)]一共有5个线程,queuesAndStreams也有5个元素.
consumerThreadIdsPerTopicMap = {
topic1: [C1_1, C1_2],
topic2: [C1_1, C1_2, C1_3]
}
topicThreadIds.values = [
[C1_1, C1_2],
[C1_1, C1_2, C1_3]
]
threadIdSet循环[C1_1, C1_2]时, 生成两个queue->stream pair.
threadIdSet循环[C1_1, C1_2, C1_3]时, 生成三个queue->stream pair.
queuesAndStreams = [
(LinkedBlockingQueue_1,KafkaStream_1), //topic1:C1_1
(LinkedBlockingQueue_2,KafkaStream_2), //topic1:C1_2
(LinkedBlockingQueue_3,KafkaStream_3), //topic2:C1_1
(LinkedBlockingQueue_4,KafkaStream_4), //topic2:C1_2
(LinkedBlockingQueue_5,KafkaStream_5), //topic2:C1_3
]
对于消费者而言,它只要指定要消费的topic和线程数量就可以了,其他具体这个topic分成多少个partition,
以及topic-partition是分布是哪个broker上,对于客户端而言都是透明的.
客户端关注的是我的每个线程都对应了一个队列,每个队列都是一个消息流就可以了.
在Producer以及前面分析的Fetcher,都是以Broker-Topic-Partition为级别的.
AbstractFetcherManager的fetcherThreadMap就是以brokerAndFetcherId来创建拉取线程的.
而消费者是通过拉取线程才有数据可以消费的,所以客户端的每个线程实际上也是针对Partition级别的.
registerConsumerInZK
消费者需要向ZK注册一个临时节点,路径为:/consumers/[group_id]/ids/[consumer_id]
,内容为订阅的topic.
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
val consumerRegistrationInfo = Json.encode(Map("version" -> 1,
"subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern, "timestamp" -> timestamp))
val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.consumerRegistryDir + "/" + consumerIdString,
consumerRegistrationInfo, zkUtils.zkConnection.getZookeeper, false)
zkWatchedEphemeral.create()
}
问题:什么时候这个节点会被删除掉呢? Consumer进程挂掉时,或者Session失效时删除临时节点. 重连时会重新创建.
由于是临时节点,一旦创建节点的这个进程挂掉了,临时节点就会自动被删除掉. 这是由zk机制决定的,不是由消费者完成的.
reinitializeConsumer listener
当前Consumer在ZK注册之后,需要重新初始化Consumer. 对于全新的消费者,注册多个监听器,在zk的对应节点的注册事件发生时,会回调监听器的方法.
●将topic对应的消费者线程id及对应的LinkedBlockingQueue放入topicThreadIdAndQueues中,LinkedBlockingQueue是真正存放数据的queue
●① 注册
sessionExpirationListener
,监听状态变化事件.在session失效重新创建session时调用●② 向
/consumers/[group_id]/ids
注册Child变更事件的loadBalancerListener
,当消费组下的消费者发生变化时调用●③ 向
/brokers/topics/[topic]
注册Data变更事件的topicPartitionChangeListener
,在topic数据发生变化时调用●显式调用
loadBalancerListener.syncedRebalance()
, 会调用reblance方法进行consumer的初始化工作
private def reinitializeConsumer[K,V](topicCount: TopicCount,
queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
val dirs = new ZKGroupDirs(config.groupId)
// ② listener to consumer and partition changes
if (loadBalancerListener == null) {
val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString,
topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
}
// ① create listener for session expired event if not exist yet
if (sessionExpirationListener == null) sessionExpirationListener =
new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener)
// ③ create listener for topic partition change event if not exist yet
if (topicPartitionChangeListener == null)
topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
// listener to consumer and partition changes
zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
// register on broker partition path changes.
topicStreamsMap.foreach { topicAndStreams =>
zkUtils.zkClient.subscribeDataChanges(BrokerTopicsPath+"/"+topicAndStreams._1, topicPartitionChangeListener)
}
// explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance()
}
ZKRebalancerListener传入ZKSessionExpireListener和ZKTopicPartitionChangeListener.它们都会使用ZKRebalancerListener完成自己的工作.
ZKSessionExpireListener
当Session失效时,新的会话建立时,立即进行rebalance操作.
class ZKSessionExpireListener(val dirs: ZKGroupDirs, val consumerIdString: String,
val topicCount: TopicCount, val loadBalancerListener: ZKRebalancerListener) extends IZkStateListener {
def handleNewSession() {
loadBalancerListener.resetState()
registerConsumerInZK(dirs, consumerIdString, topicCount)
loadBalancerListener.syncedRebalance()
}
}
ZKTopicPartitionChangeListener
当topic的数据变化时,通过触发的方式启动rebalance操作.
class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener)
extends IZkDataListener {
def handleDataChange(dataPath : String, data: Object) {
loadBalancerListener.rebalanceEventTriggered()
}
}
来源:zqhxuyuan.github.io
原文:http://zqhxuyuan.github.io/2016/01/19/2016-01-19-Kafka-Consumer-scala/
如有侵权或不周之处,敬请劳烦联系若飞(微信:1321113940)马上删除,谢谢!
·END·
以上是关于kafra技术内幕:消费者初始化(scala)的主要内容,如果未能解决你的问题,请参考以下文章