基于 debezium 内容的路由配置

Posted

技术标签:

【中文标题】基于 debezium 内容的路由配置【英文标题】:debezium content based routing configuration 【发布时间】:2021-03-26 14:37:45 【问题描述】:

我正在使用 confluent,所以我根据 confluent 文档安装了 dibezium 连接器,在 connect.properties 中使用 confluent-hub 我确实有条目

plugin.path=/usr/share/java,/opt/confluent-6.0.0/share/confluent-hub-components

我需要使用 io.debezium.transforms.ContentBasedRouter https://debezium.io/documentation/reference/1.3/configuration/content-based-routing.html

所以根据我下载的 debezium 文档debezium-scripting-1.3.1.Final.jar 并将其放入 /opt/confluent-6.0.0/share/confluent-hub-components/ 并将其复制到 /opt/confluent-6.0.0/share/confluent-hub-components/debezium-debezium-connector-sqlserver/lib 目录 这里是我的 mysql_src.json 连接器中的条目

"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "source.snapshot",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.language": "jsr223.groovy",
"transforms.route.topic.expression": "value.__source_snapshot == 'false' ? 'test'"

当我尝试配置/加载此连接器时,我收到以下错误消息

[2020-12-15 22:18:45,351] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1369)
java.lang.NoClassDefFoundError: io/debezium/DebeziumException

有什么建议可以解决这个问题吗?

【问题讨论】:

您是否按照以下步骤操作:获取 JSR-223 脚本引擎实现并将其内容添加到您的 Kafka Connect 环境的 Debezium 插件目录中。? 将 debezium-scripting-1.3.1.Final.jar 复制到 /usr/share/java 相同的错误消息 您需要同时安装 debezium-scripting-1.3.1.Final.jar 和 JSR-223 脚本引擎实现(例如,对于 Groovy 3,您可以从 groovy-lang.org/. 下载其 JSR 223 实现) 我假设 debezium-scripting-1.3.1.Final.jar 具有所有必要的组件。你能指出我正确的文档/示例如何吗? 来自文档:debezium.io/documentation/reference/1.3/configuration/… - 请检查步骤 1 和 3。 【参考方案1】:

根据docs,您需要额外获取一个JSR-223脚本引擎实现并将其内容添加到您的Kafka Connect环境的Debezium插件目录中,因为:

Debezium 不附带任何 JSR 223 API 的实现。要将表达式语言与 Debezium 一起使用,您必须下载该语言的 JSR 223 脚本引擎实现,并将该语言实现使用的任何其他 JAR 文件添加到您的 Debezium 连接器插件目录中。

【讨论】:

不太可能是 JSR 223,我已将 groovy-jsr223-3.0.7-indy.jar 复制到 /opt/confluent-6.0.0/share/confluent-hub-components/debezium-debezium -connector-sqlserver/lib 并且仍然出现相同的错误,我已经从 connector.properties transforms=unwrap,route transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState transforms.unwrap.add.fields= 中注释掉了 groovy 部分source.snapshot transforms.route.type=io.debezium.transforms.ContentBasedRouter #transforms.route.language=jsr223.groovy #transforms.route.topic.expression=value.snapshot == 'false' ? 'test' : null 仍然出错 在此之后您是否重新启动了 Kafka Connect? 是的,我做了,连接日志:[2020-12-16 08:01:13,649] INFO Loading plugin from: /opt/confluent-6.0.0/share/confluent-hub-components/debezium-scripting-1.3.1.Final.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246) [2020-12-16 08:01:13,655] INFO Registered loader: PluginClassLoaderpluginLocation=file:/opt/confluent-6.0.0/share/confluent-hub-components/debezium-scripting-1.3.1.Final.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269) 我使用的是本地 CLI,会不会有问题?【参考方案2】:

我不确定配置是否正确,但我通过了第一个配置问题(我希望)我现在面临另一个问题,我将在不同的问题中进行描述。 我不知道出了什么问题,我确实关注了

    清理 zookeeper 目录 清理 kafka 目录 使用命令行启动/停止脚本以分布式模式运行 kafka(不使用 confluent cli) 这解决了java.lang.NoClassDefFoundError: io/debezium/DebeziumException 错误

【讨论】:

以上是关于基于 debezium 内容的路由配置的主要内容,如果未能解决你的问题,请参考以下文章

思科策略路由 PBR 详解

关于简单动态路由协议配置,注入,路由重分布

基于配置的 C# MVC 动态路由

k8s istio 配置请求的路由规则

CCNA学习笔记基础概念

动态路由协议(基于RIP协议 配置过程)