kafkaStream

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafkaStream相关的知识,希望对你有一定的参考价值。

参考技术A 基本概念
名称 解释
Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
Topic Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
Producer 消息生产者,向Broker发送消息的客户端
Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup 每个Consumer属于一个特定的Consumer Group,一条消息可以发送到多个不同的Consumer Group,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Partition 物理上的概念,一个topic可以分为多个partition,每个partition内部是有序的
Consumer Group
consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程
group.id是一个字符串,唯一标识一个consumer group
consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)
Rebalance
触发Rebalance的条件

组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了-这两者的区别后面会谈到)
订阅主题数发生变更-这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
订阅主题的分区数发生变更
均衡策略:

range和round-robin
Coordinator
确定consumer group位移信息写入__consumers_offsets的哪个分区。具体计算公式:

__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)

注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。该分区leader所在的broker就是被选定的coordinator

Coordinator同Consumer Group之间的协议:

Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着
LeaveGroup请求:主动告诉coordinator我要离开consumer group
SyncGroup请求:group leader把分配方案告诉组内所有成员
JoinGroup请求:成员请求加入组
DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用

Kafka Streaming
Stream Processing Topology
1、stream是Kafka Stream最重要的抽象,它代表了一个无限持续的数据集。stream是有序的、可重放消息、对不可变数据集支持故障转移
2、一个stream processing application由一到多个processor topologies组成,其中每个processor topology是一张图,由多个streams(edges)连接着多个stream processor(node)
3、一个stream processor是processor topology中的一个节点,它代表一个在stream中的处理步骤:从上游processors接受数据、进行一些处理、最后发送一到多条数据到下游processors

Kafka Stream提供两种开发stream processing topology的API

1、high-level Stream DSL:提供通用的数据操作,如map和fileter
2、lower-level Processor API:提供定义和连接自定义processor,同时跟state store(下文会介绍)交互

Kafka-Streams的一些亮点:

Source Processor:它是一种特殊的stream processor,没有上游processor。它通过消费一个或者多个topic中的记录为自身的拓扑生成输入流,然后将他们转发到下游processor
Sink Processors:也是一种特殊的stream processor,他没有下游processor。它发送从上游processor接收到的所有记录到一个特定的Kakfa Topic

Time
Event Time:
Processiong Time:
Ingestion Time:

Kafka Stream基于TimestampExtractor接口获取对应data记录的时间戳信息;

State
无状态是指消息的处理不依赖与其他消息,而有状态则相反;比如join/groupby操作均为有状态;

Kafka Streams提供State Store机制;

源码分析
运行架构
TopologyBuilder builder = new TopologyBuilder();

builder.addSource("Source", "streams-file-input");

builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");

builder.addSink("Sink", "streams-wordcount-processor-output", "Process");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(5000L);

streams.close();

TopologyBuilder: 构建Kafka Topology关系,主要方法有addSource()/addProcessor()/addStateStore()/addSink();
KafkaStreams: 为Kafka Streams运行上下文,开启关闭整个应用;

KafkaStreams启动流程:

GlobalStreamThread
StreamThread
疑问: Topic+Partition -> Thread -> Task之间的关系是什么?如何进行调度的?

TopologyBuilder构建Topology过程
public synchronized ProcessorTopology buildGlobalStateTopology()
final Set<String> globalGroups = globalNodeGroups();
if (globalGroups.isEmpty())
return null;

return build(globalGroups);


名称 类型 含义
nodeToSourceTopics HashMap
运维
惊群与脑裂
这两种现象存在于>=0.9的版本中,当时consumer是依赖zookeeper的。

惊群: 任何Broker和Consumer的增减都会触发所有Consumer的Rebalance,导致同一个group里不同的consumer拥有了相同的partition,进而引起Consumer的消费错乱;
脑裂: 每个consumer单独通过zookeeper判断哪些broker和consumer宕机,不同的consumer在同一个时刻看到的view可能不一致,导致Rebanlance;

计算
参考:

KafkaStreams 无一例外地关闭

【中文标题】KafkaStreams 无一例外地关闭【英文标题】:KafkaStreams shuts down with no exceptions 【发布时间】:2018-06-18 12:16:42 【问题描述】:

我有四个使用相同应用程序 ID 运行的 Kafka 流应用程序实例。所有输入主题都属于单个分区。为了实现可扩展性,我通过一个具有多个分区的中间虚拟主题传递了它。我已将request.timeout.ms 设置为 4 分钟。

Kafka 实例进入 ERROR 状态,没有抛出任何异常。很难弄清楚确切的问题是什么。有什么想法吗?

[INFO ] 2018-01-09 12:30:11.579 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:939 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] Shutting down
[INFO ] 2018-01-09 12:30:11.579 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:888 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN.
[INFO ] 2018-01-09 12:30:11.595 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[INFO ] 2018-01-09 12:30:11.605 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:972 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] Stream thread shutdown complete
[INFO ] 2018-01-09 12:30:11.605 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:888 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
[WARN ] 2018-01-09 12:30:11.605 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] KafkaStreams:343 - stream-client [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4] All stream threads have died. The Kafka Streams instance will be in an error state and should be closed.
[INFO ] 2018-01-09 12:30:11.605 [new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] KafkaStreams:268 - stream-client [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4] State transition from RUNNING to ERROR.

【问题讨论】:

尝试注册UncaughtExceptionHandler 以获取更多详细信息:docs.confluent.io/current/streams/developer-guide/… 或将日志级别提高到 DEBUG 是的!日志级别已经处于调试模式,并且有一个 uncaughtExceptionHandler 已经注册到 kafka 流 - 仍然没有记录任何内容。 这很奇怪...Shutting down 消息之前记录了什么? 我有一个自定义的流分区器。从中不断写入一条日志行“EventStreamPartitioner:20 - 代码'isro'和分区'109'”。 我有一个消费者偏移重置工具,它使用 OffsetCommitRequest 更改主题的偏移。我停止了应用程序并使用该工具更改了输入主题和中间主题的偏移量。我有两个经纪人,我将请求发送给了一个经纪人。然后重新启动应用程序,之后它不断进入 ERROR 状态。我认为偏移量更改导致日志文件损坏。一旦我更改了消费者组 ID,它就起作用了。但我仍然不确定偏移量更改后 kafka 日志文件损坏的原因是什么 【参考方案1】:

Thea asker 在 cmets 中分享了他的解决方案:

一旦我更改了消费者组 ID,它就起作用了。

还值得注意的是,最近的版本中已经引入了相关的问题(可能有也可能没有相同的根本原因),现在在 Kafka 版本 2.5.1 和 2.6 中看起来也是fixed。 0 以上。

因此,今天遇到这种情况的人可能想检查他们的版本是否足够高(或低)以避免此问题。

【讨论】:

【参考方案2】:

您可能还需要将 default.production.exception.handler Kafka Streams 属性设置为实现 ProductionExceptionHandler 的类,并且与默认类 DefaultProductionExceptionHandler 不同,它会在触发永久故障状态之前记录错误。

【讨论】:

以上是关于kafkaStream的主要内容,如果未能解决你的问题,请参考以下文章