Fllink实时计算运用Flink 自定义序列化Protobuf接入实现方案

Posted mirson

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Fllink实时计算运用Flink 自定义序列化Protobuf接入实现方案相关的知识,希望对你有一定的参考价值。

1. 自定义序列化接入方案(Protobuf)

在实际应用场景中, 会存在各种复杂传输对象,同时要求较高的传输处理性能, 这就需要采用自定义的序列化方式做相应实现, 这里以Protobuf为例做讲解。

功能: kafka对同一Topic的生产与消费,采用Protobuf做序列化与反序列化传输, 验证能否正常解析数据。

  1. 通过protobuf脚本生成JAVA文件

    syntax = "proto3";
    option java_package = "com.itcast.flink.connectors.kafka.proto";
    option java_outer_classname = "AccessLogProto";
    
    // 消息结构定义
    message AccessLog {
    
        string ip = 1;
    
        string time = 2;
    
        string type = 3;
    
        string api = 4;
    
        string num = 5;
    }
    

通过批处理脚本,生成JAVA文件:

@echo off
for %%i in (proto/*.proto) do (
  d:/TestCode/protoc.exe --proto_path=./proto  --java_out=../java  ./proto/%%i
  echo generate %%i to java file successfully!
)

注意, 路径要配置正确。

  1. 自定义序列化实现

    添加POM依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.1.8.RELEASE</version>
        </dependency>
    </dependencies>
    

AccessLog对象:

@Data
public class AccessLog implements Serializable {

    private String ip;

    private String time;

    private String type;

    private String api;

    private Integer num;
}

CustomSerialSchema:

/**
 * 自定义序列化实现(Protobuf)
 */
public class CustomSerialSchema implements DeserializationSchema<AccessLog>, SerializationSchema<AccessLog> {

    private static final long serialVersionUID = 1L;

    private transient Charset charset;

    public CustomSerialSchema() {
        this(StandardCharsets.UTF_8);
    }

    public CustomSerialSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    public Charset getCharset() {
        return charset;
    }
  
    /**
     * 反序列化实现
     * @param message
     * @return
     */
    @Override
    public AccessLog deserialize(byte[] message) {
        AccessLog accessLog = null;
        try {
            AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message);
            accessLog = new AccessLog();
            BeanUtils.copyProperties(accessLogProto, accessLog);
            return accessLog;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return accessLog;
    }

    @Override
    public boolean isEndOfStream(AccessLog nextElement) {
        return false;
    }

    /**
     * 序列化处理
     * @param element
     * @return
     */
    @Override
    public byte[] serialize(AccessLog element) {
        AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder();
        BeanUtils.copyProperties(element, builder);
        return builder.build().toByteArray();
    }

    /**
     * 定义消息类型
     * @return
     */
    @Override
    public TypeInformation<AccessLog> getProducedType() {
        return TypeInformation.of(AccessLog.class);
    }
}
  1. 通过flink对kafka消息生产者的实现

    public class KafkaSinkApplication {
    
        public static void main(String[] args) throws Exception {
    
            // 1. 创建运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 2. 读取Socket数据源
            DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\\n");
            // 3. 转换处理流数据
            SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() {
                @Override
                public AccessLog map(String value) throws Exception {
                    System.out.println(value);
                    // 根据分隔符解析数据
                    String[] arrValue = value.split("\\t");
                    // 将数据组装为对象
                    AccessLog log = new AccessLog();
                    log.setNum(1);
                    for(int i=0; i<arrValue.length; i++) {
                        if(i == 0) {
                            log.setIp(arrValue[i]);
                        }else if( i== 1) {
                            log.setTime(arrValue[i]);
                        }else if( i== 2) {
                            log.setType(arrValue[i]);
                        }else if( i== 3) {
                            log.setApi(arrValue[i]);
                        }
                    }
    
                    return log;
                }
            });
    
            // 3. Kakfa的生产者配置
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
                    "10.10.20.132:9092",            // broker 列表
                    "flink-serial",                  // 目标 topic
                    new CustomSerialSchema()                 // 序列化 方式
                    );   
    
            // 4. 添加kafka的写入器
            outputStream.addSink(kafkaProducer);
    
            socketStr.print().setParallelism(1);
    
            // 5. 执行任务
            env.execute("job");
        }
    
    }

开启Kafka消费者命令行终端,验证生产者的可用性:

[root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap-server  10.10.20.132:9092  --topic flink-serial    
1601649380422GET"
getAccount
1601649381422POSTaddOrder
1601649382422POST"
  1. 通过flink对kafka消息订阅者的实现

    public class KafkaSourceApplication {
    
        public static void main(String[] args) throws Exception {
    
            // 1. 创建运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 2. 设置kafka服务连接信息
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            properties.setProperty("group.id", "fink_group");
    
            // 3. 创建Kafka消费端
            FlinkKafkaConsumer kafkaProducer = new FlinkKafkaConsumer(
                    "flink-serial",                  // 目标 topic
                    new CustomSerialSchema(),   // 自定义序列化
                    properties);
    
            // 4. 读取Kafka数据源
            DataStreamSource<AccessLog> socketStr = env.addSource(kafkaProducer);
    
            socketStr.print().setParallelism(1);
    
            // 5. 执行任务
            env.execute("job");
        }
    
    }

通过flink的kafka生产者消息的发送, 对消费者的功能做测试验证。


本文由mirson创作分享,如需进一步交流,请加QQ群:19310171或访问www.softart.cn

以上是关于Fllink实时计算运用Flink 自定义序列化Protobuf接入实现方案的主要内容,如果未能解决你的问题,请参考以下文章

Fllink实时计算运用Flink Table API & SQL 深入详解

Fllink实时计算运用Flink Table API & SQL 案例实战

Fllink学习

Flink的实时计算

Flink 实现自定义滑动窗口

Flink 实现自定义滑动窗口