Kafka Connect:如何将String解析为Map
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka Connect:如何将String解析为Map相关的知识,希望对你有一定的参考价值。
假设我有一个文件填充了由新行字符(JSON
)分隔的
对象/行。当基于FileStreamSource的连接器读取此文件时,它会将每一行视为java.lang.String
。
如何将这个java.lang.String
解析为java.util.Map
或struct以执行进一步的转换(例如使用MaskField屏蔽字段或使用ExtractField提取字段)?
PS:问题不在于如何将一些java.lang.String
解析为java.util.Map
或struct,而是关于如何将这种解析逻辑与Kafka(自定义Kafka转换?)集成或通过其他方式获得相同的结果(例如在Kafka中配置或使用特定的连接器/转换等)
正如Apache Kafka文档所述,FileStreamSource
并不完全是生产支持的连接器......
也许你最好使用spooldir连接器,它支持行分隔JSON https://github.com/jcustenborder/kafka-connect-spooldir/blob/master/README.md
有两种可能的方法:
- 您可以使用Confluent Platform并使用适当的KSQL查询(https://docs.confluent.io/current/ksql/docs/tutorials/index.html#ksql-tutorials)运行连接器。
- 您可以使用源连接器启动Kafka Stream应用程序(https://kafka.apache.org/documentation/streams/)。流应用程序将从连接器放置消息的主题/ -s中读取消息。您需要在Kafka流应用程序中实现转换逻辑。处理消息时,Stream应用程序将其置于输出主题。下面是流应用程序代码的示例结构。
Properties props = new Properties();
...
final StreamsBuilder builder = new StreamsBuilder();
Pattern pattern = Pattern.compile(<YOUR_INPUT_TOPIC_PATTERN>);
KStream<String, String> source = builder.stream(pattern);
...
source.mapValues((k,v) -> {
Gson gson = new Gson();
Map map = gson.fromJson(v, Map.class);
// here is your transformation logi
return v;
}).to(<YOUR_OUTPUT_TOPIC>);
...
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
...
streams.start();
以上是关于Kafka Connect:如何将String解析为Map的主要内容,如果未能解决你的问题,请参考以下文章
如何将 from_json 与 Kafka connect 0.10 和 Spark Structured Streaming 一起使用?
如何将 kafka-connect-jdbc-5.5.0.jar 添加到 Debezium/connect
如何使用 Kafka Connect 将 Protobuf 消息传递到 Elasticsearch?
Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?
如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]