调试自定义 Kafka 连接器的简单有效方法是啥?

Posted

技术标签:

【中文标题】调试自定义 Kafka 连接器的简单有效方法是啥?【英文标题】:What is a simple, effective way to debug custom Kafka connectors?调试自定义 Kafka 连接器的简单有效方法是什么? 【发布时间】:2018-01-24 20:07:44 【问题描述】:

我正在使用几个 Kafka 连接器,我在控制台输出中看不到它们的创建/部署有任何错误,但是我没有得到我正在寻找的结果(没有任何结果) ,希望或其他)。我基于 Kafka 的示例 FileStream 连接器制作了这些连接器,因此我的调试技术基于示例中使用的 SLF4J 记录器的使用。我已经搜索了我认为会在控制台输出中生成的日志消息,但无济于事。我是否在错误的地方寻找这些消息?或者也许有更好的方法来调试这些连接器?

我在实现中引用的 SLF4J 记录器的示例用法:

Kafka FileStreamSinkTask

Kafka FileStreamSourceTask

【问题讨论】:

【参考方案1】:

我将尝试以广泛的方式回答您的问题。进行连接器开发的简单方法如下:

通过查看众多公开可用的 Kafka 连接器之一来构建和构建您的连接器源代码(您可以在此处找到广泛的列表:https://www.confluent.io/product/connectors/) 从https://www.confluent.io/download/ 下载最新的 Confluent 开源版本 (>= 3.3.0)

通过以下方式之一使您的连接器包可用于 Kafka Connect:

    将所有连接器 jar 文件(连接器 jar 加上依赖项 jar,不包括 Connect API jar)存储到文件系统中的某个位置,并通过将此位置添加到 连接工作程序属性中的plugin.path 属性。例如,如果您的连接器 jar 存储在 /opt/connectors/my-first-connector 中,您将在工作人员的属性中设置 plugin.path=/opt/connectors(见下文)。 将所有连接器 jar 文件存储在 $CONFLUENT_HOME/share/java 下的文件夹中。例如:$CONFLUENT_HOME/share/java/kafka-connect-my-first-connector。 (需要以kafka-connect- 开头的前缀才能被启动脚本拾取)。 $CONFLUENT_HOME 是您安装 Confluent 平台的位置。

(可选)通过将 $CONFLUENT_HOME/etc/kafka/connect-log4j.properties 中的 Connect 的日志级别更改为 DEBUG 甚至 TRACE 来增加日志记录。

使用 Confluent CLI 启动所有服务,包括 Kafka Connect。详情在这里:http://docs.confluent.io/current/connect/quickstart.html

简述:confluent start

注意:CLI 当前加载的 Connect 工作器的属性文件是 $CONFLUENT_HOME/etc/schema-registry/connect-avro-distributed.properties。如果您选择启用类加载隔离,但如果您需要更改 Connect 工作线程的属性,则应该编辑该文件。

一旦你运行了 Connect worker,通过运行来启动你的连接器:

confluent load <connector_name> -d <connector_config.properties>

confluent load <connector_name> -d <connector_config.json>

连接器配置可以是 java 属性或 JSON 格式。

运行 confluent log connect 打开 Connect worker 的日志文件,或通过运行直接导航到存储日志和数据的位置

cd "$( confluent current )"

注意:通过适当地设置环境变量 CONFLUENT_CURRENT,在 Confluent CLI 会话期间更改日志和数据的存储位置。例如。鉴于/opt/confluent 存在并且是您要存储数据的位置,请运行:

export CONFLUENT_CURRENT=/opt/confluentconfluent current

最后,交互式调试连接器的一种可能方法是在使用 Confluent CLI 开始连接之前应用以下内容:

confluent stop connectexport CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;confluent start connect

然后与您的调试器连接(例如远程连接到 Connect worker(默认端口:5005)。要停止在调试模式下运行连接,只需在完成后运行:unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG;

希望以上内容能让您的连接器开发更轻松,更有趣!

【讨论】:

嘿@Konstantine Karantasis,谢谢你的回答。我有机会尝试 Confluent,但我发现自己在尝试加载连接器时遇到了困难,您是否可以在该步骤中提供更多详细信息?我的连接器被打包在您指示的位置的 jar 中:neo4k.filestream.source.Neo4jFileStreamSourceConnector 和 neo4k.sink.Neo4jSinkConnector;并在 $CONFLUENT_HOME/etc/kafka-connect-neo4j 分别具有以下配置文件:connect-neo4j-file-source.properties 和 connect-neo4j-sink.properties。 没关系,我得到了加载命令来查找 .properties 文件。这只是正确的文件路径问题。 export KAFKA_DEBUG=y; 设置 KAFKA_DEBUG 对我有用,而不是 CONNECT_DEBUG。【参考方案2】:

我喜欢接受的答案。一件事 - 环境变量对我不起作用...我正在使用融合社区版 5.3.1...

这就是我所做的工作......

我从这里安装了 confluent cli: https://docs.confluent.io/current/cli/installing.html#tarball-installation

我使用命令 confluent local start 运行融合

我使用命令 ps -ef | grep connect 获得了连接应用程序的详细信息

我将生成的命令复制到编辑器并添加了 arg(在 java 之后):

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

然后我停止使用命令confluent local stop connect连接

然后我使用 arg 运行连接命令

短暂的中场休息---

vs 代码开发由 gang of four 成名的 erich gamma 领导,他还编写了 eclipse。 vs code 正在成为一流的 java ide 见https://en.wikipedia.org/wiki/Erich_Gamma

中场休息---

接下来我启动了 vs code 并打开了 debezium oracle 连接器文件夹(从这里克隆)https://github.com/debezium/debezium-incubator

然后我选择了Debug - Open Configurations

并进入高亮的调试配置

然后运行调试器——它会命中你的断点!!

connect 命令应如下所示:

/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/bin/java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/logs -Dlog4j.configuration=file:/Users/myuserid/confluent-5.3.1/bin/../etc/kafka/connect-log4j.properties -cp /Users/myuserid/confluent-5.3.1/share/java/kafka/*:/Users/myuserid/confluent-5.3.1/share/java/confluent-common/*:/Users/myuserid/confluent-5.3.1/share/java/kafka-serde-tools/*:/Users/myuserid/confluent-5.3.1/bin/../share/java/kafka/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/dependant-libs-2.12.8/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/* org.apache.kafka.connect.cli.ConnectDistributed /var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/connect.properties

【讨论】:

环境变量应该可以工作,但它是 KAFKA_DEBUG github.com/apache/kafka/blob/trunk/bin/kafka-run-class.sh#L220【参考方案3】:

连接器模块由 kafka 连接器框架执行。对于调试,我们可以使用独立模式。我们可以将 IDE 配置为使用 ConnectStandalone 主函数作为入口点。

    创建调试配置如下。如果它是 maven 项目,需要记住勾选“包含具有“提供”范围的依赖项

    连接器属性文件需要指定连接器类名“connector.class”进行调试

    worker 属性文件可以从 kafka 文件夹 /usr/local/etc/kafka/connect-standalone.properties 复制

【讨论】:

以上是关于调试自定义 Kafka 连接器的简单有效方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

将 Lambda 连接到 AWS MSK 中的 kafka 主题的最经济有效的方法是啥?

我在哪里编写 kafka 连接接收器自定义分区器的代码?

从 HMT 连接表中获取属性的最有效方法是啥?

javafx是啥

在 Swift 中设置自定义 NavigationBar 的最简单方法是啥?

使用自定义属性最简单最优雅的方法是啥