Kafka Connect - 如何发送和处理嵌套的 JSON?

Posted

技术标签:

【中文标题】Kafka Connect - 如何发送和处理嵌套的 JSON?【英文标题】:Kafka Connect - how to send and handle nested JSON? 【发布时间】:2021-04-06 02:27:31 【问题描述】:

我有一条消息,它适用于我的 Redis Sink 连接器(连接器将价值赋予 Redis):

    
  "schema": 
    "type": "struct",
    "fields": [
      
        "type": "int64",
        "optional": false,
        "field": "registertime"
      ,
      
        "type": "string",
        "optional": false,
        "field": "userid"
      ,
      
        "type": "string",
        "optional": false,
        "field": "regionid"
      ,
      
        "type": "string",
        "optional": false,
        "field": "after"
      
    ],
    "optional": false,
    "name": "ksql.users"
  ,
  "payload": 
    "registertime": 1493819497170,
    "userid": "User_1",
    "regionid": "Region_5",
    "after": "MALE"
  

但我希望将字段“之后”作为嵌套对象:

    "after": 
        "one": null,
        "two": "one"
    

并在此基础上处理数据(即,如果“one”为空,则跳过)。

所以我有一个连接器:


    "name": "connector1",
    "config": 
        "topics": "topic1",
        "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
        "tasks.max": "1",
        "connect.redis.error.policy": "NOOP",
        "connect.redis.host": "localhost",
        "connect.redis.port": "6379",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "connect.redis.kcql": "INSERT INTO prod- SELECT * FROM topic1 PK after;"
    

我想从消息中的嵌套 json 中提取数据。 我的命令引以为豪:

topc=topic1
message=message.json
echo "key:$(jq -rc . $message)" | $kafka_dir/bin/kafka-console-producer.sh --topic $topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

我如何发送一个嵌套的 json 对象,我如何通过 Transforms 从中提取一个字段,并且基于它的值处理与否?

【问题讨论】:

【参考方案1】:

发送嵌套数据的工作方式与发送常规消息相同。

您应该能够更新消息以包含类似此架构信息的内容,用于 after 字段

"type": "struct", 
"fields": ["field": "one", "optional": false, "type":"string",... ], 
"optional": false, 
"field": "after"

然后相应地更新有效负载。

我个人从未使用过 JSONConverter 架构/有效负载类型,因为 Avro 更适合这种情况


据我所知,Kafka Connect 不能跳过消息;它会处理所有这些。也没有用于提取深度超过 1 的任意嵌套值的内置转换,因此获取 after 结构中的字段 可能是一个问题。但是,您可以通过修改此特定连接器的 SELECT * FROM topic1 KCQL 语句来获得它

一般来说,如果您需要这样的逻辑,您会使用流处理器(例如 KSQL 或 Kafka Streams)在转储到数据库之前过滤/修改主题

【讨论】:

谢谢,但 AFAIK Kafka Connect 可以跳过消息 - 通过转换,例如 ExtractField($Key 或 $Value),然后是 Tombstone 或过滤器($Key 或 $Value)。我的问题是如何发送有效负载嵌套对象 - 而不是字符串(正如我试图展示的那样)。我想要获得的是不是在字符串上而是在对象“one”:“two”,“three”:“four”上使用Transforms。如何发送它,在 ExtractField$Value 之后拥有另一个对象,然后通过下一个转换(例如 Tombstone 或 Filter)处理对象,而不是字符串 对于 kafka-console-producer 来说,生成嵌套数据与单行文本没有任何不同。如果您希望嵌套 after 字段,它需要是 Struct 模式类型,而不是 "type": "string",并且根据您的生产者控制的有效负载更新,而不是 Connect 或转换 是的,我在问如何生成这样的有效负载,因为如果我在架构中将string 更改为object,连接器会显示错误,它不理解这种类型。所以我的问题是,这样的消息(有效负载)应该是什么样子 你尝试过这样的事情吗? "type": "struct", "fields": ["field": "one", "type":"string",... ], "field": "after"?否则,我建议您在不需要架构和有效负载的情况下使用 Avro 而不是 JSON 哦,我有!我使用的是连接器 com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector insted of com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector 所以这是问题的原因

以上是关于Kafka Connect - 如何发送和处理嵌套的 JSON?的主要内容,如果未能解决你的问题,请参考以下文章

如何在没有 Confluent 的情况下使用 Kafka Connect 从 Kafka 向 AWS S3 发送数据?

Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?

Kafka-Connect实践

如何隐藏 Kafka connect api 未处理异常的堆栈跟踪

kafka-connect-elasticsearch:当使用“write.method”作为 upsert 时,是不是可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档?

一文读懂Kafka Connect核心概念