Flink: Implementing a TopN Example with Flink Process Functions and State Programming
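7.5 Application Case: TopN
7.5.1 Using ProcessAllWindowFunction
- Scenario
  For example, find the two most-visited URLs in the last 10 seconds, updating the result every 5 seconds.
- Approach
  - Open a window on the whole stream with windowAll. Count each URL's visits in a HashMap (the code does this incrementally with an AggregateFunction), and let a ProcessAllWindowFunction attach the window information
  - Convert the HashMap into an ArrayList, sort it by count, and output the top two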
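The count, convert, sort and take-top-N steps can be tried out without Flink. A minimal plain-Java sketch, assuming the input is just a list of URL strings for one window; `TopNSketch` and `topN` are hypothetical names for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of the HashMap counting + sorting idea (no Flink involved)
final class TopNSketch {
    // Count each url in a HashMap, copy the entries into an ArrayList,
    // sort by count in descending order and keep at most n entries
    static List<Map.Entry<String, Long>> topN(List<String> urls, int n) {
        HashMap<String, Long> counts = new HashMap<>();
        for (String url : urls) {
            counts.merge(url, 1L, Long::sum);
        }
        List<Map.Entry<String, Long>> list = new ArrayList<>(counts.entrySet());
        list.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return list.subList(0, Math.min(n, list.size()));
    }

    public static void main(String[] args) {
        List<String> urls = List.of("./home", "./cart", "./home", "./fav", "./home", "./cart");
        for (Map.Entry<String, Long> e : topN(urls, 2)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Using `Math.min(n, list.size())` avoids an `IndexOutOfBoundsException` when a window contains fewer than n distinct URLs.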
- Code
  - Data source code
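```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

public class ClickSource implements SourceFunction<Event> {
    // Flag that controls data generation; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    @Override
    // The generic type is Event
    public void run(SourceContext<Event> ctx) throws Exception {
        // Random generator for the data
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=100", "./prod?id=10"};

        // Keep generating data until the source is cancelled
        while (running) {
            // nextInt(length) covers every index; nextInt(length - 1) would never pick the last element
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            // Current system time in milliseconds
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            // collect() sends the Event downstream
            ctx.collect(new Event(user, url, timestamp));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```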
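The `Event` class is used throughout but is not shown in this section. A hypothetical version, inferred only from how the code uses it (public fields `user`, `url` and `timestamp`, with `timestamp` a `Long` in milliseconds), might look like this. Flink's POJO rules require a public standalone class with a public no-arg constructor, so it would live in its own `Event.java`:

```java
import java.sql.Timestamp;

// Hypothetical Event POJO: public class, public fields and a public no-arg
// constructor, so Flink can treat it as a POJO type
public class Event {
    public String user;
    public String url;
    public Long timestamp; // event time in epoch milliseconds

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url + "', timestamp=" + new Timestamp(timestamp) + "}";
    }
}
```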
  - Core code
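```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;

public class TopNExample_ProcessAllWindowFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the data
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // Out-of-orderness bound of 0: the watermark is just the max timestamp seen minus 1 ms
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        // Open a window directly on the non-keyed stream, collect the data and sort it
        stream.map(data -> data.url) // a stream of String
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 10 s windows, sliding every 5 s
                .aggregate(new UrlHashMapCountAgg(), new UrlAllWindowResult())
                .print();

        env.execute();
    }

    // Custom incremental aggregate function: count the visits of each url in a HashMap
    public static class UrlHashMapCountAgg implements AggregateFunction<String, HashMap<String, Long>, ArrayList<Tuple2<String, Long>>> {
        @Override
        public HashMap<String, Long> createAccumulator() {
            return new HashMap<>();
        }

        @Override
        public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
            if (accumulator.containsKey(value)) {
                Long count = accumulator.get(value);
                accumulator.put(value, count + 1);
            } else {
                accumulator.put(value, 1L);
            }
            return accumulator;
        }

        // Convert the HashMap into an ArrayList<Tuple2<String, Long>> sorted by count
        @Override
        public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
            ArrayList<Tuple2<String, Long>> result = new ArrayList<>();
            for (String key : accumulator.keySet()) {
                result.add(Tuple2.of(key, accumulator.get(key)));
            }
            // Sort in descending order: compare o2 against o1
            result.sort(new Comparator<Tuple2<String, Long>>() {
                @Override
                public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                    return Long.compare(o2.f1, o1.f1);
                }
            });
            return result;
        }

        // Only merging windows (e.g. session windows) call merge(); implemented anyway instead of returning null
        @Override
        public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
            b.forEach((url, count) -> a.merge(url, count, Long::sum));
            return a;
        }
    }

    // Custom full-window function: wrap the sorted list with window info and output it
    public static class UrlAllWindowResult extends ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow> {
        @Override
        public void process(Context context, Iterable<ArrayList<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception {
            // After an incremental aggregate, the Iterable holds exactly one element: the sorted list
            ArrayList<Tuple2<String, Long>> list = elements.iterator().next();
            StringBuilder result = new StringBuilder();
            result.append("---------------\n");
            // Window information
            result.append("Window end: " + new Timestamp(context.window().getEnd()) + "\n");
            // Take (at most) the first two entries of the sorted list
            for (int i = 0; i < Math.min(2, list.size()); i++) {
                Tuple2<String, Long> currTuple = list.get(i);
                String info = "No." + (i + 1) + " "
                        + "url:" + currTuple.f0 + " "
                        + "views: " + currTuple.f1 + "\n";
                result.append(info);
            }
            result.append("--------------------\n");
            out.collect(result.toString());
        }
    }
}
```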
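With `SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))` every click lands in two overlapping windows. The sketch below re-implements, in plain Java and only for illustration, the start-time arithmetic a sliding event-time assigner applies, assuming offset 0 and non-negative timestamps; `SlidingWindowSketch` and `assign` are hypothetical names, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: list every window [start, start + size) that contains a timestamp
final class SlidingWindowSketch {
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that contains the timestamp (offset 0, timestamp >= 0)
        long lastStart = timestamp - timestamp % slide;
        // Step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // A click at t = 37 000 ms with 10 s windows sliding every 5 s
        for (long[] w : assign(37_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Running `main` prints `[35000, 45000)` and `[30000, 40000)`. Window ends are exclusive and always fall on multiples of the 5 s slide, which is why the window end times in the results are 5 seconds apart and each click is counted in two of them.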
- Result
```
Window end: 2022-11-25 21:58:35.0
No.1 url:./fav views: 1
No.2 url:./home views: 1
--------------------
---------------
Window end: 2022-11-25 21:58:40.0
No.1 url:./home views: 3
No.2 url:./prod?id=100 views: 3
--------------------
---------------
Window end: 2022-11-25 21:58:45.0
No.1 url:./prod?id=100 views: 4
No.2 url:./cart views: 2
--------------------
---------------
Window end: 2022-11-25 21:58:50.0
No.1 url:./prod?id=100 views: 4
No.2 url:./fav views: 3
--------------------
```
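- Evaluation
  This approach is easy to follow, but it relies on windowAll: the window is opened directly on a non-keyed stream, so all data is collected into a single window instance. Without partitioning, the parallelism is forced to 1, so the job cannot scale out, and with large data volumes (many distinct URLs) that single instance risks running out of memory (OOM).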
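7.5.2 Using KeyedProcessFunction
- Scenario
  For example, find the two most-visited URLs in the last 10 seconds, updating the result every 5 seconds.
- Approach
  - Trigger
    - Following how windowed stream processing works (buffer data for a while, then emit it), use a timer
    - Register the timer at window end + 1 ms; it fires once the watermark passes that point, i.e. once all data for the window has arrived
  - Collect
    - Define a list that saves all the data
    - Keep that list in keyed state, scoped to the key of the preceding keyBy (the window end time)
  - Output
    - Sort
    - Output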
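The trigger, collect and output steps above can be simulated without Flink. A minimal plain-Java sketch, assuming records arrive already counted per window; a `HashMap` stands in for keyed `ListState` and a `TreeSet` for the timer service. `TimerTopNSketch`, `UrlCount` and `advanceWatermark` are hypothetical names, not Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical simulation of the KeyedProcessFunction approach (no Flink classes).
// The key is windowEnd and each key registers only the timer windowEnd + 1,
// so one sorted set of timer timestamps is enough here.
final class TimerTopNSketch {
    static final class UrlCount {
        final String url;
        final long count;
        final long windowEnd;

        UrlCount(String url, long count, long windowEnd) {
            this.url = url;
            this.count = count;
            this.windowEnd = windowEnd;
        }
    }

    private final int n;
    // Stand-in for keyed ListState: one list per key (windowEnd)
    private final Map<Long, List<UrlCount>> listState = new HashMap<>();
    // Registered event-time timers; duplicates collapse, as Flink dedupes per key and timestamp
    private final TreeSet<Long> timers = new TreeSet<>();
    final List<String> output = new ArrayList<>();

    TimerTopNSketch(int n) {
        this.n = n;
    }

    // Collect: save the record in state and register the timer at windowEnd + 1
    void processElement(UrlCount value) {
        listState.computeIfAbsent(value.windowEnd, k -> new ArrayList<>()).add(value);
        timers.add(value.windowEnd + 1);
    }

    // Trigger: an event-time timer fires once the watermark reaches its timestamp
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long windowEnd = timers.pollFirst() - 1;
            // Output: read and clear the state, sort by count descending, keep the top n
            List<UrlCount> list = listState.remove(windowEnd);
            list.sort((a, b) -> Long.compare(b.count, a.count));
            StringBuilder sb = new StringBuilder("windowEnd=" + windowEnd);
            for (int i = 0; i < Math.min(n, list.size()); i++) {
                sb.append(" No.").append(i + 1).append(' ')
                  .append(list.get(i).url).append('=').append(list.get(i).count);
            }
            output.add(sb.toString());
        }
    }

    public static void main(String[] args) {
        TimerTopNSketch op = new TimerTopNSketch(2);
        op.processElement(new UrlCount("./home", 1, 40_000L));
        op.processElement(new UrlCount("./cart", 2, 40_000L));
        op.advanceWatermark(39_999L); // the timer at 40 001 is not due yet
        op.advanceWatermark(40_001L); // the timer fires
        System.out.println(op.output);
    }
}
```

Running `main` prints `[windowEnd=40000 No.1 ./cart=2 No.2 ./home=1]`: nothing is emitted until the watermark reaches windowEnd + 1.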
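- Code
  Similar to the previous example, but with state added: urlViewCountListState is the list state that stores the per-URL counts of one window until that window's timer fires.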
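```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;

public class TopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the data
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // Out-of-orderness bound of 0: the watermark is just the max timestamp seen minus 1 ms
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        // 1. Key by url and count each url's visits per window
        SingleOutputStreamOperator<UrlViewCount> urlCountStream = stream
                .keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.UrlViewCountResult());

        urlCountStream.print("url count");

        // 2. Collect and sort the counts of the same window (keyed by windowEnd), emitting one ranking per window
        urlCountStream.keyBy(data -> data.windowEnd)
                .process(new TopNProcessResult(2))
                .print();

        env.execute();
    }

    // Custom KeyedProcessFunction: key = windowEnd, input = UrlViewCount, output = String
    public static class TopNProcessResult extends KeyedProcessFunction<Long, UrlViewCount, String> {
        // Number of top entries to output
        private final Integer n;
        // 1. List state holding every UrlViewCount of the current window
        private ListState<UrlViewCount> urlViewCountListState;

        public TopNProcessResult(Integer n) {
            this.n = n;
        }

        // 2. Obtain the state from the runtime context in the open() lifecycle method
        @Override
        public void open(Configuration parameters) throws Exception {
            // The descriptor takes two arguments: a state name and the element type
            urlViewCountListState = getRuntimeContext().getListState(
                    new ListStateDescriptor<UrlViewCount>("url-count-list", Types.POJO(UrlViewCount.class)));
        }

        @Override
        public void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception {
            // 3. Save the record in state
            urlViewCountListState.add(value);
            // 4. Register a timer at windowEnd + 1 ms; registering the same timestamp again is deduplicated
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
        }

        // 5. When the timer fires, copy the state into an ArrayList, sort it and output the result
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
            for (UrlViewCount urlViewCount : urlViewCountListState.get()) { // get() returns an Iterable
                urlViewCountArrayList.add(urlViewCount);
            }
            // The window's data is no longer needed
            urlViewCountListState.clear();

            // Sort by count in descending order
            urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {
                @Override
                public int compare(UrlViewCount o1, UrlViewCount o2) {
                    return Long.compare(o2.count, o1.count);
                }
            });

            // 6. Wrap the information and output it
            StringBuilder result = new StringBuilder();
            result.append("---------------\n");
            // Window information (the key is the window end)
            result.append("Window end: " + new Timestamp(ctx.getCurrentKey()) + "\n");
            // Output at most n entries
            for (int i = 0; i < Math.min(n, urlViewCountArrayList.size()); i++) {
                UrlViewCount curr = urlViewCountArrayList.get(i);
                String info = "No." + (i + 1) + " "
                        + "url:" + curr.url + " "
                        + "views: " + curr.count + "\n";
                result.append(info);
            }
            result.append("--------------------\n");
            out.collect(result.toString());
        }
    }
}
```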
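This example depends on `UrlViewCount` and on `UrlCountViewExample.UrlViewCountAgg` / `UrlViewCountResult`, which come from an earlier section and are not reproduced here. A hypothetical `UrlViewCount` POJO, inferred only from the fields in the printed output (url, count, windowStart, windowEnd) and assuming `Long` epoch-millisecond values, could look like this; it must be a public class with a public no-arg constructor for `Types.POJO(UrlViewCount.class)` to accept it:

```java
import java.sql.Timestamp;

// Hypothetical POJO: public class, public fields, public no-arg constructor
public class UrlViewCount {
    public String url;
    public Long count;
    public Long windowStart; // epoch milliseconds
    public Long windowEnd;   // epoch milliseconds, also used as the key for the TopN step

    public UrlViewCount() {
    }

    public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
        this.url = url;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "UrlViewCount{url=" + url + ", count=" + count
                + ", windowStart=" + new Timestamp(windowStart)
                + ", windowEnd=" + new Timestamp(windowEnd) + "}";
    }
}
```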
- Result
```
url count> UrlViewCount{url=./home, count=1, windowStart=2022-11-25 22:42:30.0, windowEnd=2022-11-25 22:42:40.0}
url count> UrlViewCount{url=./cart, count=2, windowStart=2022-11-25 22:42:30.0, windowEnd=2022-11-25 22:42:40.0}
---------------
Window end: 2022-11-25 22:42:40.0
No.1 url:./cart views: 2
No.2 url:./home views: 1
--------------------
url count> UrlViewCount{url=./home, count=2, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
url count> UrlViewCount{url=./prod?id=100, count=2, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
url count> UrlViewCount{url=./cart, count=4, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
---------------
Window end: 2022-11-25 22:42:45.0
No.1 url:./cart views: 4
No.2 url:./home views: 2
--------------------
```
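- Evaluation
  Both steps run on keyed streams (first keyed by url, then by windowEnd), so the computation can be spread across parallel tasks instead of being funneled into a single windowAll instance.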