20.Flink高级特性--新特性--双流Joinjoin的分类API代码演示-WindowJoin代码演示-IntervalJoin
Posted 涂作权的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了20.Flink高级特性--新特性--双流Joinjoin的分类API代码演示-WindowJoin代码演示-IntervalJoin相关的知识,希望对你有一定的参考价值。
20.Flink高级特性–新特性–双流Join
20.1.join的分类
20.2.API
20.3.代码演示-WindowJoin
20.4.代码演示-IntervalJoin
20.Flink高级特性–新特性–双流Join
20.1.join的分类
双流Join是Flink面试的高频问题。一般情况下说明以下几点就可以hold了:
- Join大体分类只有两种:Window Join和Interval Join
Window Join又可以根据Window的类型细分出3种:
Tumbling Window Join、Sliding Window Join、Session Window Join。
Windows类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行Join操作。
Interval join也是利用state存储数据再处理,区别在于State中的数据有失效机制,依靠数据触发数据清理。
目前Stream Join的结果时数据笛卡尔积;
- Tumbling Window Join
- Sliding Window Join
20.2.API
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
20.3.代码演示-WindowJoin
需求
来做个案例:
使用两个指定Source模拟数据,一个Source是订单明细,一个Source是商品数据。我们通过Window Join,将数据关联到一起。
import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* TODO
*
* @author tuzuoquan
* @date 2022/6/15 19:30
*/
public class JoinDemo01_WindowJoin
public static void main(String[] args) throws Exception
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
//商品数据流
DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
//订单数据流
DataStreamSource<OrderItem> OrderItemDS = env.addSource(new OrderItemSource());
//给数据添加水印(这里简单一点直接使用系统时间作为事件时间)
/*
SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))//指定maxOutOfOrderness最大无序度/最大允许的延迟时间/乱序时间
.withTimestampAssigner((order, timestamp) -> order.getEventTime())//指定事件时间列
);
*/
SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark());
SingleOutputStreamOperator<OrderItem> orderItemDSWithWatermark =
OrderItemDS.assignTimestampsAndWatermarks(new OrderItemWatermark());
//TODO 2.transformation---这里是重点
//商品类(商品id,商品名称,商品价格)
//订单明细类(订单id,商品id,商品数量)
//关联结果(商品id,商品名称,商品数量,商品价格*商品数量)
DataStream<FactOrderItem> resultDS = goodsDSWithWatermark.join(orderItemDSWithWatermark)
.where(Goods::getGoodsId)
.equalTo(OrderItem::getGoodsId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//<IN1, IN2, OUT>
.apply(new JoinFunction<Goods, OrderItem, FactOrderItem>()
@Override
public FactOrderItem join(Goods first, OrderItem second) throws Exception
FactOrderItem result = new FactOrderItem();
result.setGoodsId(first.getGoodsId());
result.setGoodsName(first.getGoodsName());
result.setCount(new BigDecimal(second.getCount()));
result.setTotalMoney(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
return result;
);
//TODO 3.sink
resultDS.print();
//TODO 4.execute
env.execute();
//商品类(商品id,商品名称,商品价格)
@Data
public static class Goods
private String goodsId;
private String goodsName;
private BigDecimal goodsPrice;
public static List<Goods> GOODS_LIST;
public static Random r;
static
r = new Random();
GOODS_LIST = new ArrayList<>();
GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
public static Goods randomGoods()
int rIndex = r.nextInt(GOODS_LIST.size());
return GOODS_LIST.get(rIndex);
public Goods()
public Goods(String goodsId, String goodsName, BigDecimal goodsPrice)
this.goodsId = goodsId;
this.goodsName = goodsName;
this.goodsPrice = goodsPrice;
@Override
public String toString()
return JSON.toJSONString(this);
//订单明细类(订单id,商品id,商品数量)
@Data
public static class OrderItem
private String itemId;
private String goodsId;
private Integer count;
@Override
public String toString()
return JSON.toJSONString(this);
//商品类(商品id,商品名称,商品价格)
//订单明细类(订单id,商品id,商品数量)
//关联结果(商品id,商品名称,商品数量,商品价格*商品数量)
@Data
public static class FactOrderItem
private String goodsId;
private String goodsName;
private BigDecimal count;
private BigDecimal totalMoney;
@Override
public String toString()
return JSON.toJSONString(this);
//实时生成商品数据流
//构建一个商品Stream源(这个好比就是维表)
public static class GoodsSource extends RichSourceFunction<Goods>
private Boolean isCancel;
@Override
public void open(Configuration parameters) throws Exception
isCancel = false;
@Override
public void run(SourceContext sourceContext) throws Exception
while(!isCancel)
Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
TimeUnit.SECONDS.sleep(1);
@Override
public void cancel()
isCancel = true;
//实时生成订单数据流
//构建订单明细Stream源
public static class OrderItemSource extends RichSourceFunction<OrderItem>
private Boolean isCancel;
private Random r;
@Override
public void open(Configuration parameters) throws Exception
isCancel = false;
r = new Random();
@Override
public void run(SourceContext sourceContext) throws Exception
while(!isCancel)
Goods goods = Goods.randomGoods();
OrderItem orderItem = new OrderItem();
orderItem.setGoodsId(goods.getGoodsId());
orderItem.setCount(r.nextInt(10) + 1);
orderItem.setItemId(UUID.randomUUID().toString());
sourceContext.collect(orderItem);
orderItem.setGoodsId("111");
sourceContext.collect(orderItem);
TimeUnit.SECONDS.sleep(1);
@Override
public void cancel()
isCancel = true;
//构建水印分配器,学习测试直接使用系统时间了
//构建水印分配器,学习测试直接使用系统时间了
public static class GoodsWatermark implements WatermarkStrategy<Goods>
@Override
public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context)
return (element, recordTimestamp) -> System.currentTimeMillis();
@Override
public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context)
return new WatermarkGenerator<Goods>()
@Override
public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output)
output.emitWatermark(new Watermark(System.currentTimeMillis()));
@Override
public void onPeriodicEmit(WatermarkOutput output)
output.emitWatermark(new Watermark(System.currentTimeMillis()));
;
//构建水印分配器,学习测试直接使用系统时间了
public static class OrderItemWatermark implements WatermarkStrategy<OrderItem>
@Override
public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context)
return (element, recordTimestamp) -> System.currentTimeMillis();
@Override
public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context)
return new WatermarkGenerator<OrderItem>()
@Override
public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output)
output.emitWatermark(new Watermark(System.currentTimeMillis()));
@Override
public void onPeriodicEmit(WatermarkOutput output)
output.emitWatermark(new Watermark(System.currentTimeMillis()));
;
输出结果:
2> "count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000
1> "count":6,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":58800
2> "count":10,"goodsId":"2","goodsName":"iphone12","totalMoney":120000
1> "count":6,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":58800
1> "count":6,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":58800
2> "count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000
2> "count":10,"goodsId":"2","goodsName":"iphone12","totalMoney":120000
1> "count":6,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":58800
2> "count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000
2> "count":10,"goodsId":"2","goodsName":"iphone12","totalMoney":120000
20.4.代码演示-IntervalJoin
代码演示
1、通过keyBy将两个流join到一起
2、interval join需要设置流A去关联哪个时间范围的流B中的元素。此外,我设置的下界为-1、上界为0,且上界是一个开区间。表达的意思就是流A中某个元素的时间,对应上一秒的流B中的元素。
3、process中将两个key一样的元素,关联在一起,并加载到一个新的FactOrderItem对象中。
import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
以上是关于20.Flink高级特性--新特性--双流Joinjoin的分类API代码演示-WindowJoin代码演示-IntervalJoin的主要内容,如果未能解决你的问题,请参考以下文章
23.Flink-高级特性-新特性-Streaming Flie Sink介绍代码演示Flink-高级特性-新特性-FlinkSQL整合Hive添加依赖和jar包和配置
23.Flink-高级特性-新特性-Streaming Flie Sink介绍代码演示Flink-高级特性-新特性-FlinkSQL整合Hive添加依赖和jar包和配置
23.Flink-高级特性-新特性-Streaming Flie Sink介绍代码演示Flink-高级特性-新特性-FlinkSQL整合Hive添加依赖和jar包和配置