FLINK 基于1.15.2的Java开发-从KAFKA读入JSON把它反序列化成一个JAVA Object
Posted TGITCIC
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 基于1.15.2的Java开发-从KAFKA读入JSON把它反序列化成一个JAVA Object相关的知识,希望对你有一定的参考价值。
背景
生产级的应用,一般我们会大量涉及到json的序列化和反序列化场景。我们假设kafka的broker sender过来的数据是json格式,如下这种格式:
"productId":"a1001","status":"101"
然后我们要把它反序列化成一个Java Object如下这样的一个Java Bean:
* 系统项目名称 com.aldi.com.cnflink.demo ProductBean.java
*
* 2022年9月27日-上午11:16:38 2022XX公司-版权所有
*
*/
package com.aldi.com.cnflink.demo;
import java.io.Serializable;
/**
*
* ProductBean
*
*
* 2022年9月27日 上午11:16:38
*
* @version 1.0.0
*
*/
public class ProductBean implements Serializable
private String productId = "";
private int status = 0;
public String getProductId()
return productId;
public void setProductId(String productId)
this.productId = productId;
public int getStatus()
return status;
public void setStatus(int status)
this.status = status;
flink 1.15.2使用反序列化的技巧
目前网上给出的例子都是<flink1.10版的,在这些例子中很多代码早已经被deprecated(废弃)了。或者很多代码是scala写的。
scala是一个不太规范的小语言,它虽然基于java但远没有java生态来的完善。我们为什么要强调flink都用java写?因为这和一体化架构有关。
流批一体,它本身就属于业务系统的一部分,它属于所谓的大数据生态,但系统和架构上和大数据是完全不一样的。这一块我会在后面展开论述。
因此当我们的业务系统如:零售中台它是java的,那么这个flink所用的技术栈也必须从属于java,这样它才可以使用成熟的“企业级架构”、“中间件”去一体化的集成,对于开发、devops、运维团队来说,也不会因为多个语言、技术栈的混合而影响到“代码公开标准、技术上无黑盒、内化容易、项目可持续性强、技术开发运维资源通用化(这点很重要,不要搞了很晦涩为了show一些所谓的ability而影响到了:通用性、是否易掌握)”。
这边需要用到两处技巧如下:
- 读入kafka时,使用setDeserializer(KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer(true, true)))//这段是我们自己企业业务逻辑需要你也可以不带任何参数,这个参数代表的是:否读入kafka topic true/false 是否读入kafka timestamp true/false
- 自定义KafkaDeserialization
上代码
先来看读入kafka时的代码
KafkaSource<ProductBean> source = KafkaSource.<ProductBean>builder()
.setBootstrapServers(paras.get("kafka.bootstrapservers")).setTopics(paras.get("kafka.topic"))
.setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer(true, true)))
.build();
再来看自定义的ProductBeanJSONDeSerializer
// 序列化
public static class ProductBeanJSONDeSerializer implements KafkaDeserializationSchema<ProductBean>
private final String encoding = "UTF8";
private boolean includeTopic;
private boolean includeTimestamp;
public ProductBeanJSONDeSerializer(boolean includeTopic, boolean includeTimestamp)
this.includeTopic = includeTopic;
this.includeTimestamp = includeTimestamp;
@Override
public TypeInformation<ProductBean> getProducedType()
return TypeInformation.of(ProductBean.class);
@Override
public boolean isEndOfStream(ProductBean nextElement)
return false;
@Override
public ProductBean deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception
if (consumerRecord != null)
try
String value = new String(consumerRecord.value(), encoding);
ProductBean product = JSON.parseObject(value, ProductBean.class);
return product;
catch (Exception e)
logger.error(">>>>>>deserialize failed : " + e.getMessage(), e);
return null;
项目运行起来的效果
此时你就可以在kafka端输入以下这样的内容:
"productId":"a1001","status":"101"
读进来后它会自动变成了ProductBean了。
以上是关于FLINK 基于1.15.2的Java开发-从KAFKA读入JSON把它反序列化成一个JAVA Object的主要内容,如果未能解决你的问题,请参考以下文章
FLINK 基于1.15.2的Java开发-自定义Source端
FLINK 基于1.15.2的Java开发-在flink内如何使用log4j
FLINK 基于1.15.2的Java开发-搭建2主3从的生产集群环境