FLINK Java Development Based on 1.15.2 - Custom Source

Posted TGITCIC


Introduction

In an earlier post we covered custom Sinks; today we look at custom Sources.

A custom Source is even simpler than a custom Sink. You only need to extend RichSourceFunction<T> and override the following methods:

@Override
public void open(Configuration parameters) throws Exception { ... }

@Override
public void run(SourceContext<T> sourceContext) throws Exception { ... }

@Override
public void cancel() { ... }
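For orientation, here is a minimal, self-contained sketch of that skeleton; the class name CounterSource, the emitted Long type, the running flag and the one-second sleep are illustrative only and not taken from this article's code. open() sets things up, run() loops and emits records, and cancel() stops the loop.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Illustrative sketch: a source that emits an increasing counter once per second until cancelled.
public class CounterSource extends RichSourceFunction<Long> {

    private volatile boolean running = true; // flipped by cancel()
    private long counter = 0L;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open(): called once per parallel instance; create clients/connections here
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // run(): the main loop; every record goes downstream via ctx.collect(...)
        while (running) {
            ctx.collect(counter++);
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        // cancel(): called by Flink when the job is cancelled; must make run() return
        running = false;
    }
}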

The Requirement

Suppose we have the following requirement: we define our own Kafka Source so that, while reading from Kafka, it turns data like the following directly into a ProductBean.

Kafka source data:

{"productId":"a101","status":101}

ProductBean.java looks like this:

/**
 * System project name: com.aldi.com.cnflink.demo ProductBean.java
 *
 * 2022-09-27 11:16:38 AM  Copyright 2022 XX Company - All rights reserved
 *
 */
package org.mk.demo.flink;

import java.io.Serializable;

/**
 *
 * ProductBean
 *
 *
 * 2022-09-27 11:16:38 AM
 *
 * @version 1.0.0
 *
 */
public class ProductBean implements Serializable {

    private String productId = "";
    private int status = 0;

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }
}
So let's define our own KafkaSource.

Writing the Custom KafkaSource

CustomizedKafkaSource.java

package org.mk.demo.flink;

import com.alibaba.fastjson.JSON;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class CustomizedKafkaSource extends RichSourceFunction<ProductBean> {
    private final static Logger logger = LoggerFactory.getLogger(CustomizedKafkaSource.class);

    private KafkaConsumer<String, String> consumer;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read the job-wide parameters registered via env.getConfig().setGlobalJobParameters(...)
        ExecutionConfig.GlobalJobParameters context = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        ParameterTool paras = (ParameterTool) context;
        String bootStrapServers = paras.get("kafka.bootstrapservers");
        String topicName = paras.get("kafka.topic");
        logger.info(">>>>>>kafka.bootStrapServers->" + bootStrapServers);
        logger.info(">>>>>>topicName->" + topicName);

        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test01");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);

        // Create the consumer and subscribe to the topic
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topicName));
    }

    @Override
    public void run(SourceContext<ProductBean> sourceContext) throws Exception {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();     // can be ignored here
                String value = record.value(); // the raw line typed into Kafka, e.g.
                                               // {"productId":"a101","status":101}

                // Map map = JSON.parseObject(value); // you could parse into a Map first,
                // or parse straight into a concrete XXXBean.class as below.
                ProductBean product = JSON.parseObject(value, ProductBean.class);
                String productId = product.getProductId();
                int status = product.getStatus();
                // Tuple2<String, Integer> tuple2 = new Tuple2<String, Integer>(productId, status);
                sourceContext.collect(product); // the emitted type must match what the downstream operators/sink expect
            }
        }
    }

    @Override
    public void cancel() {
        // Stop the polling loop when Flink cancels the job
        running = false;
    }
}

Note how ConsumerRecords is used in the run() method: ConsumerRecords is an iterable collection, and each element it yields is a ConsumerRecord<String, String> carrying two values:

  1. record.key()
  2. record.value()

Now type a line like the following into the Kafka topic and press Enter:

{"productId":"a101","status":101}

record.value() will then contain exactly that line, and at that point you can deserialize it into the Bean.
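As a quick standalone check of that deserialization step, here is a small sketch; the ParseDemo class is hypothetical and only assumes fastjson plus the ProductBean shown above are on the classpath.

package org.mk.demo.flink;

import com.alibaba.fastjson.JSON;

public class ParseDemo {
    public static void main(String[] args) {
        // The exact line typed into Kafka arrives in record.value() as a plain String
        String value = "{\"productId\":\"a101\",\"status\":101}";
        ProductBean product = JSON.parseObject(value, ProductBean.class);
        // Prints: a101 -> 101
        System.out.println(product.getProductId() + " -> " + product.getStatus());
    }
}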

How to Call the Custom Source

With the Source defined, let's look at how to call it.

package org.mk.demo.flink;
 
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.util.Collector;
 
import com.alibaba.fastjson.JSON;
 
public class CustomizedKafkaApplication {
    private final static Logger logger = LoggerFactory.getLogger(CustomizedKafkaApplication.class);

    // Deserialization schema (JSON -> ProductBean), kept here for reference
    public static class ProductBeanJSONDeSerializer implements KafkaDeserializationSchema<ProductBean> {

        private final String encoding = "UTF8";
        private boolean includeTopic;
        private boolean includeTimestamp;

        public ProductBeanJSONDeSerializer(boolean includeTopic, boolean includeTimestamp) {
            this.includeTopic = includeTopic;
            this.includeTimestamp = includeTimestamp;
        }

        @Override
        public TypeInformation<ProductBean> getProducedType() {
            return TypeInformation.of(ProductBean.class);
        }

        @Override
        public boolean isEndOfStream(ProductBean nextElement) {
            return false;
        }

        @Override
        public ProductBean deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            if (consumerRecord != null) {
                try {
                    String value = new String(consumerRecord.value(), encoding);
                    ProductBean product = JSON.parseObject(value, ProductBean.class);
                    return product;
                } catch (Exception e) {
                    logger.error(">>>>>>deserialize failed : " + e.getMessage(), e);
                }
            }
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool argParas = ParameterTool.fromArgs(args);
        String propertiesFilePath = argParas.get("config_path");
        if (logger.isDebugEnabled()) {
            logger.debug(">>>>>>start to load properties from {}", propertiesFilePath);
        }
        ParameterTool paras = ParameterTool.fromPropertiesFile(propertiesFilePath);
        env.getConfig().setGlobalJobParameters(paras);
        DataStreamSource<ProductBean> kafkaSource = env.addSource(new CustomizedKafkaSource());
        DataStream<Tuple2<String, Integer>> ds = kafkaSource
                .flatMap(new FlatMapFunction<ProductBean, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(ProductBean product, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        if (product.getStatus() == 101) {
                            logger.info(">>>>>>productId->" + product.getProductId());
                            collector.collect(new Tuple2<String, Integer>(product.getProductId(), 1));
                        }
                    }
                });
        DataStream<Tuple2<String, Integer>> prodCount = ds.keyBy(value -> value.f0)
                .window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.seconds(5)))
                .sum(1);
        prodCount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))) // e.g. (prod1, xx) (prod2, xx) ...
                .process(new CustomizedSinkWindow()).addSink(new MyRedisSink("flinkdemo:kafka:customizedKafka"));

        env.execute();
    }

    private static class CustomizedSinkWindow
            extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
        public CustomizedSinkWindow() {
        }

        @Override
        public void process(
                ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
                Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (Tuple2<String, Integer> element : input) {
                logger.info(">>>>>>prodid->" + element.f0 + "  purchase count->" + element.f1);
                out.collect(element);
            }
        }
    }
}
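For completeness, here is a minimal sketch of the properties file that main() loads through --config_path. The keys match what CustomizedKafkaSource reads from the global job parameters; the host, port and topic values below are placeholders, not taken from the article.

# flink-demo.properties (the path is passed as --config_path /path/to/flink-demo.properties)
kafka.bootstrapservers=localhost:9092
kafka.topic=product_topic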

That's it. Very simple.
