FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜
Posted TGITCIC
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜相关的知识,希望对你有一定的参考价值。
需求
每一个商品被卖出去一条就以以下格式通过kafka发送过来,只对status=101的productId进行统计:
#格式:productId,statusCode
a1001,101
a1001,102
a1001,101
a1003,101
a1002,101
假设每过60s有上述内容被发送过来,那么flink应该会形成以下这样的一个排行榜在Redis内并且随着kafka传送过来的数据变化面实时变化着这个排行榜:
#productId, 被卖次数
#按照被卖次数从上到下显示最大到最小
a1001,2
a1003,1
a1002,1
为什么a1001是2而不是3?因为只统计statusCode为101的商品
进入开发前需要解决的问题
时间窗口的问题;
每60秒一统计,这就是标准的流式计算了。我们假设每隔60秒进入的数据都是不同的,第一次60秒可能进入10条数据,那么这10条数据的“排行榜”情况和下一个60秒内进入的数据会有差别,因此每个“时间窗口”进入的数据都不一样,我们的统计就是根据这个“时间窗口”内的数据进行计算的,因此才叫实时处理。
它和传统的做法的区别在于如下根本区别。
传统的做法:
假设目前mysql内有100万条记录,取出最近60s数据虽然可能取出的结果只有10条但是数据量的底有100万,即你表面看到的是取出10条背后是100万条数据参与了计算。那么如果我们说这个时间窗口假设缩短到了每5秒一变化。试想一下前端是一个小程序,有1,000个用户并行打开这个界面,每个连接以每5s时间去生产假设生产内是1亿条记录这么刷一次,每5秒刷一次,你的生产直接会被搞爆。
流式的做法:
我才不管你这个底是1000万还是1个亿,我只取60秒内的数据,可能有10条也可能是1万条,也可能是10万条,对于flink来说,这点数据量so what?一点不care。
所以我们需要先解决这个时间窗口的问题。
KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092")
.setTopics("test").setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema()).build();
DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
DataStream<Tuple2<String, Integer>> ds = kafkaDS.flatMap(new SumSplitter());
DataStream<Tuple2<String, Integer>> prodCount =
ds.keyBy(value -> value.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(60), Time.seconds(10)))
// key之后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口
.sum(1);// 将相同的key的元素第二个count值相加
prodCount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))// (shu1, xx) (shu2,xx)....
// 所有key元素进入一个5s长的窗口(选5秒是因为上游窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
.process(new TopNAllFunction(5)).addSink(redisSink);// 5代表前5名
env.execute();
排名的问题;
这边我们可以取巧,我们使用了TreeMap里的内置compare,我们覆写这个方法,下面上代码
private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>
private int topSize = 5;
public TopNAllFunction(int topSize)
this.topSize = topSize;
@Override
public void process(
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception
TreeMap<Integer, Tuple2<String, Integer>> treemap =
new TreeMap<Integer, Tuple2<String, Integer>>(new Comparator<Integer>()
@Override
public int compare(Integer y, Integer x)
return (x < y) ? -1 : 1;
); // treemap按照key降序排列,相同count值不覆盖
for (Tuple2<String, Integer> element : input)
treemap.put(element.f1, element);
if (treemap.size() > topSize) // 只保留前面TopN个元素
treemap.pollLastEntry();
for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet())
Tuple2<String, Integer> prodInfo = entry.getValue();
System.out.println("prodid->" + prodInfo.f0 + " 购买次数->" + prodInfo.f1);
out.collect(prodInfo);
然后把以上两块内容合并,写入Redis就搞定这件事了,下面给出完整代码
完整代码
ProductAggregate.java
* 系统项目名称 com.aldi.com.cnflink.demo WatermarkInSource.java
*
* 2022年9月26日-下午1:20:26 2022XX公司-版权所有
*
*/
package com.aldi.com.cnflink.demo;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.util.Collector;
/**
*
* WatermarkInSource
*
*
* 2022年9月26日 下午1:20:26
*
* @version 1.0.0
*
*/
public class ProductAggregate
public static void main(String[] args) throws Exception
FlinkJedisPoolConfig conf =
new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
RedisSink redisSink = new RedisSink<>(conf, new SumReport2RedisMapper());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092")
.setTopics("test").setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema()).build();
DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
DataStream<Tuple2<String, Integer>> ds = kafkaDS.flatMap(new SumSplitter());
DataStream<Tuple2<String, Integer>> prodCount =
ds.keyBy(value -> value.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(60), Time.seconds(10)))
// key之后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口
.sum(1);// 将相同的key的元素第二个count值相加
prodCount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))// (shu1, xx) (shu2,xx)....
// 所有key元素进入一个5s长的窗口(选5秒是因为上游窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
.process(new TopNAllFunction(5)).addSink(redisSink);// 5代表前5名
env.execute();
private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>
private int topSize = 5;
public TopNAllFunction(int topSize)
this.topSize = topSize;
@Override
public void process(
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception
TreeMap<Integer, Tuple2<String, Integer>> treemap =
new TreeMap<Integer, Tuple2<String, Integer>>(new Comparator<Integer>()
@Override
public int compare(Integer y, Integer x)
return (x < y) ? -1 : 1;
); // treemap按照key降序排列,相同count值不覆盖
for (Tuple2<String, Integer> element : input)
treemap.put(element.f1, element);
if (treemap.size() > topSize) // 只保留前面TopN个元素
treemap.pollLastEntry();
for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet())
Tuple2<String, Integer> prodInfo = entry.getValue();
System.out.println("prodid->" + prodInfo.f0 + " 购买次数->" + prodInfo.f1);
out.collect(prodInfo);
SumSplitter.java
* 系统项目名称 com.aldi.flink.demo LineSplitter.java
*
* 2022年9月23日-下午4:31:36 2022XX公司-版权所有
*
*/
package com.aldi.com.cnflink.demo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
*
* LineSplitter
*
*
* 2022年9月23日 下午4:31:36
*
* @version 1.0.0
*
*/
public class SumSplitter implements FlatMapFunction<String, Tuple2<String, Integer>>
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception
try
if (s != null || s.length() > 0)
String[] tokens = s.toLowerCase().split(",");
if (tokens != null && tokens.length > 1)
if (tokens[1].equals("101"))
collector.collect(new Tuple2<String, Integer>(tokens[0], 1));
System.out.println(
">>>>>>product id->" + tokens[1] + " and status->" + tokens[1] + " will be counted");
catch (Exception e)
e.printStackTrace();
SumReport2RedisMapper.java
* 系统项目名称 com.aldi.flink.demo SinkRedisMapper.java
*
* 2022年9月23日-下午3:52:49 2022XX公司-版权所有
*
*/
package com.aldi.com.cnflink.demo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
*
* SinkRedisMapper
*
*
* 2022年9月23日 下午3:52:49
*
* @version 1.0.0
*
*/
public class SumReport2RedisMapper implements RedisMapper<Tuple2<String, Integer>>
@Override
public RedisCommandDescription getCommandDescription()
// hset
return new RedisCommandDescription(RedisCommand.HSET, "flinkdemo:kafka:simplekafka");
@Override
public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2)
return stringIntegerTuple2.f0;
@Override
public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2)
return stringIntegerTuple2.f1.toString();
运行代码
然后我们在kafka处输入如下内容
我们去Redis内可以看到有一个这样的Key生成了,内容如下:
然后页面做个实时查询的。。。不管什么页面也好、做个什么APP也好、做个什么小程序也好,连着这个Redis数据结果,随便刷。。。啊我。。。”洗刷刷洗刷刷,洗刷刷“
以上是关于FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜的主要内容,如果未能解决你的问题,请参考以下文章
Flink视频教程_基于Flink流处理的动态实时电商实时分析系统
FLINK 基于1.15.2的Java开发-如何使用外部配置文件