如何设置 Kafka 连接器以在 Debezium 中使用自定义转换?
Posted
技术标签:
【中文标题】如何设置 Kafka 连接器以在 Debezium 中使用自定义转换?【英文标题】:How to set up Kafka-connector to use custom transform in Debezium? 【发布时间】:2020-11-28 01:02:55 【问题描述】:我是 Kafka 的新手,我不知道如何使用“transforms.router.type”使其与我的 Debezium 设置一起使用。 所以我做了特殊的事件转换 java 类并为它准备了配置以部署到容器,如下所示:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d
"name": "task-connector",
"config":
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"slot.name" : "task_engine_saga",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "tasks",
"schema.whitelist": "public",
"table.whitelist" : "public.task",
"tombstones.on.delete" : "false",
"transforms" : "router",
"transforms.router.type" : "com.task.connect.TaskEventRouter"
响应说找不到此配置。
CREATE kafka connector task-connector....
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1091 100 516 100 575 8322 9274 --:--:-- --:--:-- --:--:-- 17596"error_code":400,"message":"Connector configuration is invalid and contains the following 3 error(s):\nInvalid value com.task.connect.TaskEventRouter for configuration transforms.router.type: Class com.task.connect.TaskEventRouter could not be found.\nInvalid value null for configuration transforms.router.type: Not a Transformation\nA value is required\nYou can also find the above list of errors at the endpoint `/connector-plugins/connectorType/config/validate`"
然后,我将带有 jar 文件的主机文件夹复制到容器 /connect 目录,其中我的 java 类具有事件转换逻辑,但它也没有帮助。有人可以帮助我并告诉我应该怎么做才能使这个自定义的 transforms.router.type 与我的 Debezium 设置一起工作吗?
我的容器 docker-compose 设置:
version: '3'
services:
pgadmin:
container_name: pgadmin_container
image: dpage/pgadmin4
environment:
PGADMIN_DEFAULT_EMAIL: $PGADMIN_DEFAULT_EMAIL:-pgadmin4@pgadmin.org
PGADMIN_DEFAULT_PASSWORD: $PGADMIN_DEFAULT_PASSWORD:-admin
volumes:
- pgadmin:/root/.pgadmin
ports:
- "$PGADMIN_PORT:-5050:80"
restart: unless-stopped
zookeeper:
image: debezium/zookeeper:1.3
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:1.3
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: debezium/example-postgres:1.3
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- PGDATA=/data/postgres
- POSTGRES_DB=$POSTGRES_DB:-task_engine
connect:
image: debezium/connect:1.3
ports:
- 8083:8083
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
volumes:
postgres:
pgadmin:
【问题讨论】:
您的plugin.path
属性是什么样的?您是否尝试将包含自定义转换的 JAR 与包含 PostgresConnector
的 JAR 放在同一位置?
或者,您可以将包含自定义转换的 JAR 添加到 kafka lib
目录,但这不会那么干净。
@dnault 我的 connect-distribted.properties 中有 plugin.path=/kafka/connect。所以我也有 debezium-connector-postgres 文件夹并将 jar 文件放在那里,但仍然没有运气。尝试将 jar 放入 kafka/libs 相同的情况。在我将此文件放入kafka文件夹后,也许应该执行某种重新配置脚本?只有在那之后我才应该部署配置?
不清楚com.task.connect.TaskEventRouter
是什么...这是您自己添加的吗?因为这在 Debezium 中不存在,正如错误所暗示的那样
我创建了自定义 docker 镜像并复制了转换,它有帮助
【参考方案1】:
我做了特殊的事件转换java类
据我所知,您尚未将此编译后的类安装到您的 debezium 映像中,也没有编辑 plugin.path
文件以包含该 JAR
如果您已经使用该数据创建了自己的 Docker 映像,那么您应该更改 image: debezium/connect:1.3
这可以解释为什么转换不可用
【讨论】:
以上是关于如何设置 Kafka 连接器以在 Debezium 中使用自定义转换?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 debezium mysql 连接器 kafka 进行初始快照加载?
Debezium 如何使用 Kafka Connect 正确注册 SqlServer 连接器 - 连接被拒绝
当debezium连接器从你的sql服务器获取数据时,有没有办法限制kafka连接堆空间
我们如何从带有 debezium kafka 连接器的副本集中的辅助 mongodb 节点跟踪 oplog?
(KAFKA Oracle DEBEZIUM)无法连接:无法解析Oracle数据库版本
如何在不使用 Docker 或 Windows Server 2016 上的 Confluent 平台的情况下在 Kafka 中设置 Debezium SQL Server 连接器?