从0到1Flink的成长之路-Flink Action 综合案例-订单自动好评
Posted 熊老二-
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路-Flink Action 综合案例-订单自动好评相关的知识,希望对你有一定的参考价值。
订单自动好评
需求
在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做
出评价,系统自动给与五星好评,使用Flink 定时器Timer来简单实现这一功能。
Flink Timer
Timer(定时器)是Flink Streaming API提供的用于感知并利用处理时间/事件时间变化的机
制。Ververica blog上给出的描述如下:
对于普通用户来说,最常见的显式利用Timer的地方就是KeyedProcessFunction,在其
processElement()方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。
根据时间特征的不同:
处理时间ProcessingTime——调用Context.timerService().registerProcessingTimeTimer()注
册,onTimer()在系统时间戳达到Timer设定的时间戳时触发。
事件时间EventTime——调用Context.timerService().registerEventTimeTimer()注册,
onTimer()在Flink内部水印达到或超过Timer设定的时间戳时触发。
举个栗子,按天实时统计指标并存储在状态中,每天0点清除状态重新统计,就可以在
processElement()方法里注册Timer,再在onTimer()方法里执行state.clear()。
除了KeyedProcessFunction之外,Timer在窗口机制中也有重要的地位。提起窗口自然就能想
到Trigger,即触发器。Flink Timer的4大特点,如下图所示:
数据
首先通过自定义 source 模拟订单的生成,交易订单实体类:TmallOrder
package xx.xxxxxx.flink.tmall;
import lombok.*;
@Setter
@Getter
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public class TmallOrder {
private String orderId ;
private Integer userId ;
private Double orderAmount ;
private String orderTime ;
private String category ; @Override
public String toString() {
return orderId + ", " + userId + ", " + orderAmount + ", " + category + ", " + orderTime;
}
}
自定义数据源,生成交易订单数据,OrderSource代码如下:
> package xx.xxxxxx.flink.tmall; import
> org.apache.commons.lang3.time.FastDateFormat; import
> org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import java.util.Random; import java.util.concurrent.TimeUnit; /**
> * 自定义数据源,实时产出订单数据,封装至TmallOrder实例对象
> */ public class OrderSource extends RichParallelSourceFunction<TmallOrder> { private boolean isRunning =
> true ; // 商品类别 String category[] = { "女装", "男装", "图书", "家电", "洗护",
> "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公" }; @Override public void
> run(SourceContext<TmallOrder> ctx) throws Exception { Random random =
> new Random() ; FastDateFormat dataFormat =
> FastDateFormat.getInstance("yyyyMMddHHmmssSSS") ; FastDateFormat
> format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS") ; while
> (isRunning){ // 创建订单 long millis = System.currentTimeMillis(); String
> orderPrice = String.format((5 + random.nextInt(100)) + ".%2d", 10 +
> random.nextInt(90)); TmallOrder order = new TmallOrder(
> String.format(dataFormat.format(millis) + "%5d", 10000 +
> random.nextInt(1000)), // (10000 * (random.nextInt(5) + 1))+
> random.nextInt(10000), // Double.parseDouble(orderPrice), //
> format.format(millis), // category[random.nextInt(category.length)] );
> ctx.collect(order); TimeUnit.MILLISECONDS.sleep(1000 +
> (random.nextInt(5) * 500)); } }@Override public void cancel() {
> isRunning = false ;
> }
> }
代码实现
使用Flink完成自动好评功能,调用KeyedProcessFunction函数onTimer定时方法,在订单完
成后多长时间内用户未进行评价,自动给予好评,代码如下所示:
package xx.xxxxxx.flink.tmall;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
/**
* 在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做出评价,系统自动给与五星好评,
TODO: 使用 Flink Timer定时器实现此功能
*/
public class TmallAutoComments {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 数据源-source
DataStreamSource<TmallOrder> orderStream = env.addSource(new OrderSource());
// 3. 数据转换-transformation
// TODO: 为了更好地演示,设置时间间隔Interval为10秒,如果订单未评价,自动进行好评
/*
定时处理逻辑:
step1. 定义一个MapState类型的状态,key是订单号,value是订单完成时间
step2. 在processElement处理数据的时候,把每个订单的信息存入状态中,此时不做任何处理,
并且注册一个定时器(在订单完成时间+间隔时间(interval)时触发)。
step3. 注册的定时器到达了订单完成时间+间隔时间(interval)时就会触发onTimer方法
调用外部的接口来判断用户是否做过评价,如果没做评价,调用接口给与五星好评;如果做过评价,
则什么也不处理,最后记得把相应的订单从MapState删除。
*/
SingleOutputStreamOperator<String> commentStream = orderStream
.keyBy("orderId")
.process(new KeyedProcessFunction<Tuple, TmallOrder, String>() {
FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
private Long interval = 5000L;
// step1. 定义MapState类型状态,key为订单号,value为订单完成时间
private MapState<String, Long> orderState;
@Override
public void open(Configuration parameters) throws Exception {
// 状态描述符
MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<String, Long>(
"orderState", String.class, Long.class
);
// 状态初始化
orderState = getRuntimeContext().getMapState(descriptor);
}@Override
public void processElement(TmallOrder value,
Context ctx,
Collector<String> out) throws Exception {
System.out.println("Order>>>>" + value.toString());
// step2. 将每条数据加入MapState中
long orderTime = format.parse(value.getOrderTime()).getTime();
orderState.put(value.getOrderId(), orderTime);
// step3. 注册定时器,超过多长时间(interval,单位:毫秒) 没有评价,则自动五星好评
ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + interval );
}@Override
public void onTimer(long timestamp,
OnTimerContext ctx,
Collector<String> out) throws Exception {
System.err.println("========== 触发定时器时间:" + format.format(timestamp) + " ==========");
Iterator<Map.Entry<String, Long>> iterator = orderState.iterator();
while (iterator.hasNext()){
Map.Entry<String, Long> entry = iterator.next();
// 依据订单ID,到外部存储(比如mysql数据库订单评论表t_order_comment)获取此订单是否评论
String orderId = entry.getKey();
boolean evaluated = isComment(orderId);
if (evaluated) {
System.out.println(
"订单[" + orderId + "]在完成后的[" + interval + "]毫秒时间内已经评价.........");
} else {
// 如果没有完成评价,调用相关的接口给与默认的五星评价
System.err.println(
"订单[" + orderId + "]在完成后,超过[" + interval + "]毫秒未评价,调用接口给与五星自动好.........");
}
orderState.remove(orderId); } }
// 模拟判断此订单是否被评论, 简单判断订单是否可以被5整除,可以的话为未评论
private boolean isComment(String orderId) {
// orderId -> 20201025 211443 969 10794
return Integer.parseInt(orderId.substring(19)) % 5 != 0; }
});
// 4. 数据终端-sink
commentStream.printToErr();
// 5. 触发应用-execute
env.execute(TmallAutoComments.class.getSimpleName());
}
}
运行效果如下所示:
以上是关于从0到1Flink的成长之路-Flink Action 综合案例-订单自动好评的主要内容,如果未能解决你的问题,请参考以下文章