Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis
Posted
技术标签:
【中文标题】Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis【英文标题】:Kafka Connect Sink - From: Avro Topic, To: Json -> Redis 【发布时间】:2021-09-08 09:11:45 【问题描述】:我有一个环境,我使用一个 Kafka Connect Worker,它使用来自 Oracle 数据库的一些数据,然后将其推送到 Avro 格式的 Kafka 主题中。
现在,我需要创建一个 Kafka Connect Sink 来使用此 AVRO 消息,将其转换为 Json,然后将其写入 Redis 数据库。
到目前为止,我只能在 Redis 上写入我从主题中使用的相同 AVRO 消息。我曾尝试使用转换器,但我可能误解了它们的用法。
下面是我对工人和水槽的配置。
"name": "SOURCE",
"config":
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "createKey, extractStr",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "ID",
"transforms.extractStr.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractStr.field": "ID",
"connection.url": "<>",
"connection.user": "<>",
"connection.password": "<>",
"table.whitelist": "V_TEST_C",
"schema.pattern": "<>",
"numeric.mapping": "best_fit",
"mode": "timestamp+incrementing",
"incrementing.column.name": "CID",
"timestamp.column.name": "TS_ULT_ALT",
"validate.non.null": "false",
"table.types": "VIEW",
"retention.ms":12000,
"poll.interval.ms": "30000",
"topic.prefix": "TEST.",
"value.converter.schema.registry.url": "<>"
下沉
"name": "SINK",
"config":
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
"topics": "V_TEST_C",
"redis.hosts": "redis:6379",
"schema.registry.url": "<>",
"value.converter.schema.registry.url": "<>",
"value.converter.schemas.enable":"false",
"key.converter.schemas.enable":"false",
"insert.mode": "UPSERT",
"delete.enabled": "false",
"quote.sql.identifier": "never"
【问题讨论】:
您的源连接器使用的是 JSON,而不是 Avro... 无论如何,转换器不会像您想象的那样“转换”(在序列化格式之间)。它将这些类型(JSON/Avro)转换为 Connect Framework 内部的Struct
和 Schema
类,而 Sink 和 Source 类使用它们来实际读取/写入数据
【参考方案1】:
在 Avro 格式的 Kafka 主题中推送它
这不是你发布的内容
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
在任何情况下,您都不能使用该 Redis 连接器存储字符串或字节之外的任何内容
From docs(强调)
此连接器希望来自 Kafka 的记录具有以字节或字符串形式存储的键和值。如果您的数据已经以 Redis 中所需的格式保存在 Kafka 中考虑为此连接器使用 ByteArrayConverter 或 StringConverter
因此,无法使用 AvroConverter,因为它会生成结构化对象,而不是字符串或字节的架构。
注意:禁用模式的 JSONConverter 实际上与处理 JSON 数据的字符串转换器的行为相同。 但是,如果将 StringConverter 与 Avro 数据一起使用,您最终可能会得到像 Structfoo=bar
这样的数据,但您仍然需要在某个级别运行 Avro 反序列化器,所以我不确定您是否真的可以使用带有 Avro 主题数据的 Json 或 StringConverter。
【讨论】:
我的错误。这是我的测试之一,但实际信息是使用 Avro。但我明白答案。谢谢。以上是关于Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?
Kafka Connect JDBC Sink 连接器:如何删除没有 NULL 值的记录?
即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败
使用JsonConverter的Kafka Connect HDFS Sink for JSON格式