刁钻导师难为人?我转手丢给他一个Flink史上最简单双十一实时分析案例
Posted ChinaManor
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了刁钻导师难为人?我转手丢给他一个Flink史上最简单双十一实时分析案例相关的知识,希望对你有一定的参考价值。
引言
大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。
上期带大家用StructredStreaming做了双十一实时报表分析,没看过的朋友可以看看,这是链接:
StructredStreaming+Kafka+Mysql(Spark实时计算| 天猫双十一实时报表分析)
这次导师布置了一个最新任务:需求不变,用Flink完成,
阿这
我是菜鸡,刚学Flink,不懂阿~
没办法,只能硬着头皮上了!
先明确一下需求:
1.实时计算出当天零点截止到当前时间的销售总额
2.计算出各个分类的销售额最大的top3
3.每秒钟更新一次统计结果
不管会不会,上来先创建一个流:
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置成流批一体模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
牛批~
下一步:
添加订单数据,Tuple2<分类, 金额>
DataStreamSource<Tuple2<String, Double>> orderDS = env.addSource(new MySource());
第三步转换:
需求一:每秒预聚合各个分类的销售总额:从当天0点开始截止到目前为止的各个分类的销售总额
SingleOutputStreamOperator<CategoryPojo> aggregateResult = orderDS.keyBy(t -> t.f0)
//注意:中国使用UTC+08:00,您需要一天大小的时间窗口,
//窗口从当地时间的每00:00:00开始,您可以使用{@code of(time.days(1),time.hours(-8))}
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
//注意:下面表示每秒触发计算
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
//聚合(可以使用之前学习的简单聚合:sum/reduce/或自定义聚合:apply或使用aggregate聚合(可以指定如何聚合及如何收集聚合结果))
.aggregate(new MyAggregate(), new MyWindow());
敲了这么久,忙得满头大汉~先看看效果对不对,不对不就白干一场
了:
aggregateResult.print();
env.execute();
还好,成功了!
需求二:计算所有分类的销售总额和分类销售额最大Top3
aggregateResult.keyBy(c -> c.getDateTime())
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
//先按照时间对数据分组,因为后续要每秒更新/计算销售总额和分类销售额Top3
.process(new MyProcessWindowFunction());
好像又成功了吧?!Flink实时计算也没那么难
加上注释只有76行代码
…
眉头一皱,发现事情并没有那么简单
博主,博主还有自定义类呢,被你吞了??
CategoryPojo.class
/**
* 用于存储聚合的结果
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CategoryPojo {
private String category;//分类名称
private double totalPrice;//该分类总销售额
private String dateTime;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
}
MyWindow .class
/**
// * interface WindowFunction<IN, OUT, KEY, W extends Window>
// * 自定义窗口函数,实现窗口聚合数据的收集
// */
public static class MyWindow implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
private FastDateFormat df =FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
@Override
public void apply(String key, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
double totalPrice =0d;
for (Double price : input) {
totalPrice +=price;
}
CategoryPojo categoryPojo = new CategoryPojo();
categoryPojo.setCategory(key);
categoryPojo.setDateTime(df.format(System.currentTimeMillis()));
categoryPojo.setTotalPrice(totalPrice);
out.collect(categoryPojo);
}
}
MyAggregate.class
/**
* interface AggregateFunction<IN, ACC, OUT>
* 自定义聚合函数,实现各个分类销售额的预聚合/累加
*/
public static class MyAggregate implements AggregateFunction<Tuple2<String,Double>,Double,Double>{
//初始化累加器
@Override
public Double createAccumulator() {
return 0d;
}
//累加过程
@Override
public Double add(Tuple2<String, Double> value, Double accumulator) {
return value.f1+accumulator;
}
//累加结果
@Override
public Double getResult(Double accumulator) {
return accumulator;
}
//合并结果
@Override
public Double merge(Double a, Double b) {
return a+b;
}
}
计算分类销售额最大的Top3,我用的是之前学的外比较器进行排序
:
数据结构与算法__冒泡排序__Java外比较器和内比较器(排序专题)
MyProcessWindowFunction.class
/**
* abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
*/
public static class MyProcessWindowFunction extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<CategoryPojo> categoryPojos, Collector<Object> out) throws Exception {
Double totalAmount = 0d;//用来记录销售总额
//尝试使用外比较器进行排序
ArrayList<CategoryPojo> list = new ArrayList<>();
for (CategoryPojo categoryPojo : categoryPojos) {
//--1.计算截止到目前为止的所有分类的销售总额
totalAmount += categoryPojo.getTotalPrice();
//--2. 分类销售额最大的Top3
if (list.size()<3){
list.add(categoryPojo);
}else {
//>=3
CategoryPojo first = list.get(0);
if (categoryPojo.getTotalPrice()>first.getTotalPrice()){
list.remove(first);
list.add(categoryPojo);
}//进来元素小就不用变
}
}
list.sort(new Comparator<CategoryPojo>() {
@Override
public int compare(CategoryPojo o1, CategoryPojo o2) {
return (int) (o1.getTotalPrice()-o2.getTotalPrice());
}
});
//--3.直接在这里输出
System.out.println("================================================================================================================================");
System.out.println("----当前时间:----");
System.out.println(key);
System.out.println("----销售总额:----");
System.out.println(new BigDecimal(totalAmount).setScale(2, RoundingMode.HALF_UP));
System.out.println("----销售额Top3分类:----");
list.stream()
.map(c -> {
c.setTotalPrice(new BigDecimal(c.getTotalPrice()).setScale(2, RoundingMode.HALF_UP).doubleValue());
return c;
})
.sorted((c1, c2) -> c1.getTotalPrice() <= c2.getTotalPrice() ? 1 : -1)
.forEach(System.out::println); }}
下面是完整代码:
package demo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Random;
/**
* @author ChinaManor
* #Description
* * Desc今天我们就做一个最简单的模拟电商统计大屏的小例子,
* * 需求如下:
* * 1.实时计算出当天零点截止到当前时间的销售总额
* * 2.计算出各个分类的销售额最大的top3
* * 3.每秒钟更新一次统计结果
* #Date: 25/6/2021 08:28
*/
public class T4 {
public static void main(String[] args) throws Exception {
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 2.source
//订单数据Tuple2<分类, 金额>
DataStreamSource<Tuple2<String, Double>> orderDS = env.addSource(new MySource());
//TODO 3.transformation
//-1.每秒预聚合各个分类的销售总额:从当天0点开始截止到目前为止的各个分类的销售总额
SingleOutputStreamOperator<CategoryPojo> aggregateResult = orderDS.keyBy(t -> t.f0)
//注意:中国使用UTC+08:00,您需要一天大小的时间窗口,
//窗口从当地时间的每00:00:00开始,您可以使用{@code of(time.days(1),time.hours(-8))}
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
//注意:下面表示每秒触发计算
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
//聚合(可以使用之前学习的简单聚合:sum/reduce/或自定义聚合:apply或使用aggregate聚合(可以指定如何聚合及如何收集聚合结果))
.aggregate(new MyAggregate(), new MyWindow());
//输出查看下预聚合的结果
// aggregateResult.print();
//按照分类将订单金额进行聚合:
//分类名称 金额 时间
/* //男装 100 2021-11-11 11:11:11
//女装 100 2021-11-11 11:11:11
//男装 200 2021-11-11 11:11:12
//女装 200 2021-11-11 11:11:12*/
//TODO 4.sink
//-2.计算所有分类的销售总额和分类销售额最大Top3
//要求每秒更新/计算所有分类目前的销售总额和分类销售额Top3
// aggregateResult.keyBy(CategoryPojo::getDateTime)
aggregateResult.keyBy(c -> c.getDateTime())
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
//先按照时间对数据分组,因为后续要每秒更新/计算销售总额和分类销售额Top3
.process(new MyProcessWindowFunction());
//TODO 5.execute
env.execute();
}
/**
* abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
*/
public static class MyProcessWindowFunction extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<CategoryPojo> categoryPojos, Collector<Object> out) throws Exception {
Double totalAmount = 0d;//用来记录销售总额
//尝试使用外比较器进行排序
ArrayList<CategoryPojo> list = new ArrayList<>();
for (CategoryPojo categoryPojo : categoryPojos) {
//--1.计算截止到目前为止的所有分类的销售总额
totalAmount += categoryPojo.getTotalPrice();
//--2. 分类销售额最大的Top3
if (list.size()<3){
list.add(categoryPojo);
}else {
//>=3
CategoryPojo first = list.get(0);
if (categoryPojo.getTotalPrice()>first.getTotalPrice()){
list.remove(first);
list.add(categoryPojo);
}//进来元素小就不用变
}
}
list.sort(new Comparator<CategoryPojo>() {
@Override
public int compare(CategoryPojo o1, CategoryPojo o2) {
return (int) (o1.getTotalPrice()-o2.getTotalPrice());
}
});
//--3.直接在这里输出
System.out.println("================================================================================================================================");
System.out.println("----当前时间:----");
System.out.println(key);
System.out.println("----销售总额:----");
System.out.println(new BigDecimal(totalAmount).setScale(2, RoundingMode.HALF_UP));
System.out.println("----销售额Top3分类:----");
list.stream()
.map(c -> {
c.setTotalPrice(new BigDecimal(c.getTotalPrice()).setScale(2, RoundingMode.HALF_UP).doubleValue());
return c;
})
.sorted((c1, c2) -> c1.getTotalPrice() <= c2.getTotalPrice() ? 1 : -1)
.forEach(System.out::println); }}
/**
* interface AggregateFunction<IN, ACC, OUT>
* 自定义聚合函数,实现各个分类销售额的预聚合/累加
*/
public static class MyAggregate implements AggregateFunction<Tuple2<String,Double>,Double,Double>{
//初始化累加器
@Override
public Double createAccumulator() {
return 0d;
}
//累加过程
@Override
public Double add(Tuple2<String, Double> value, Double accumulator) {
return value.f1+accumulator;
}
//累加结果
@Override
public Double getResult(Double accumulator以上是关于刁钻导师难为人?我转手丢给他一个Flink史上最简单双十一实时分析案例的主要内容,如果未能解决你的问题,请参考以下文章
史上最简SLAM零基础解读 - Jacobian matrix(雅可比矩阵) → 理论分析与应用详解(Bundle Adjustment)
Centos6.7系统环境下使用 yum install 安装mysql-community-5.7.22(史上最简)