FLINK Java Development Based on 1.15.2 - Custom Source
Posted TGITCIC
Introduction
In an earlier post we covered custom Sinks; today we will look at custom Sources.
A custom Source is even simpler than a custom Sink. You only need to extend RichSourceFunction&lt;T&gt; and override the following methods (a minimal skeleton is sketched right after the list):
@Override
public void open(Configuration parameters) throws Exception
@Override
public void run(SourceContext sourceContext) throws Exception
@Override
public void cancel()
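The three methods map onto a simple lifecycle: open() does one-time setup, run() loops and emits records via SourceContext.collect(), and cancel() tells that loop to stop. Below is a minimal, illustrative skeleton (the class name, the running flag and the String payload are assumptions for illustration, not part of the original project):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Illustrative skeleton only: shows the open/run/cancel lifecycle of a custom source.
public class MySource extends RichSourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // one-time setup: open connections, read job parameters, etc.
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // fetch data from the external system, then hand it to Flink
            ctx.collect("some record");
        }
    }

    @Override
    public void cancel() {
        // called by Flink when the job is cancelled; make run() exit
        running = false;
    }
}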
A real-world requirement
Suppose we have the following requirement: we write our own Kafka Source, and while ingesting from Kafka we convert records like the one below directly into a ProductBean.
Kafka source data:
{"productId":"a101","status":101}
ProductBean.java looks like this:
/**
 * System project name: com.aldi.com.cnflink.demo ProductBean.java
 *
 * 2022-09-27 11:16:38 AM  Copyright 2022 XX Company. All rights reserved.
 *
 */
package org.mk.demo.flink;

import java.io.Serializable;

/**
 *
 * ProductBean
 *
 * 2022-09-27 11:16:38 AM
 *
 * @version 1.0.0
 *
 */
public class ProductBean implements Serializable {
    private String productId = "";
    private int status = 0;

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }
}
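As a quick sanity check (not part of the original article), the sample line can be parsed into this bean with fastjson, which is the same call the custom source uses later. The small main class below is purely illustrative and assumes fastjson is on the classpath:

package org.mk.demo.flink;

import com.alibaba.fastjson.JSON;

// Illustrative only: verifies that the sample Kafka line maps onto ProductBean via fastjson.
public class ProductBeanParseCheck {
    public static void main(String[] args) {
        String value = "{\"productId\":\"a101\",\"status\":101}";
        ProductBean product = JSON.parseObject(value, ProductBean.class);
        System.out.println(product.getProductId() + " -> " + product.getStatus());
        // expected output: a101 -> 101
    }
}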
So let's define our own KafkaSource.
Building the custom KafkaSource
CustomizedKafkaSource.java
package org.mk.demo.flink;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class CustomizedKafkaSource extends RichSourceFunction<ProductBean> {
    private final static Logger logger = LoggerFactory.getLogger(CustomizedKafkaSource.class);

    KafkaProducer<String, String> producer; // not used in this example
    KafkaConsumer<String, String> consumer;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // read the external configuration registered via setGlobalJobParameters()
        ExecutionConfig.GlobalJobParameters context = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        ParameterTool paras = (ParameterTool) context;
        String bootStrapServers = paras.get("kafka.bootstrapservers");
        String topicName = paras.get("kafka.topic");
        logger.info(">>>>>>kafka.bootStrapServers->" + bootStrapServers);
        logger.info(">>>>>>topicName->" + topicName);

        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test01");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);

        // create the consumer and subscribe to the topic
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topicName));
    }

    @Override
    public void run(SourceContext<ProductBean> sourceContext) throws Exception {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();     // can be ignored here
                String value = record.value(); // the raw line typed into Kafka, e.g.
                                               // {"productId":"a101","status":101}
                // You could first parse into a Map for a more defensive conversion;
                // here we parse straight into the concrete bean class.
                ProductBean product = JSON.parseObject(value, ProductBean.class);
                String productId = product.getProductId();
                int status = product.getStatus();
                sourceContext.collect(product); // the emitted type must match what the downstream operators/sink expect
            }
        }
    }

    @Override
    public void cancel() {
        // stop the polling loop in run()
        running = false;
    }
}
Pay attention to how ConsumerRecords is used inside run(). ConsumerRecords is an iterable structure; each element of the iteration is a ConsumerRecord&lt;String, String&gt;, which carries two values:
- ConsumerRecord&lt;String, String&gt; record.key()
- ConsumerRecord&lt;String, String&gt; record.value()
Now, if we type the following line into our Kafka topic and hit Enter:
{"productId":"a101","status":101}
then record.value() receives exactly that line, and at that point you can deserialize it into the bean.
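Typing that line is normally done through the Kafka console producer. Purely as a hedged, illustrative sketch (not in the original article), the same test message can also be pushed from Java with the plain kafka-clients producer; the broker address and topic name below are assumptions and should match your kafka.bootstrapservers / kafka.topic settings:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Illustrative only: sends the sample JSON line so the custom source has something to consume.
public class TestMessageProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: adjust to your broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String value = "{\"productId\":\"a101\",\"status\":101}";
            producer.send(new ProducerRecord<>("your-topic", value)); // assumption: topic name
            producer.flush();
        }
    }
}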
How to use the custom Source
With the custom Source written, let's look at how to call it from a job. CustomizedKafkaApplication.java:
package org.mk.demo.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;

public class CustomizedKafkaApplication {
    private final static Logger logger = LoggerFactory.getLogger(CustomizedKafkaApplication.class);

    // deserialization schema (declared here but not used by main(); see the note after this listing)
    public static class ProductBeanJSONDeSerializer implements KafkaDeserializationSchema<ProductBean> {
        private final String encoding = "UTF8";
        private boolean includeTopic;
        private boolean includeTimestamp;

        public ProductBeanJSONDeSerializer(boolean includeTopic, boolean includeTimestamp) {
            this.includeTopic = includeTopic;
            this.includeTimestamp = includeTimestamp;
        }

        @Override
        public TypeInformation<ProductBean> getProducedType() {
            return TypeInformation.of(ProductBean.class);
        }

        @Override
        public boolean isEndOfStream(ProductBean nextElement) {
            return false;
        }

        @Override
        public ProductBean deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            if (consumerRecord != null) {
                try {
                    String value = new String(consumerRecord.value(), encoding);
                    return JSON.parseObject(value, ProductBean.class);
                } catch (Exception e) {
                    logger.error(">>>>>>deserialize failed : " + e.getMessage(), e);
                }
            }
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // load the external properties file passed in via --config_path
        ParameterTool argParas = ParameterTool.fromArgs(args);
        String propertiesFilePath = argParas.get("config_path");
        if (logger.isDebugEnabled()) {
            logger.debug(">>>>>>start to load properties from {}", propertiesFilePath);
        }
        ParameterTool paras = ParameterTool.fromPropertiesFile(propertiesFilePath);
        env.getConfig().setGlobalJobParameters(paras);

        // plug in the custom source
        DataStreamSource<ProductBean> kafkaSource = env.addSource(new CustomizedKafkaSource());
        DataStream<Tuple2<String, Integer>> ds = kafkaSource
                .flatMap(new FlatMapFunction<ProductBean, Tuple2<String, Integer>>() {
                    public void flatMap(ProductBean product, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        if (product.getStatus() == 101) {
                            logger.info(">>>>>>productId->" + product.getProductId());
                            collector.collect(new Tuple2<String, Integer>(product.getProductId(), 1));
                        }
                    }
                });

        // count records per product over a 1-hour sliding window that advances every 5 seconds
        DataStream<Tuple2<String, Integer>> prodCount = ds.keyBy(value -> value.f0)
                .window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.seconds(5)))
                .sum(1);

        // every 5 seconds, collect the (productId, count) pairs and write them to Redis
        prodCount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new CustomizedSinkWindow())
                .addSink(new MyRedisSink("flinkdemo:kafka:customizedKafka")); // custom Redis sink from the earlier custom-Sink post

        env.execute();
    }

    private static class CustomizedSinkWindow
            extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

        public CustomizedSinkWindow() {
        }

        @Override
        public void process(
                ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
                Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (Tuple2<String, Integer> element : input) {
                logger.info(">>>>>>prodid->" + element.f0 + " purchase count->" + element.f1);
                out.collect(element);
            }
        }
    }
}
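One thing worth pointing out: ProductBeanJSONDeSerializer is declared above but never referenced by main(); it belongs to the connector-based way of reading Kafka rather than to the custom source. Purely as a hedged sketch (not in the original article), it could be wired in through the connector-provided FlinkKafkaConsumer (deprecated in 1.15 in favour of KafkaSource), assuming the flink-connector-kafka dependency is on the classpath:

package org.mk.demo.flink;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

// Hedged sketch only: uses the deserializer above with the Kafka connector
// instead of the hand-written CustomizedKafkaSource.
public class ConnectorKafkaApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool paras = ParameterTool.fromPropertiesFile(ParameterTool.fromArgs(args).get("config_path"));

        Properties props = new Properties();
        props.put("bootstrap.servers", paras.get("kafka.bootstrapservers"));
        props.put("group.id", "test01");

        DataStream<ProductBean> source = env.addSource(
                new FlinkKafkaConsumer<>(paras.get("kafka.topic"),
                        new CustomizedKafkaApplication.ProductBeanJSONDeSerializer(false, false), props));

        source.print(); // just to show the stream is wired; the real flatMap/window logic would go here
        env.execute("connector-based variant");
    }
}

Either approach produces a DataStream of ProductBean, so the downstream flatMap/window logic stays exactly the same.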
And that's it. Very simple.
That covers custom Sources in Flink 1.15.2 with Java. Related articles in this series:
FLINK Java Development Based on 1.15.2 - How to Use an External Configuration File
FLINK Java Development Based on 1.15.2 - How to Use log4j inside Flink
FLINK Java Development Based on 1.15.2 - Setting Up a Production Cluster with 2 Masters and 3 Workers