如何重命名/替换 Kafka-connect SMT 结构中的字段?
Posted
技术标签:
【中文标题】如何重命名/替换 Kafka-connect SMT 结构中的字段?【英文标题】:How to rename/replace a field within a struct in Kafka-connect SMT? 【发布时间】:2020-11-20 15:05:28 【问题描述】:description for replaceField SMT 说它可以Filter or rename fields within a Struct or Map.
但是我找不到任何用于替换或重命名结构中的字段的工作示例。
我有一个主题中的数据正在使用Kafka Connect Elasticsearch Sink 写入 ElasticSearch。为简单起见,假设数据格式如下所示。
'ID':22,
'ITEM': 'Shampoo'
'USER':
'NAME': 'jon',
'AGE':25
因此,如果我尝试重命名/替换 USER.NAME
或 USER.AGE
,我将如何在连接器中进行配置? (我已经在 ksqldb 中编写了所有内容)。这是我当前的配置,我将 ITEM
重命名为 product
和 ID
重命名为 id
CREATE SINK CONNECTOR ELASTIC_SINK WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://host.docker.internal:9200',
'type.name' = '_doc',
'topics' = 'ELASTIC_TOPIC',
'key.ignore' = 'false',
'schema.ignore' = 'true',
'transforms' = 'RenameField',
'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
'transforms.RenameField.renames' = 'ITEM:product,ID:id',
);
【问题讨论】:
这能回答你的问题吗? How to transform and extract fields in Kafka sink JDBC connector 【参考方案1】:看看现有的SO问答:https://***.com/a/56601093/4778022
您可以提供要重命名的字段的路径,部分以句点分隔。
CREATE SINK CONNECTOR ELASTIC_SINK WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://host.docker.internal:9200',
'type.name' = '_doc',
'topics' = 'ELASTIC_TOPIC',
'key.ignore' = 'false',
'schema.ignore' = 'true',
'transforms' = 'RenameField',
'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
'transforms.RenameField.renames' = 'USER.NAME:name,ITEM:product,ID:id',
);
【讨论】:
以上是关于如何重命名/替换 Kafka-connect SMT 结构中的字段?的主要内容,如果未能解决你的问题,请参考以下文章
如何通过在 Windows 中使用批处理替换子字符串来重命名文件 [关闭]
kafka-connect JDBC PostgreSQL Sink Connector 显式定义 PostgrSQL 模式(命名空间)