FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜

Posted TGITCIC

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜相关的知识,希望对你有一定的参考价值。

需求

每一个商品被卖出去一条就以以下格式通过kafka发送过来,只对status=101的productId进行统计:

#格式:productId,statusCode
a1001,101
a1001,102
a1001,101
a1003,101
a1002,101

假设每过60s有上述内容被发送过来,那么flink应该会形成以下这样的一个排行榜在Redis内并且随着kafka传送过来的数据变化面实时变化着这个排行榜:

#productId, 被卖次数
#按照被卖次数从上到下显示最大到最小
a1001,2
a1003,1
a1002,1

为什么a1001是2而不是3?因为只统计statusCode为101的商品

进入开发前需要解决的问题

时间窗口的问题;

每60秒一统计,这就是标准的流式计算了。我们假设每隔60秒进入的数据都是不同的,第一次60秒可能进入10条数据,那么这10条数据的“排行榜”情况和下一个60秒内进入的数据会有差别,因此每个“时间窗口”进入的数据都不一样,我们的统计就是根据这个“时间窗口”内的数据进行计算的,因此才叫实时处理。

它和传统的做法的区别在于如下根本区别。

传统的做法:

假设目前mysql内有100万条记录,取出最近60s数据虽然可能取出的结果只有10条但是数据量的底有100万,即你表面看到的是取出10条背后是100万条数据参与了计算。那么如果我们说这个时间窗口假设缩短到了每5秒一变化。试想一下前端是一个小程序,有1,000个用户并行打开这个界面,每个连接以每5s时间去生产假设生产内是1亿条记录这么刷一次,每5秒刷一次,你的生产直接会被搞爆。

流式的做法:

我才不管你这个底是1000万还是1个亿,我只取60秒内的数据,可能有10条也可能是1万条,也可能是10万条,对于flink来说,这点数据量so what?一点不care。

所以我们需要先解决这个时间窗口的问题。

KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092")
           .setTopics("test").setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
           .setValueOnlyDeserializer(new SimpleStringSchema()).build();
       DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
 
       DataStream<Tuple2<String, Integer>> ds = kafkaDS.flatMap(new SumSplitter());
       DataStream<Tuple2<String, Integer>> prodCount =
           ds.keyBy(value -> value.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(60), Time.seconds(10)))
               // key之后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口
               .sum(1);// 将相同的key的元素第二个count值相加
 
       prodCount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))// (shu1, xx) (shu2,xx)....
           // 所有key元素进入一个5s长的窗口(选5秒是因为上游窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
           .process(new TopNAllFunction(5)).addSink(redisSink);// 5代表前5名
 
       env.execute();

排名的问题;

这边我们可以取巧,我们使用了TreeMap里的内置compare,我们覆写这个方法,下面上代码

private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> 
 
    private int topSize = 5;
 
    public TopNAllFunction(int topSize) 
 
        this.topSize = topSize;
    
 
    @Override
    public void process(
 
        ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
        Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception 
 
        TreeMap<Integer, Tuple2<String, Integer>> treemap =
            new TreeMap<Integer, Tuple2<String, Integer>>(new Comparator<Integer>() 
 
                @Override
                public int compare(Integer y, Integer x) 
                    return (x < y) ? -1 : 1;
                
 
            ); // treemap按照key降序排列,相同count值不覆盖
 
        for (Tuple2<String, Integer> element : input) 
            treemap.put(element.f1, element);
            if (treemap.size() > topSize)  // 只保留前面TopN个元素
                treemap.pollLastEntry();
            
        
        for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) 
            Tuple2<String, Integer> prodInfo = entry.getValue();
            System.out.println("prodid->" + prodInfo.f0 + "  购买次数->" + prodInfo.f1);
            out.collect(prodInfo);
        
    

然后把以上两块内容合并,写入Redis就搞定这件事了,下面给出完整代码

完整代码

ProductAggregate.java

* 系统项目名称 com.aldi.com.cnflink.demo WatermarkInSource.java
 *
 * 2022年9月26日-下午1:20:26 2022XX公司-版权所有
 *
 */
package com.aldi.com.cnflink.demo;
 
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
 
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.util.Collector;
 
/**
 *
 * WatermarkInSource
 *
 *
 * 2022年9月26日 下午1:20:26
 *
 * @version 1.0.0
 *
 */
public class ProductAggregate 
    public static void main(String[] args) throws Exception 
 
        FlinkJedisPoolConfig conf =
            new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
        RedisSink redisSink = new RedisSink<>(conf, new SumReport2RedisMapper());
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092")
            .setTopics("test").setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema()).build();
        DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
 
        DataStream<Tuple2<String, Integer>> ds = kafkaDS.flatMap(new SumSplitter());
        DataStream<Tuple2<String, Integer>> prodCount =
            ds.keyBy(value -> value.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(60), Time.seconds(10)))
                // key之后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口
                .sum(1);// 将相同的key的元素第二个count值相加
 
        prodCount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))// (shu1, xx) (shu2,xx)....
            // 所有key元素进入一个5s长的窗口(选5秒是因为上游窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
            .process(new TopNAllFunction(5)).addSink(redisSink);// 5代表前5名
 
        env.execute();
    
 
    private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> 
 
        private int topSize = 5;
 
        public TopNAllFunction(int topSize) 
 
            this.topSize = topSize;
        
 
        @Override
        public void process(
 
            ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
            Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception 
 
            TreeMap<Integer, Tuple2<String, Integer>> treemap =
                new TreeMap<Integer, Tuple2<String, Integer>>(new Comparator<Integer>() 
 
                    @Override
                    public int compare(Integer y, Integer x) 
                        return (x < y) ? -1 : 1;
                    
 
                ); // treemap按照key降序排列,相同count值不覆盖
 
            for (Tuple2<String, Integer> element : input) 
                treemap.put(element.f1, element);
                if (treemap.size() > topSize)  // 只保留前面TopN个元素
                    treemap.pollLastEntry();
                
            
            for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) 
                Tuple2<String, Integer> prodInfo = entry.getValue();
                System.out.println("prodid->" + prodInfo.f0 + "  购买次数->" + prodInfo.f1);
                out.collect(prodInfo);
            
        
    

SumSplitter.java

* 系统项目名称 com.aldi.flink.demo LineSplitter.java
 *
 * 2022年9月23日-下午4:31:36 2022XX公司-版权所有
 *
 */
package com.aldi.com.cnflink.demo;
 
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
 
/**
 *
 * LineSplitter
 *
 *
 * 2022年9月23日 下午4:31:36
 *
 * @version 1.0.0
 *
 */
public class SumSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> 
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception 
        try 
            if (s != null || s.length() > 0) 
                String[] tokens = s.toLowerCase().split(",");
                if (tokens != null && tokens.length > 1) 
                    if (tokens[1].equals("101")) 
                        collector.collect(new Tuple2<String, Integer>(tokens[0], 1));
                        System.out.println(
                            ">>>>>>product id->" + tokens[1] + " and status->" + tokens[1] + " will be counted");
                    
                
            
         catch (Exception e) 
            e.printStackTrace();
        
    

SumReport2RedisMapper.java

* 系统项目名称 com.aldi.flink.demo SinkRedisMapper.java
 *
 * 2022年9月23日-下午3:52:49 2022XX公司-版权所有
 *
 */
package com.aldi.com.cnflink.demo;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 
/**
 *
 * SinkRedisMapper
 *
 *
 * 2022年9月23日 下午3:52:49
 *
 * @version 1.0.0
 *
 */
public class SumReport2RedisMapper implements RedisMapper<Tuple2<String, Integer>> 
    @Override
    public RedisCommandDescription getCommandDescription() 
        // hset
        return new RedisCommandDescription(RedisCommand.HSET, "flinkdemo:kafka:simplekafka");
    
 
    @Override
    public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) 
        return stringIntegerTuple2.f0;
    
 
    @Override
    public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) 
        return stringIntegerTuple2.f1.toString();
    

运行代码

然后我们在kafka处输入如下内容

 我们去Redis内可以看到有一个这样的Key生成了,内容如下:

然后页面做个实时查询的。。。不管什么页面也好、做个什么APP也好、做个什么小程序也好,连着这个Redis数据结果,随便刷。。。啊我。。。”洗刷刷洗刷刷,洗刷刷“

以上是关于FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜的主要内容,如果未能解决你的问题,请参考以下文章

Flink视频教程_基于Flink流处理的动态实时电商实时分析系统

FLINK 基于1.15.2的Java开发-如何使用外部配置文件

日均20万亿次计算量!腾讯基于Flink的实时流计算平台演进之路丨附PPT下载

FLINK 基于1.15.2的Java开发-入门

大数据开发-Flink-Flink简介和入门

大数据开发-Flink-Flink简介和入门