Kafka Connect:如何将String解析为Map

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka Connect:如何将String解析为Map相关的知识,希望对你有一定的参考价值。

假设我有一个文件填充了由新行字符(JSON)分隔的 对象/行。当基于FileStreamSource的连接器读取此文件时,它会将每一行视为java.lang.String

如何将这个java.lang.String解析为java.util.Mapstruct以执行进一步的转换(例如使用MaskField屏蔽字段或使用ExtractField提取字段)?

PS:问题不在于如何将一些java.lang.String解析为java.util.Mapstruct,而是关于如何将这种解析逻辑与Kafka(自定义Kafka转换?)集成或通过其他方式获得相同的结果(例如在Kafka中配置或使用特定的连接器/转换等)

答案

正如Apache Kafka文档所述,FileStreamSource并不完全是生产支持的连接器......

也许你最好使用spooldir连接器,它支持行分隔JSON https://github.com/jcustenborder/kafka-connect-spooldir/blob/master/README.md

另一答案

有两种可能的方法:

  1. 您可以使用Confluent Platform并使用适当的KSQL查询(https://docs.confluent.io/current/ksql/docs/tutorials/index.html#ksql-tutorials)运行连接器。
  2. 您可以使用源连接器启动Kafka Stream应用程序(https://kafka.apache.org/documentation/streams/)。流应用程序将从连接器放置消息的主题/ -s中读取消息。您需要在Kafka流应用程序中实现转换逻辑。处理消息时,Stream应用程序将其置于输出主题。下面是流应用程序代码的示例结构。
Properties props = new Properties();

...

final StreamsBuilder builder = new StreamsBuilder();
Pattern pattern = Pattern.compile(<YOUR_INPUT_TOPIC_PATTERN>);
KStream<String, String> source = builder.stream(pattern);

...

source.mapValues((k,v) -> {
     Gson gson = new Gson();
     Map map = gson.fromJson(v, Map.class);

     // here is your transformation logi

     return v;
}).to(<YOUR_OUTPUT_TOPIC>);

...

final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);

...

streams.start();

以上是关于Kafka Connect:如何将String解析为Map的主要内容,如果未能解决你的问题,请参考以下文章

如何将 from_json 与 Kafka connect 0.10 和 Spark Structured Streaming 一起使用?

如何将 kafka-connect-jdbc-5.5.0.jar 添加到 Debezium/connect

如何使用 Kafka Connect 将 Protobuf 消息传递到 Elasticsearch?

Kafka Connect 如何安装 Connect 插件

Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?

如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]