2021年大数据Flink(四十一):Flink实现订单自动好评
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021年大数据Flink(四十一):Flink实现订单自动好评相关的知识,希望对你有一定的参考价值。
目录
Flink实现订单自动好评
需求
在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做出评价,系统自动给与五星好评,我们今天主要使用Flink的定时器来简单实现这一功能。
数据
自定义source模拟生成一些订单数据.
在这里,我们生了一个最简单的二元组Tuple3,包含用户id,订单id和订单完成时间三个字段.
/**
* 自定义source实时产生订单数据Tuple3<用户id,订单id, 订单生成时间>
*/
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
private boolean flag = true;
@Override
public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
Random random = new Random();
while (flag) {
String userId = random.nextInt(5) + "";
String orderId = UUID.randomUUID().toString();
long currentTimeMillis = System.currentTimeMillis();
ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
Thread.sleep(500);
}
}
@Override
public void cancel() {
flag = false;
}
}
编码步骤
1.env
2.source
3.transformation
设置经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置5s的时间
long interval = 5000L;
分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评
dataStream.keyBy(0).process(new TimerProcessFuntion(interval));
3.1定义MapState类型的状态,key是订单号,value是订单完成时间
3.2创建MapState
MapStateDescriptor<String, Long> mapStateDesc =
new MapStateDescriptor<>("mapStateDesc", String.class, Long.class);
mapState = getRuntimeContext().getMapState(mapStateDesc);
3.3注册定时器
mapState.put(value.f0, value.f1);
ctx.timerService().registerProcessingTimeTimer(value.f1 + interval);
3.4定时器被触发时执行并输出结果
4.sink
5.execute
参考代码
package cn.itcast.action;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
/**
* Author itcast
* Desc
*/
public class OrderAutomaticFavorableComments {
public static void main(String[] args) throws Exception {
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//TODO 2.source
//Tuple3<用户id,订单id,订单生成时间>
DataStream<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());
//TODO 3.transformation
//设置经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置5s的时间
long interval = 5000L;//5s
//分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评
orderDS.keyBy(t -> t.f0)
.process(new TimerProcessFunction(interval));
//TODO 4.sink
//TODO 5.execute
env.execute();
}
/**
* 自定义source实时产生订单数据Tuple3<用户id,订单id, 订单生成时间>
*/
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
private boolean flag = true;
@Override
public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
Random random = new Random();
while (flag) {
String userId = random.nextInt(5) + "";
String orderId = UUID.randomUUID().toString();
long currentTimeMillis = System.currentTimeMillis();
ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
Thread.sleep(500);
}
}
@Override
public void cancel() {
flag = false;
}
}
/**
* 自定义ProcessFunction完成订单自动好评
* 进来一条数据应该在interval时间后进行判断该订单是否超时是否需要自动好评
* abstract class KeyedProcessFunction<K, I, O>
*/
private static class TimerProcessFunction extends KeyedProcessFunction<String, Tuple3<String, String, Long>, Object> {
private long interval;//订单超时时间 传进来的是5000ms/5s
public TimerProcessFunction(long interval) {
this.interval = interval;
}
//-0.准备一个State来存储订单id和订单生成时间
private MapState<String, Long> mapState = null;
//-1.初始化
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("mapState", String.class, Long.class);
mapState = getRuntimeContext().getMapState(mapStateDescriptor);
}
//-2.处理每一条数据并存入状态并注册定时器
@Override
public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {
//Tuple3<用户id,订单id, 订单生成时间> value里面是当前进来的数据里面有订单生成时间
//把订单数据保存到状态中
mapState.put(value.f1, value.f2);//xxx,2020-11-11 00:00:00 || xx,2020-11-11 00:00:01
//该订单在value.f2 + interval时过期/到期,这时如果没有评价的话需要系统给与默认好评
//注册一个定时器在value.f2 + interval时检查是否需要默认好评
ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);//2020-11-11 00:00:05 || 2020-11-11 00:00:06
}
//-3.执行定时任务
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
//检查历史订单数据(在状态中存储着)
//遍历取出状态中的订单数据
Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> map = iterator.next();
String orderId = map.getKey();
Long orderTime = map.getValue();
//先判断是否好评--实际中应该去调用订单评价系统看是否好评了,我们这里写个方法模拟一下
if (!isFavorable(orderId)) {//该订单没有给好评
//判断是否超时--不用考虑进来的数据是否过期,统一判断是否超时更保险!
if (System.currentTimeMillis() - orderTime >= interval) {
System.out.println("orderId:" + orderId + "该订单已经超时未评价,系统自动给与好评!....");
//移除状态中的数据,避免后续重复判断
iterator.remove();
mapState.remove(orderId);
}
} else {
System.out.println("orderId:" + orderId + "该订单已经评价....");
//移除状态中的数据,避免后续重复判断
iterator.remove();
mapState.remove(orderId);
}
}
}
//自定义一个方法模拟订单系统返回该订单是否已经好评
public boolean isFavorable(String orderId) {
return orderId.hashCode() % 2 == 0;
}
}
}
参考效果
实现代码:
package cn.lanson.action;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
/**
* Author lanson
* Desc
* 在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做出评价,系统自动给与五星好评(或者下单之后在一定时间内没有付款, 就触发站内信/短信提醒/取消...)
* 我们今天主要使用Flink的定时器来简单实现这一功能。
* 注意: 这个需求不使用大数据的技术,就是用Web的定时器也可以做
* 课后可以用你熟悉的编程语言/工具/框架去实现
*/
public class OrderAutomaticFavorite {
public static void main(String[] args) throws Exception {
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 2.source
//Tuple3<用户id,订单id,订单生成时间>
DataStream<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());
//TODO 3.transformation
//设置经过interval毫秒用户未对订单做出评价就自动给予好评,为了方便测试,设置5000ms/5s(实际中可以长一点)
long interval = 5000L;
//实现这个功能原本不需要分组,但是为了后面使用keyedState状态,所以这里分下组
orderDS.keyBy(t->t.f0)
.process(new MyKeyedProcessFunction(interval));
//TODO 4.sink
//TODO 5.execute
env.execute();
}
/**
* public abstract class KeyedProcessFunction<K, I, O>
*/
public static class MyKeyedProcessFunction extends KeyedProcessFunction<String,Tuple3<String, String, Long>,Object> {
//准备一个MapState存储订单信息<订单号,订单时间>
private MapState<String,Long> mapState = null;
private long interval = 0L;
public MyKeyedProcessFunction(long interval) {
this.interval = interval;
}
@Override
public void open(Configuration parameters) throws Exception {
//创建状态描述器
MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>("mapState", String.class, Long.class);
mapState = getRuntimeContext().getMapState(descriptor);
}
//处理进来的每个元素/订单,然后注册定时器,到时候判断是否进行了好评
@Override
public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {
//把订单信息存入状态中方便后续使用
mapState.put(value.f1,value.f2);
//注册定时器在interval时间后执行/在value.f2 + interval时间时执行
ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);
}
//实现定时器执行方法
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
//定时器触发的时候需要检查状态中的订单是否已经好评了
Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
while (iterator.hasNext()){
Map.Entry<String, Long> entry = iterator.next();
String orderId = entry.getKey();
Long orderTime = entry.getValue();
//判断该订单是否已经评价--实际中需要调用外部订单系统的接口,我们自己简单起见直接调用模拟的方法
if(isEvaluate(orderId)){
//已经评价过了
System.out.println("该订单:"+orderId+"用户已评价");
//移除当前订单
iterator.remove();//迭代器可以直接移除元素
//保险一定状态中也移除
mapState.remove(orderId);
}else{
//没有评价
//注意:一个key(用户)有很多订单,有的可能超时,有的可能还未超时
//所以需要判断是否超时
if(System.currentTimeMillis() - orderTime >= interval){
//超时且未评价,需要系统给予自动好评
System.out.println("该订单:"+orderId+"已超时未评价,系统给予自动好评");
//移除当前订单
iterator.remove();//迭代器可以直接移除元素
//保险一定状态中也移除
mapState.remove(orderId);
}/*else{
//未超时,不用管
}*/
}
}
}
//模拟订单系统,传入订单id,返回该订单是否已经评价
public boolean isEvaluate(String orderId){
//下面这行代码会随机返回订单是否已经评价
return new Random().nextInt(10) % 2 == 0;
}
}
/**
* 自定义source实时产生订单数据Tuple3<用户id,订单id,订单生成时间>
*/
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
private boolean flag = true;
@Override
public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
Random random = new Random();
while (flag) {
String userId = random.nextInt(5) + "";
String orderId = UUID.randomUUID().toString();
long currentTimeMillis = System.currentTimeMillis();
ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
Thread.sleep(500);
}
}
@Override
public void cancel() {
flag = false;
}
}
}
以上是关于2021年大数据Flink(四十一):Flink实现订单自动好评的主要内容,如果未能解决你的问题,请参考以下文章
2021年大数据Flink(四十):Flink模拟双十一实时大屏统计
2021年大数据Flink(四十七):扩展阅读 File Sink