Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis

Posted

技术标签:

【中文标题】Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis【英文标题】:Kafka Connect Sink - From: Avro Topic, To: Json -> Redis 【发布时间】:2021-09-08 09:11:45 【问题描述】:

我有一个环境,我使用一个 Kafka Connect Worker,它使用来自 Oracle 数据库的一些数据,然后将其推送到 Avro 格式的 Kafka 主题中。

现在,我需要创建一个 Kafka Connect Sink 来使用此 AVRO 消息,将其转换为 Json,然后将其写入 Redis 数据库。

到目前为止,我只能在 Redis 上写入我从主题中使用的相同 AVRO 消息。我曾尝试使用转换器,但我可能误解了它们的用法。

下面是我对工人和水槽的配置。


    "name": "SOURCE",
    "config": 
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "transforms": "createKey, extractStr",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "ID",
        "transforms.extractStr.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractStr.field": "ID",
        "connection.url": "<>",
        "connection.user": "<>",
        "connection.password": "<>",
        "table.whitelist": "V_TEST_C",
        "schema.pattern": "<>",
        "numeric.mapping": "best_fit",
        "mode": "timestamp+incrementing",
        "incrementing.column.name": "CID",
        "timestamp.column.name": "TS_ULT_ALT",
        "validate.non.null": "false",
        "table.types": "VIEW",
        "retention.ms":12000,
        "poll.interval.ms": "30000",
        "topic.prefix": "TEST.",
        "value.converter.schema.registry.url": "<>"
    

下沉


  "name": "SINK",
  "config": 
    "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "topics": "V_TEST_C",
    "redis.hosts": "redis:6379",
    "schema.registry.url": "<>",
    "value.converter.schema.registry.url": "<>",
    "value.converter.schemas.enable":"false",
    "key.converter.schemas.enable":"false",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "quote.sql.identifier": "never"
  

【问题讨论】:

您的源连接器使用的是 JSON,而不是 Avro... 无论如何,转换器不会像您想象的那样“转换”(在序列化格式之间)。它将这些类型(JSON/Avro)转换为 Connect Framework 内部的 StructSchema 类,而 Sink 和 Source 类使用它们来实际读取/写入数据 【参考方案1】:

在 Avro 格式的 Kafka 主题中推送它

这不是你发布的内容

"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",

在任何情况下,您都不能使用该 Redis 连接器存储字符串或字节之外的任何内容

From docs(强调)

此连接器希望来自 Kafka 的记录具有以字节或字符串形式存储的键和值。如果您的数据已经以 Redis 中所需的格式保存在 Kafka 中考虑为此连接器使用 ByteArrayConverter 或 StringConverter

因此,无法使用 AvroConverter,因为它会生成结构化对象,而不是字符串或字节的架构。

注意:禁用模式的 JSONConverter 实际上与处理 JSON 数据的字符串转换器的行为相同。 但是,如果将 StringConverter 与 Avro 数据一起使用,您最终可能会得到像 Structfoo=bar 这样的数据,但您仍然需要在某个级别运行 Avro 反序列化器,所以我不确定您是否真的可以使用带有 Avro 主题数据的 Json 或 StringConverter。

【讨论】:

我的错误。这是我的测试之一,但实际信息是使用 Avro。但我明白答案。谢谢。

以上是关于Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?

Kafka Connect JDBC Sink 连接器:如何删除没有 NULL 值的记录?

即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败

使用JsonConverter的Kafka Connect HDFS Sink for JSON格式

如何在 Kafka Connect Sink 中指定 Kafka 主题的分区

Kafka-Connect:启动 S3 Sink 连接器时出现无法识别的错误