FLINK 基于1.15.2的Java开发-从KAFKA读入JSON把它反序列化成一个JAVA Object

Posted TGITCIC

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 基于1.15.2的Java开发-从KAFKA读入JSON把它反序列化成一个JAVA Object相关的知识,希望对你有一定的参考价值。

背景

生产级的应用,一般我们会大量涉及到json的序列化和反序列化场景。我们假设kafka的broker sender过来的数据是json格式,如下这种格式:

"productId":"a1001","status":"101"

然后我们要把它反序列化成一个Java Object如下这样的一个Java Bean:

* 系统项目名称 com.aldi.com.cnflink.demo ProductBean.java
 *
 * 2022年9月27日-上午11:16:38 2022XX公司-版权所有
 *
 */
package com.aldi.com.cnflink.demo;
 
import java.io.Serializable;
 
/**
 *
 * ProductBean
 *
 *
 * 2022年9月27日 上午11:16:38
 *
 * @version 1.0.0
 *
 */
public class ProductBean implements Serializable 
 
    private String productId = "";
    private int status = 0;
 
    public String getProductId() 
        return productId;
    
 
    public void setProductId(String productId) 
        this.productId = productId;
    
 
    public int getStatus() 
        return status;
    
 
    public void setStatus(int status) 
        this.status = status;
    
 

flink 1.15.2使用反序列化的技巧

目前网上给出的例子都是<flink1.10版的,在这些例子中很多代码早已经被deprecated(废弃)了。或者很多代码是scala写的。

scala是一个不太规范的小语言,它虽然基于java但远没有java生态来的完善。我们为什么要强调flink都用java写?因为这和一体化架构有关。

流批一体,它本身就属于业务系统的一部分,它属于所谓的大数据生态,但系统和架构上和大数据是完全不一样的。这一块我会在后面展开论述。

因此当我们的业务系统如:零售中台它是java的,那么这个flink所用的技术栈也必须从属于java,这样它才可以使用成熟的“企业级架构”、“中间件”去一体化的集成,对于开发、devops、运维团队来说,也不会因为多个语言、技术栈的混合而影响到“代码公开标准、技术上无黑盒、内化容易、项目可持续性强、技术开发运维资源通用化(这点很重要,不要搞了很晦涩为了show一些所谓的ability而影响到了:通用性、是否易掌握)”。

这边需要用到两处技巧如下:

  1. 读入kafka时,使用setDeserializer(KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer(true, true)))//这段是我们自己企业业务逻辑需要你也可以不带任何参数,这个参数代表的是:否读入kafka topic true/false 是否读入kafka timestamp true/false
  2. 自定义KafkaDeserialization

上代码

先来看读入kafka时的代码

KafkaSource<ProductBean> source = KafkaSource.<ProductBean>builder()
                .setBootstrapServers(paras.get("kafka.bootstrapservers")).setTopics(paras.get("kafka.topic"))
                .setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer(true, true)))
                .build();

再来看自定义的ProductBeanJSONDeSerializer

// 序列化
public static class ProductBeanJSONDeSerializer implements KafkaDeserializationSchema<ProductBean> 
 
    private final String encoding = "UTF8";
    private boolean includeTopic;
    private boolean includeTimestamp;
 
    public ProductBeanJSONDeSerializer(boolean includeTopic, boolean includeTimestamp) 
        this.includeTopic = includeTopic;
        this.includeTimestamp = includeTimestamp;
    
 
    @Override
    public TypeInformation<ProductBean> getProducedType() 
        return TypeInformation.of(ProductBean.class);
    
 
    @Override
    public boolean isEndOfStream(ProductBean nextElement) 
        return false;
    
 
    @Override
    public ProductBean deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception 
        if (consumerRecord != null) 
            try 
                String value = new String(consumerRecord.value(), encoding);
                ProductBean product = JSON.parseObject(value, ProductBean.class);
                return product;
             catch (Exception e) 
                logger.error(">>>>>>deserialize failed : " + e.getMessage(), e);
            
        
        return null;
    

项目运行起来的效果

此时你就可以在kafka端输入以下这样的内容:

"productId":"a1001","status":"101"

读进来后它会自动变成了ProductBean了。

以上是关于FLINK 基于1.15.2的Java开发-从KAFKA读入JSON把它反序列化成一个JAVA Object的主要内容,如果未能解决你的问题,请参考以下文章

FLINK 基于1.15.2的Java开发-入门

FLINK 基于1.15.2的Java开发-自定义Source端

FLINK 基于1.15.2的Java开发-在flink内如何使用log4j

FLINK 基于1.15.2的Java开发-搭建2主3从的生产集群环境

FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜

FLINK 基于1.15.2的Java开发-Sink到MYSQL的两种姿势