调试自定义 Kafka 连接器的简单有效方法是啥?
Posted
技术标签:
【中文标题】调试自定义 Kafka 连接器的简单有效方法是啥?【英文标题】:What is a simple, effective way to debug custom Kafka connectors?调试自定义 Kafka 连接器的简单有效方法是什么? 【发布时间】:2018-01-24 20:07:44 【问题描述】:我正在使用几个 Kafka 连接器,我在控制台输出中看不到它们的创建/部署有任何错误,但是我没有得到我正在寻找的结果(没有任何结果) ,希望或其他)。我基于 Kafka 的示例 FileStream 连接器制作了这些连接器,因此我的调试技术基于示例中使用的 SLF4J 记录器的使用。我已经搜索了我认为会在控制台输出中生成的日志消息,但无济于事。我是否在错误的地方寻找这些消息?或者也许有更好的方法来调试这些连接器?
我在实现中引用的 SLF4J 记录器的示例用法:
Kafka FileStreamSinkTask
Kafka FileStreamSourceTask
【问题讨论】:
【参考方案1】:我将尝试以广泛的方式回答您的问题。进行连接器开发的简单方法如下:
通过查看众多公开可用的 Kafka 连接器之一来构建和构建您的连接器源代码(您可以在此处找到广泛的列表:https://www.confluent.io/product/connectors/) 从https://www.confluent.io/download/ 下载最新的 Confluent 开源版本 (>= 3.3.0)通过以下方式之一使您的连接器包可用于 Kafka Connect:
-
将所有连接器 jar 文件(连接器 jar 加上依赖项 jar,不包括 Connect API jar)存储到文件系统中的某个位置,并通过将此位置添加到
连接工作程序属性中的
plugin.path
属性。例如,如果您的连接器 jar 存储在 /opt/connectors/my-first-connector
中,您将在工作人员的属性中设置 plugin.path=/opt/connectors
(见下文)。
将所有连接器 jar 文件存储在 $CONFLUENT_HOME/share/java
下的文件夹中。例如:$CONFLUENT_HOME/share/java/kafka-connect-my-first-connector
。 (需要以kafka-connect-
开头的前缀才能被启动脚本拾取)。 $CONFLUENT_HOME 是您安装 Confluent 平台的位置。
(可选)通过将 $CONFLUENT_HOME/etc/kafka/connect-log4j.properties
中的 Connect 的日志级别更改为 DEBUG
甚至 TRACE
来增加日志记录。
使用 Confluent CLI 启动所有服务,包括 Kafka Connect。详情在这里:http://docs.confluent.io/current/connect/quickstart.html
简述:confluent start
注意:CLI 当前加载的 Connect 工作器的属性文件是
$CONFLUENT_HOME/etc/schema-registry/connect-avro-distributed.properties
。如果您选择启用类加载隔离,但如果您需要更改 Connect 工作线程的属性,则应该编辑该文件。
一旦你运行了 Connect worker,通过运行来启动你的连接器:
confluent load <connector_name> -d <connector_config.properties>
或
confluent load <connector_name> -d <connector_config.json>
连接器配置可以是 java 属性或 JSON 格式。
运行
confluent log connect
打开 Connect worker 的日志文件,或通过运行直接导航到存储日志和数据的位置
cd "$( confluent current )"
注意:通过适当地设置环境变量
CONFLUENT_CURRENT
,在 Confluent CLI 会话期间更改日志和数据的存储位置。例如。鉴于/opt/confluent
存在并且是您要存储数据的位置,请运行:
export CONFLUENT_CURRENT=/opt/confluent
confluent current
最后,交互式调试连接器的一种可能方法是在使用 Confluent CLI 开始连接之前应用以下内容:
confluent stop connect
export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
confluent start connect
然后与您的调试器连接(例如远程连接到 Connect worker(默认端口:5005)。要停止在调试模式下运行连接,只需在完成后运行:unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG;
。
希望以上内容能让您的连接器开发更轻松,更有趣!
【讨论】:
嘿@Konstantine Karantasis,谢谢你的回答。我有机会尝试 Confluent,但我发现自己在尝试加载连接器时遇到了困难,您是否可以在该步骤中提供更多详细信息?我的连接器被打包在您指示的位置的 jar 中:neo4k.filestream.source.Neo4jFileStreamSourceConnector 和 neo4k.sink.Neo4jSinkConnector;并在 $CONFLUENT_HOME/etc/kafka-connect-neo4j 分别具有以下配置文件:connect-neo4j-file-source.properties 和 connect-neo4j-sink.properties。 没关系,我得到了加载命令来查找 .properties 文件。这只是正确的文件路径问题。export KAFKA_DEBUG=y;
设置 KAFKA_DEBUG 对我有用,而不是 CONNECT_DEBUG。【参考方案2】:
我喜欢接受的答案。一件事 - 环境变量对我不起作用...我正在使用融合社区版 5.3.1...
这就是我所做的工作......
我从这里安装了 confluent cli: https://docs.confluent.io/current/cli/installing.html#tarball-installation
我使用命令 confluent local start
运行融合
我使用命令 ps -ef | grep connect
获得了连接应用程序的详细信息
我将生成的命令复制到编辑器并添加了 arg(在 java 之后):
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
然后我停止使用命令confluent local stop connect
连接
然后我使用 arg 运行连接命令
短暂的中场休息---
vs 代码开发由
gang of four
成名的 erich gamma 领导,他还编写了 eclipse。 vs code 正在成为一流的 java ide 见https://en.wikipedia.org/wiki/Erich_Gamma
中场休息---
接下来我启动了 vs code 并打开了 debezium oracle 连接器文件夹(从这里克隆)https://github.com/debezium/debezium-incubator
然后我选择了Debug - Open Configurations
并进入高亮的调试配置
然后运行调试器——它会命中你的断点!!
connect 命令应如下所示:
/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/bin/java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/logs -Dlog4j.configuration=file:/Users/myuserid/confluent-5.3.1/bin/../etc/kafka/connect-log4j.properties -cp /Users/myuserid/confluent-5.3.1/share/java/kafka/*:/Users/myuserid/confluent-5.3.1/share/java/confluent-common/*:/Users/myuserid/confluent-5.3.1/share/java/kafka-serde-tools/*:/Users/myuserid/confluent-5.3.1/bin/../share/java/kafka/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/dependant-libs-2.12.8/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/* org.apache.kafka.connect.cli.ConnectDistributed /var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/connect.properties
【讨论】:
环境变量应该可以工作,但它是 KAFKA_DEBUG github.com/apache/kafka/blob/trunk/bin/kafka-run-class.sh#L220【参考方案3】:连接器模块由 kafka 连接器框架执行。对于调试,我们可以使用独立模式。我们可以将 IDE 配置为使用 ConnectStandalone 主函数作为入口点。
创建调试配置如下。如果它是 maven 项目,需要记住勾选“包含具有“提供”范围的依赖项
连接器属性文件需要指定连接器类名“connector.class”进行调试
worker 属性文件可以从 kafka 文件夹 /usr/local/etc/kafka/connect-standalone.properties 复制【讨论】:
以上是关于调试自定义 Kafka 连接器的简单有效方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章
将 Lambda 连接到 AWS MSK 中的 kafka 主题的最经济有效的方法是啥?