flink处理数据从kafka到另外一个kafka
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink处理数据从kafka到另外一个kafka相关的知识,希望对你有一定的参考价值。
参考技术A需求就是将流量数据(json格式)中某个接口数据抽取一下。如:有个identityUri="yiyang/user/getById/13782" , 这里的13782,是个userId,我们需要将其处理成 identityUri="yiyang/user/getById/"
实际上我们生产中是将二者接口使用的。先使用2,如果没有匹配到,在使用1
这里是演示flink kafka的用法,我们简单使用正则处理
注意:kafka消费的方式是: kafkaConsumer.setStartFromGroupOffsets();
看下上面的启动日志,有这样的信息:Resetting offset for partition yiyang-0 to offset 22.
我们另外启动一个程序,发送消息,并消费两个topic中的数据
看下 ConsumeKafkaTest 中的日志
在看下另外一个服务(消费两个topic数据)的日志:
说明已经成功的把处理好的消息发送到另外一个topic中了
关于数据处理,如果只是简单的增加字段,减少字段,正则替换,也可以使用logstash工具
以上是关于flink处理数据从kafka到另外一个kafka的主要内容,如果未能解决你的问题,请参考以下文章