flink消费kafka protobuf格式数据

Posted 周幽王丶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink消费kafka protobuf格式数据相关的知识,希望对你有一定的参考价值。

  1. 读kafka中protobuf数据类型需要对应的protobuf协议,例如:
  • user.proto 文件
syntax = "proto3";

message User 
  string name = 1;
  int32 age = 2;

  • 用protoc命令或maven插件protobuf将协议文件编译成java类
  1. 注册
env.getConfig().registerTypeWithKryoSerializer(User.class, 
	ProtobufSerializer.class);
  1. 消费
FlinkKafkaConsumer<User> consumer =
                new FlinkKafkaConsumer<>(
                        Collections.singletonList("topic_name"), new DeserializationSchema<User>() 

                    @Override
                    public TypeInformation<User> getProducedType() 
                        return BasicTypeInfo.of(User.class);
                    

                    @Override
                    public User deserialize(byte[] message) throws IOException 
                            return User.parseFrom(message);
                    

                    @Override
                    public boolean isEndOfStream(User nextElement) 
                        return false;
                    
                , consumerPro);

        env.addSource(consumer).print();

总结:其实跟正常的读kafka一样,只是多了一个反序列化的步骤

flink处理数据从kafka到另外一个kafka

参考技术A

需求就是将流量数据(json格式)中某个接口数据抽取一下。如:有个identityUri="yiyang/user/getById/13782" , 这里的13782,是个userId,我们需要将其处理成 identityUri="yiyang/user/getById/"

实际上我们生产中是将二者接口使用的。先使用2,如果没有匹配到,在使用1

这里是演示flink kafka的用法,我们简单使用正则处理

注意:kafka消费的方式是: kafkaConsumer.setStartFromGroupOffsets();

看下上面的启动日志,有这样的信息:Resetting offset for partition yiyang-0 to offset 22.

我们另外启动一个程序,发送消息,并消费两个topic中的数据

看下 ConsumeKafkaTest 中的日志

在看下另外一个服务(消费两个topic数据)的日志:

说明已经成功的把处理好的消息发送到另外一个topic中了

关于数据处理,如果只是简单的增加字段,减少字段,正则替换,也可以使用logstash工具

以上是关于flink消费kafka protobuf格式数据的主要内容,如果未能解决你的问题,请参考以下文章

flink处理数据从kafka到另外一个kafka

flink消费kafka细节

Fllink实时计算运用Flink 自定义序列化Protobuf接入实现方案

Flink消费Kafka到HDFS实现及详解

利用Flink消费Kafka数据保证全局有序

利用Flink消费Kafka数据保证全局有序