SMT 通过连接器配置创建 kafka 连接器字符串分区键

Posted

技术标签:

【中文标题】SMT 通过连接器配置创建 kafka 连接器字符串分区键【英文标题】:SMT's to create kafka connector string partition key through connector config 【发布时间】:2021-09-06 01:40:51 【问题描述】:

我一直在为 PostgreSQL 实现一个 kafka 连接器(我正在使用 debezium kafka 连接器并通过 docker 运行所有部分)。我需要一个自定义分区键,所以我一直在使用 SMT 来实现这一点。但是,我使用的方法创建了一个结构,我需要它是一个字符串。这个article 贯穿了如何将分区键设置为int,但我无法访问配置文件来设置适当的转换。目前我的kafka连接器是这样的

 curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
    "name": "connector",
    "config": 
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname" : "postgres",
        "database.server.name": "postgres",
        "table.include.list": "public.table",
        "database.history.kafka.bootstrap.servers": "kafka:9092",  
        "database.history.kafka.topic": "schema-changes.table",
        "transforms": "routeRecords,unwrap,createkey",
        "transforms.routeRecords.type":  "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.routeRecords.regex": "(.*)",
        "transforms.routeRecords.replacement": "table",
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.createkey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createkey.fields": "id"
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    
'

我知道我必须提取列的值,但我不确定如何。

【问题讨论】:

请告诉我们您尝试提取列的值 @Todd 是上述配置 sn-p "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "transforms.createkey.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.createkey.fields": "id" 中的 unwrapcreateKey 转换。但就像我上面所说的,这会创建一个具有id 值的结构。但我需要密钥只能是字符串值 【参考方案1】:

ValueToKey 从字段列表中创建一个结构体,正如它所记录的那样。

您需要再进行一次转换才能从 Struct 中提取 特定 字段,如链接帖子中所示。

org.apache.kafka.connect.transforms.ExtractField$Key

注意:这不会“设置”实际 Kafka 记录的分区,只“设置”键,然后由 Producer 散列以获取分区

【讨论】:

啊,是的,这就解释了!谢谢,我试试看:) 如果这解决了您的问题,请随时使用帖子旁边的复选标记接受答案

以上是关于SMT 通过连接器配置创建 kafka 连接器字符串分区键的主要内容,如果未能解决你的问题,请参考以下文章

Kafka SMT ValueToKey - 如何使用多个值作为键?

Debezium Kafka 连接器 mongodb

Kafka 消息包含控制字符(MongoDB 源连接器)

是否可以仅将 SMT(单消息转换)应用于来自指定主题的消息

kafka参数配置

kafka权限控制