18.Flink-练习-订单自动好评数据实现步骤代码实现
Posted 涂作权的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了18.Flink-练习-订单自动好评数据实现步骤代码实现相关的知识,希望对你有一定的参考价值。
18.Flink-练习-订单自动好评
18.1.数据
18.2.实现步骤
18.3.代码实现
18.Flink-练习-订单自动好评-掌握
需求:在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做出评价,系统自动给与五星好评,我们今天主要使用Flink的定时器来简单实现这一功能。
18.1.数据
/**
* 自定义source实时产生订单数据Tuple3<用户id,订单id, 订单生成时间>
*/
public static class MySource implements SourceFunction<Tuple3<String, String, Long>>
private boolean flag = true;
@Override
public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception
Random random = new Random();
while (flag)
String userId = random.nextInt(5) + "";
String orderId = UUID.randomUUID().toString();
long currentTimeMillis = System.currentTimeMillis();
ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
Thread.sleep(500);
@Override
public void cancel()
flag = false;
18.2.实现步骤
1.env
2.source
3.transformation
设置经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置5s的时间
long interval = 5000L;
分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评
dataStream.keyBy(0).process(new TimerProcessFuntion(interval));
3.1定义MapState类型的状态,key是订单号,value是订单完成时间
3.2创建MapState
MapStateDescriptor<String, Long> mapStateDesc =
new MapStateDescriptor<>("mapStateDesc", String.class, Long.class);
mapState = getRuntimeContext().getMapState(mapStateDesc);
3.3注册定时器
mapState.put(value.f0, value.f1);
ctx.timerService().registerProcessingTimeTimer(value.f1 + interval);
3.4定时器被触发时执行并输出结果
4.sink
5.execute
18.3.代码实现
package day5;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
/**
* TODO
*
* @author tuzuoquan
* @date 2022/6/13 9:40
*/
public class OrderAutomaticFavorableComments
public static void main(String[] args) throws Exception
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//TODO 2.source
//Tuple3<用户id,订单id,订单生成时间>
DataStream<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());
//TODO 3.transformation
//设置经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置5s的时间
//5s
long interval = 5000L;
//分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评
orderDS.keyBy(t -> t.f0)
.process(new TimerProcessFunction(interval));
//TODO 4.sink
//TODO 5.execute
env.execute();
/**
* 自定义source实时产生订单数据Tuple3<用户id,订单id, 订单生成时间>
*/
public static class MySource implements SourceFunction<Tuple3<String, String, Long>>
private boolean flag = true;
@Override
public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception
Random random = new Random();
while (flag)
String userId = random.nextInt(5) + "";
String orderId = UUID.randomUUID().toString();
long currentTimeMillis = System.currentTimeMillis();
ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
Thread.sleep(500);
@Override
public void cancel()
flag = false;
/**
* 自定义ProcessFunction完成订单自动好评
* 进来一条数据应该在interval时间后进行判断该订单是否超时是否需要自动好评
* abstract class KeyedProcessFunction<K, I, O>
*/
private static class TimerProcessFunction extends KeyedProcessFunction<String, Tuple3<String, String, Long>, Object>
private long interval;//订单超时时间 传进来的是5000ms/5s
public TimerProcessFunction(long interval)
this.interval = interval;
//-0.准备一个State来存储订单id和订单生成时间
private MapState<String, Long> mapState = null;
//-1.初始化
@Override
public void open(Configuration parameters) throws Exception
MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("mapState", String.class, Long.class);
mapState = getRuntimeContext().getMapState(mapStateDescriptor);
//-2.处理每一条数据并存入状态并注册定时器
@Override
public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception
//Tuple3<用户id,订单id, 订单生成时间> value里面是当前进来的数据里面有订单生成时间
//把订单数据保存到状态中
mapState.put(value.f1, value.f2);//xxx,2020-11-11 00:00:00 ||xx,2020-11-11 00:00:01
//该订单在value.f2 + interval时过期/到期,这时如果没有评价的话需要系统给与默认好评
//注册一个定时器在value.f2 + interval时检查是否需要默认好评
ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);//2020-11-11 00:00:05 || 2020-11-11 00:00:06
//-3.执行定时任务
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception
//检查历史订单数据(在状态中存储着)
//遍历取出状态中的订单数据
Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
while (iterator.hasNext())
Map.Entry<String, Long> map = iterator.next();
String orderId = map.getKey();
Long orderTime = map.getValue();
//先判断是否好评--实际中应该去调用订单评价系统看是否好评了,我们这里写个方法模拟一下
if (!isFavorable(orderId)) //该订单没有给好评
//判断是否超时--不用考虑进来的数据是否过期,统一判断是否超时更保险!
if (System.currentTimeMillis() - orderTime >= interval)
System.out.println("orderId:" + orderId + "该订单已经超时未评价,系统自动给与好评!....");
//移除状态中的数据,避免后续重复判断
iterator.remove();
mapState.remove(orderId);
else
System.out.println("orderId:" + orderId + "该订单已经评价....");
//移除状态中的数据,避免后续重复判断
iterator.remove();
mapState.remove(orderId);
//自定义一个方法模拟订单系统返回该订单是否已经好评
public boolean isFavorable(String orderId)
return orderId.hashCode() % 2 == 0;
输出结果:
orderId:e7246ce7-ef64-42b3-ae99-b3a640622c42该订单已经评价....
orderId:fb8c713d-2674-4ecf-8b62-6f4883ca7fff该订单已经超时未评价,系统自动给与好评!....
orderId:72e4350f-5652-4ccd-abb5-cb07f8c8fc05该订单已经超时未评价,系统自动给与好评!....
orderId:5fbb1489-96cd-48b8-8e17-b09eb8057fa3该订单已经评价....
orderId:8682815e-1568-48a4-831c-9d39889b7fb1该订单已经超时未评价,系统自动给与好评!....
orderId:2c3e2f0b-9f5e-4ef0-b4f8-fce7cea14f35该订单已经评价....
orderId:989cae1a-b005-41ba-a337-c9b2ad049ad8该订单已经超时未评价,系统自动给与好评!....
orderId:018d83d0-e311-47ce-884b-3a422a16ea33该订单已经超时未评价,系统自动给与好评!....
orderId:fddebcf5-6050-4e7c-90d9-bffdcf3d6e58该订单已经超时未评价,系统自动给与好评!....
orderId:3d9987ad-c98e-4e81-abc5-5751589df876该订单已经评价....
以上是关于18.Flink-练习-订单自动好评数据实现步骤代码实现的主要内容,如果未能解决你的问题,请参考以下文章
2021年大数据Flink(四十一):Flink实现订单自动好评
从0到1Flink的成长之路-Flink Action 综合案例-订单自动好评
从0到1Flink的成长之路-Flink Action 综合案例-订单自动好评