从0到1Flink的成长之路-Flink Action 综合案例

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路-Flink Action 综合案例相关的知识,希望对你有一定的参考价值。

Flink Action 综合案例

需求
在大数据的实时处理中,实时大屏展示已经成了一个很重要的展示项,比如最有名的双十一
大屏实时销售成交额展示。除了这个,还有一些其他场景的应用,比如在后台系统实时的展示网
站当前的pv、uv等等,其实做法都是类似的。
做一个最简单的模拟电商统计大屏的小例子,需求如下:

1.实时计算出当天零点截止到当前时间的销售总额
2.计算出各个分类的销售top3
3.每秒钟更新一次统计结果

window: [2020-10-25 19:08:31]: 男装 = 1051.2
window: [2020-10-25 19:08:31]: 女装 = 1507.69
window: [2020-10-25 19:08:31]: 办公 = 1022.95
window: [2020-10-25 19:08:31]: 家具 = 373.47
window: [2020-10-25 19:08:31]: 乐器 = 182.95
window: [2020-10-25 19:08:31]: 游戏 = 820.56
window: [2020-10-25 19:08:31]: 户外 = 566.54
window: [2020-10-25 19:08:31]: 图书 = 783.45
window: [2020-10-25 19:08:31]: 家电 = 870.21
window: [2020-10-25 19:08:31]: 洗护 = 1235.08
All>>>>[2020-10-25 19:08:31]: all = 8414.1
Top3>>>>2020-10-25 19:08:31, 男装 = 1051.2, 洗护 = 1235.08, 女装 = 1507.69
window: [2020-10-25 19:08:32]: 乐器 = 335.43
window: [2020-10-25 19:08:32]: 游戏 = 820.56
window: [2020-10-25 19:08:32]: 男装 = 1051.2
window: [2020-10-25 19:08:32]: 美妆 = 519.95
window: [2020-10-25 19:08:32]: 家具 = 373.47
window: [2020-10-25 19:08:32]: 女装 = 1761.21
window: [2020-10-25 19:08:32]: 办公 = 1330.54
window: [2020-10-25 19:08:32]: 家电 = 1060.58
window: [2020-10-25 19:08:32]: 洗护 = 1235.08
window: [2020-10-25 19:08:32]: 图书 = 783.45
window: [2020-10-25 19:08:32]: 户外 = 660.51
window: [2020-10-25 19:08:32]: 运动 = 667.7
All>>>>[2020-10-25 19:08:32]: all = 10599.68
Top3>>>>2020-10-25 19:08:32, 洗护 = 1235.08, 办公 = 1330.54, 女装 = 1761.21
window: [2020-10-25 19:08:33]: 女装 = 1761.21
window: [2020-10-25 19:08:33]: 办公 = 1597.3
window: [2020-10-25 19:08:33]: 游戏 = 1020.5
window: [2020-10-25 19:08:33]: 乐器 = 335.43
window: [2020-10-25 19:08:33]: 家具 = 518.09
window: [2020-10-25 19:08:33]: 男装 = 1461.53
window: [2020-10-25 19:08:33]: 美妆 = 662.53
window: [2020-10-25 19:08:33]: 户外 = 762.15
window: [2020-10-25 19:08:33]: 运动 = 1152.23
window: [2020-10-25 19:08:33]: 洗护 = 1496.14
window: [2020-10-25 19:08:33]: 家电 = 1434.22
window: [2020-10-25 19:08:33]: 图书 = 783.45
All>>>>[2020-10-25 19:08:33]: all = 12984.78
Top3>>>>2020-10-25 19:08:33, 洗护 = 1496.14, 女装 = 1761.21, 办公 = 1597.3

实现思路如下图所示:

数据
首先通过自定义 source 模拟订单的生成,交易订单实体类:TmallOrder

package xx.xxxxxx.flink.tmall;
import lombok.*;
@Setter
@Getter
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public class TmallOrder 
private String orderId ;
private Integer userId ;
private Double orderAmount ;
private String orderTime ;
private String category ; @Override
public String toString() 
return orderId + ", " + userId + ", " + orderAmount + ", " + category + ", " + orderTime;
 

自定义数据源,生成交易订单数据,OrderSource代码如下:

package xx.xxxxxx.flink.tmall;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 自定义数据源,实时产出订单数据,封装至TmallOrder实例对象
*/
public class OrderSource extends RichParallelSourceFunction<TmallOrder> 
private boolean isRunning = true ;
// 商品类别
String category[] =  "女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"
;
@Override
public void run(SourceContext<TmallOrder> ctx) throws Exception 
Random random = new Random() ;
FastDateFormat dataFormat = FastDateFormat.getInstance("yyyyMMddHHmmssSSS") ;
FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS") ;
while (isRunning)
// 创建订单
long millis = System.currentTimeMillis();
String orderPrice = String.format((5 + random.nextInt(100)) + ".%2d", 10 + random.nextInt(90));
TmallOrder order = new TmallOrder(
String.format(dataFormat.format(millis) + "%5d", 10000 + random.nextInt(1000)), //
(10000 * (random.nextInt(5) + 1))+ random.nextInt(10000), //
Double.parseDouble(orderPrice), //
format.format(millis), //
category[random.nextInt(category.length)]
);
ctx.collect(order);
TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(500));
 @Override
public void cancel() 
isRunning = false ; 
	

先统计各个类别销售额,将其封装在实体类CategoryAmount,代码如下:

package xx.xxxxxx.flink.tmall;
import lombok.*;
@Setter
@Getter
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public class CategoryAmount 
private String category ;
private Double totalAmount ;
private String computeDateTime ;
@Override
public String toString() 
return "[" + computeDateTime + "]: " + category + " = " + totalAmount;

public String toContent() 
return category + " = " + totalAmount;
	 

Flink Window Trigger

Trigger(触发器) 的作用
英文单词 trigger 的意思是触发,作为名词是扳机的意思,例如枪支上的扳机就叫 trigger,
所以也有开火的意思。Flink中,window操作需要伴随对窗口中的数据进行处理的逻辑,也就是窗
口函数,而 Trigger 的作用就是决定何时触发窗口函数中的逻辑执行。

Trigger 抽象类
Flink中定义了Trigger抽象类,任何trigger必须继承Trigger类,并实现其中的
onElement()、onProcessingTime()、onEventTime()、clear()等抽象方法,Flink官方提供了几种常
用的trigger实现,同时,用户可以根据需求自定义trigger。

Flink提供 Triggers
1.EventTimeTrigger:通过对比Watermark和窗口的Endtime确定是否触发窗口计算,如果
Watermark大于Window EndTime则触发,否则不触发,窗口将继续等待。
2.ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果
ProcessTime大于EndTime则触发计算,否则窗口继续等待。
3.ContinuousEventTimeTrigger:根据间隔时间,周期性触发窗口或者Window的结束时间
小于当前EndTime触发窗口计算。
4.ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束
时间小于当前ProcessTime触发窗口计算。
5.CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
6.DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否
触发窗口计算。
7.PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数
据将被清理。

函数:AggregateFunction
Flink 的AggregateFunction是一个基于中间计算结果状态进行增量计算的函数。由于是迭代
计算方式,所以,在窗口处理过程中,不用缓存整个窗口的数据,所以效率执行比较高。
该函数会将给定的聚合函数应用于每个窗口和键。 对每个元素调用聚合函数,以递增方式聚
合值,并将每个键和窗口的状态保持在一个累加器中。
参数类型:AggregateFunction接口,该接口的继承关系和方法如下:

自定义聚合函数需要实现AggregateFunction接口类,它有四个接口实现方法:




代码实现
使用Flink 编程实现每日消费额统计,每隔1秒执行一次。先按照类别统计销售额,再进行总
金额统计和获取Top类别。

package xx.xxxxxx.flink.tmall;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
/**
* Flink 实现:模拟简易双11实时统计大屏
* - 实时计算出当天零点截止到当前时间的销售总额
* - 计算出销售top3类别
* - 每秒钟更新一次统计结果
*/
public class TmallBigScreen 
public static void main(String[] args) throws Exception 
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 第一、设置时间语义:事件时间EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 2. 数据源-source
DataStreamSource<TmallOrder> orderStream = env.addSource(new OrderSource());
//orderStream.printToErr();
// 3. 数据转换-transformation
// 第二、设置事件时间字段及watermark水位线
SingleOutputStreamOperator<TmallOrder> timeStream = orderStream.assignTimestampsAndWatermarks(
// 设置最大允许乱序或延迟数据为5秒
new BoundedOutOfOrdernessTimestampExtractor<TmallOrder>(Time.seconds(5)) 
FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS");
@Override
public long extractTimestamp(TmallOrder order) 
long eventTime = System.currentTimeMillis() - 5 * 1000 ;
try
eventTime = format.parse(order.getOrderTime()).getTime();
catch (Exception e) e.printStackTrace(); 
return eventTime;
 
);
//timeStream.printToErr();
// TODO:step1. 每秒统计今日各个类别销售额(window size:1d,trigger interval: 1s,keyBy:category)
SingleOutputStreamOperator<CategoryAmount> categoryWindowStream = timeStream
// a. 设置分组字段:类别category
.keyBy("category")
// b. 事件时间窗口:1 day
.window(TumblingEventTimeWindows.of(Time.days(1)))
// c. 设置触发器trigger:1 second
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))
// d. 窗口聚合操作,实现AggregateFunction,其中金额转换BigDecimal
.aggregate(
// TODO: 定义聚合函数, 对数据进行增量聚合操作
new AggregateFunction<TmallOrder, BigDecimal, Double>()  @Override
public BigDecimal createAccumulator() 
// 初始化中间临时变量,此处创建BigDecimal对象
return new BigDecimal(0);
@Override
public BigDecimal add(TmallOrder order, BigDecimal accumulator) 
// 获取订单金额
Double orderAmount = order.getOrderAmount();
// 累加操作
BigDecimal addBigDecimal = accumulator.add(new BigDecimal(orderAmount));
// 返回值
return addBigDecimal;
@Override
public BigDecimal merge(BigDecimal a, BigDecimal b) 
return a.add(b);
@Override
public Double getResult(BigDecimal accumulator) 
return accumulator.setScale(2, RoundingMode.HALF_UP).doubleValue();

, // TODO: 定义窗口函数, 对窗口数据进行计算并输出
new WindowFunction<Double, CategoryAmount, Tuple, TimeWindow>() 
FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
@Override
public void apply(Tuple tuple,TimeWindow window,Iterable<Double> input,
Collector<CategoryAmount> out) throws Exception 
// 类别category
String category = ((Tuple1<String>)tuple).f0 ;
// 窗口中消费金额
Double windowAmount = input.iterator().next();
// 窗口结束时间
String computeDataTime = format.format(System.currentTimeMillis());
// 输出结果
out.collect(new CategoryAmount(category, windowAmount, computeDataTime));
 
);
/*
[2021-01-27 22:15:40]: 办公 = 120.84
[2021-01-27 22:15:40]: 男装 = 415.24
[2021-01-27 22:15:40]: 运动 = 319.08
[2021-01-27 22:15:40]: 乐器 = 165.33
[2021-01-27 22:15:40]: 户外 = 54.12
[2021-01-27 22:15:40]: 家电 = 111.76
[2021-01-27 22:15:40]: 图书 = 309.51
[2021-01-27 22:15:40]: 游戏 = 92.55
[2021-01-27 22:15:40]: 家具 = 199.87
[2021-01-27 22:15:40]: 美妆 = 164.76
*/
//categoryWindowStream.printToErr();
// TODO: step2. 每秒钟统计消费额Top3类别和总销售额(window:1s,processTime 处理时间)
/*
- 销售总额 ; - Top3销售的类别 ; - 每秒计算一次输出
*/
SingleOutputStreamOperator<String> resultStream = categoryWindowStream
// 按照计算时间
.keyBy("computeDataTime")
// 设置窗口:1s
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
// 窗口内聚合,获取总销售额和Top类别
.apply(new WindowFunction<CategoryAmount, String, Tuple, TimeWindow>()  @Override
public void apply(Tuple tuple,
TimeWindow window,
Iterable<CategoryAmount> input,
Collector<String> out) throws Exception 
// TODO: a-1. 声明金额累加
BigDecimal sumDecimal = new BigDecimal(0.0) ;
// TODO: a-2. 定义优先队列,存储CategoryAmount对象,按照金额降序排序
Queue<CategoryAmount> queue = new PriorityQueue<CategoryAmount>(
4, //
new Comparator<CategoryAmount>()  @Override
public int compare(CategoryAmount o1, CategoryAmount o2) 
int comp = 0;
if(o1.getTotalAmount() > o2.getTotalAmount())
comp = 1; // 升序
else if(o1.getTotalAmount() < o2.getTotalAmount())
comp = -1 ; // 降序

return comp;
  //
);
// TODO: b. 遍历窗口数据,进行计算
for (CategoryAmount element : input) 
System.out.println("window: " + element);
// b-1. 加入队列,获取Top3
queue.add(element);
if (queue.size() > 3) queue.poll();
// b-2. 累加金额
sumDecimal = sumDecimal.add(new BigDecimal(element.getTotalAmount()));

// TODO: c. 输出队列数据
String computeDataTime = ((Tuple1<String>) tuple).f0 ;
// c-1. 输出总金额
double sumAmount = sumDecimal.setScale(2, RoundingMode.HALF_UP).doubleValue();
CategoryAmount all = new CategoryAmount("all", sumAmount, computeDataTime);
out.collect("All>>>>" + all.toString());
// c.2. 输出Top3
StringBuilder builder = new StringBuilder(computeDataTime).append(", ");
for (CategoryAmount item : queue) 
builder.append(item.toContent()).append(", ");

String output = builder.toString();
out.collect("Top3>>>>" + output.substring(0, output.length() - 2));

);
// 4. 数据终端-sink
resultStream.printToErr();
/*
window: [2021-01-27 22:31:07]: 家电 = 292.35
window: [2021-01-27 22:31:07]: 办公 = 158.36
window: [2021-01-27 22:31:07]: 男装 = 114.84
window: [2021-01-27 22:31:07]: 图书 = 172.78
window: [2021-01-27 22:31:07]: 家具 = 157.0
window: [2021-01-27 22:31:07]: 乐器 = 304.14
window: [2021-01-27 22:31:07]: 洗护 = 101.39
window: [2021-01-27 22:31:07]: 运动 = 313.77
window: [2021-01-27 22:31:07]: 游戏 = 22.7
window: [2021-01-27 22:31:07]: 女装 = 143.94
window: [2021-01-27 22:31:07]: 户外 = 45.32
window: [2021-01-27 22:31:07]: 美妆 = 252.45
All>>>>[2021-01-27 22:31:07]: all = 2079.04
Top3>>>>2021-01-27 22:31:07, 家电 = 292.35, 乐器 = 304.14, 运动 = 313.77
*/
// 5. 触发执行-execute
env.execute(TmallBigScreen.class.getSimpleName());
	

运行效果如下所示:

以上是关于从0到1Flink的成长之路-Flink Action 综合案例的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路

从0 到1Flink的成长之路

从0到1Flink的成长之路

从0到1Flink的成长之路

从0到1Flink的成长之路(十三)

从0到1Flink的成长之路- Flink 原理探析