以独立模式运行 Kafka 连接,存在偏移问题
Posted
技术标签:
【中文标题】以独立模式运行 Kafka 连接,存在偏移问题【英文标题】:Running Kafka connect in standalone mode, having issues with offsets 【发布时间】:2022-01-12 20:35:07 【问题描述】:我正在使用我找到的这个 Github 存储库和文件夹路径:https://github.com/entechlog/kafka-examples/tree/master/kafka-connect-standalone 以独立模式在本地运行 Kafka 连接。我对 Docker compose 文件进行了一些更改,但主要是与身份验证有关的更改。
我现在遇到的问题是,当我运行 Docker 映像时,对于每个分区(有 10 个,0 到 9),我多次收到此错误:
[2021-12-07 19:03:04,485] INFO [bq-sink-connector|task-0] [Consumer clientId=connector- consumer-bq-sink-connector-0, groupId=connect-bq-sink-connector] Found no committed offset for partition <topic name here>-<partition number here> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1362)
我认为验证或连接到端点没有任何问题,我认为消费者(连接接收器)没有发回偏移量。
我是否缺少环境变量?你会看到这个 docker compose 文件有CONNECT_OFFSET_STORAGE_FILE_FILENAME: /tmp/connect.offsets
,我尝试添加CONNECTOR_OFFSET_STORAGE_FILE_FILENAME: /tmp/connect.offsets
(CONNECT_ vs. CONNECTOR_)然后我得到一个错误Failed authentication with <Kafka endpoint here>
,所以现在我只是在绕圈子。
【问题讨论】:
独立模式只能使用本地偏移文件。阅读此docs.confluent.io/home/connect/userguide.html#standalone-mode。local
表示不在 kafka 经纪人的话题上。 AND INFO
不是错误。
@J.Song 我知道它使用本地偏移文件。所以对我来说,没有提交的偏移量是有道理的,因为它是一个“空”的本地临时文件。那么如果没有提交的偏移量,我该如何开始消费主题呢?如果从来没有最后一个已知的偏移量,如何以独立模式运行以进行测试?不过我不确定我是否理解你的最后一句话。
如前所述,我认为没有理由在运行分布式模式的现有 Confluent 容器上使用这个特定容器。为什么你不想使用它们?
关于您提到的错误,CONNECTOR_
适用于 connector 独立属性文件(BQ Sink)。 CONNECT_
用于 worker(offset.storage.file.name
属性实际上存在)
@OneCricketeer 我仍在学习很多关于 Kafka 及其工作原理的知识,我确实了解独立与分布式,但我了解独立对于本地测试非常有用,这就是我正在做的。我正在运行 POC,我必须在这里学习。您能否指出一些易于在 Docker 中运行的开箱即用的东西,我可以使用 AWS MSK IAM 身份验证进行配置,允许我在本地机器上运行以便进行测试,并且可以与 wepay BigQuery 连接器一起使用已经创建了?我想比较一下不同之处,以便我能完全理解。
【参考方案1】:
我认为你的注意力集中在错误的输出上。
-
这是一条 INFO 消息
偏移文件(或分布式模式下的主题)用于源连接器。
Sink 连接器使用消费者组。如果没有找到groupId=connect-bq-sink-connector
的偏移量,那么消费者组没有提交。
【讨论】:
那么我怎样才能“询问”我的消费者组接收器连接器,即connect-bq-sink-connector
,提交一个偏移量,以便消费者可以开始接收来自主题的消息?我猜这就是完成这项工作所需要的吗?我认为在分布式模式下运行会有所帮助,但这样做时会出现实际错误。
偏移提交通常在接收器连接器中自动完成,您应该在日志中查找 ERROR
或 WARN
以查找它没有的实例
您可以使用kafka-consumer-groups --describe --group connect-bq-sink-connector
来检查提交的偏移量
我相信日志中没有任何其他错误。我可以检查WARN。但是我应该寻找什么样的东西来表明没有偏移提交?还有那个命令,我应该在 kafka-connect 容器中这样做吗?
该命令可以在安装了 Kafka 的任何地方运行,并且可以连接到代理。想到 IIRC,Failed to commit
到 TimeoutException
。显然,如果启动连接器时主题中没有数据,那么无论如何都不会提交任何偏移量。所以,你应该先启动连接器,然后产生一些数据。以上是关于以独立模式运行 Kafka 连接,存在偏移问题的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 连接器 Elasticsearch topics.regex
具有多个连接器和一个主题的分布式 Kafka Connect