Flink Watermarks Explained

Preface: this article was compiled by the editors of cha138.com. It mainly covers Flink watermarks, and we hope it is useful to you.

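For stream processing, the central concept is the unbounded dataset, and computation over unbounded datasets is what we call stream processing. There are many ways to process an unbounded dataset, but they fall roughly into the following categories:

1. Time-agnostic: the most basic case is a filter. We only care about the data we want, regardless of whether the source is unbounded or out of order.
2. Approximation algorithms: for example approximate Top-N or streaming k-means clustering. They take unbounded data as input and compute results that are close to what you want.
3. Windowing: a window slices an unbounded dataset into pieces, an abstraction that turns the infinite into the finite.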

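Clearly an unbounded dataset can be sliced in countless ways, so there are countless kinds of windows. The most prominent is slicing by time; no other way of slicing is as useful as the time window. The central concept of a time window is, naturally, time. Stream processing generally distinguishes processing time from event time, and you can define further notions of time as needed; Flink, for example, also has ingestion time. The point here is that only event time guarantees correct results, and it also keeps results consistent when the program replays data.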

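For an unbounded dataset we lack an effective way to judge whether the data is complete, which is why watermarks exist. A watermark is a concept built on event time that describes how complete a data stream is. If events were measured by processing time, everything would be in order, and naturally no watermark would be needed.
In other words, event time introduces out-of-order data, and the watermark is the mechanism for dealing with it. Out-of-order simply means that some events arrive late. We cannot wait for late elements forever; there must be a mechanism guaranteeing that after a certain point the window is triggered and computed. That mechanism is the watermark: it tells the operator that messages with a timestamp not greater than (that is, less than or equal to) the watermark should no longer arrive (if one does arrive, it is late).
Note: the statement further below, that a window meets its firing condition once the watermark time >= window maxTimestamp, helps in understanding this sentence.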

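In Flink, windows are aligned to the epoch (natural clock boundaries). If the window size is 3 seconds, one minute is divided into these windows:

[00:00:00, 00:00:03), [00:00:03, 00:00:06), ..., [00:00:57, 00:01:00)

If the window size is 10 seconds, the windows are:

[00:00:00, 00:00:10), [00:00:10, 00:00:20), ..., [00:00:50, 00:01:00)

An offset value can additionally shift window starts away from these whole boundaries.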
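The alignment above can be sketched in plain Java. This is a minimal illustration, not Flink's code: `TumblingWindowMath` and its methods are hypothetical names, and for non-negative timestamps the start formula is meant to give the same result as the expression used in Flink's `TimeWindow.getWindowStartWithOffset` (`timestamp - (timestamp - offset + windowSize) % windowSize`). All values are milliseconds since the epoch.

```java
// Hypothetical helper illustrating epoch-aligned tumbling windows (times in ms).
final class TumblingWindowMath {
    private TumblingWindowMath() {}

    /** Start of the window [start, start + size) that contains the timestamp. */
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the remainder non-negative, so the start never exceeds the timestamp.
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }
}
```

For example, `windowStart(7_500, 0, 3_000)` is 6000 (window [6000, 9000)), `windowStart(7_500, 0, 10_000)` is 0, and with a 2-second offset `windowStart(7_500, 2_000, 10_000)` is 2000.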

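Look at EventTimeTrigger.onElement: when ctx.getCurrentWatermark() >= window.maxTimestamp(), it fires the window computation immediately; otherwise it registers an event-time timer for window.maxTimestamp(), so the window fires once the watermark reaches that point.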

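window.maxTimestamp() = window end time - 1. Flink time windows are measured in milliseconds (timestamps), so if the watermark falls short of maxTimestamp by even one millisecond, the window does not fire.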
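The firing rule can be written as a tiny, self-contained Java sketch. It only models the comparison described above; `EventTimeFiringRule` is a hypothetical helper, not Flink's `EventTimeTrigger`, and it assumes half-open `[start, end)` windows in milliseconds.

```java
// Hypothetical helper modeling the event-time firing check (times in ms).
final class EventTimeFiringRule {
    private EventTimeFiringRule() {}

    /** Windows are half-open [start, end), so the last timestamp they contain is end - 1. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** The window may fire once the watermark has reached its maxTimestamp. */
    static boolean shouldFire(long currentWatermark, long windowEnd) {
        return currentWatermark >= maxTimestamp(windowEnd);
    }
}
```

For the window [0, 10000), `shouldFire(9_998, 10_000)` is false (one millisecond short) and `shouldFire(9_999, 10_000)` is true.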

Next, look at where the Evictor is invoked: if the window has no contents, no computation is triggered.

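Each input element is assigned to windows according to its own event time. If a window holds data, it meets the firing condition once the watermark time >= window maxTimestamp; so whether a window fires is ultimately decided by the maxTimestamp of the window that the element's event time falls into.
Watermarks and checkpoints both work by upstream operators broadcasting a message to downstream operators (in both cases a special record, the watermark or the checkpoint barrier, is inserted into the stream).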

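Periodic watermarks: watermarks are generated periodically at a fixed time interval, which can be set with ExecutionConfig.setAutoWatermarkInterval(...). A newly generated watermark is emitted only if it is not null and is greater than the previous watermark.
The logic for generating a new watermark is entirely user-defined. The simplest algorithm takes the largest event time seen so far. This approach is crude: the watermark can jump ahead abruptly (that maximum timestamp may be too large), so it tolerates little out-of-orderness and easily produces many late events. Also note that in the most common case, a keyed window, the window operator usually has several inputs, and it takes the smallest of their watermarks as its own.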
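A minimal sketch of such a periodic generator, assuming a fixed out-of-orderness bound: it tracks the largest event time seen and, on each periodic call, proposes that maximum minus the bound, emitting only when the watermark actually advances. With a bound of 0 it is exactly the "largest event time so far" algorithm described above. The class and method names are hypothetical; Flink's built-in `WatermarkStrategy.forBoundedOutOfOrderness(...)` follows the same idea (and additionally subtracts 1 ms). The static `combinedWatermark` helper models the rule that an operator with several inputs uses the minimum.

```java
import java.util.Arrays;

// Hypothetical periodic watermark generator with a bounded out-of-orderness (times in ms).
final class PeriodicWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    PeriodicWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Called for every event: only remembers the largest event time seen so far. */
    void onEvent(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
    }

    /**
     * Called every auto-watermark interval. Returns the new watermark, or null when
     * nothing should be emitted (no events yet, or the watermark would not advance).
     */
    Long onPeriodicEmit() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return null;
        }
        long candidate = maxSeenTimestamp - maxOutOfOrdernessMs;
        if (candidate <= lastEmitted) {
            return null;
        }
        lastEmitted = candidate;
        return candidate;
    }

    /** An operator with several inputs takes the smallest input watermark. */
    static long combinedWatermark(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }
}
```

A larger bound makes the watermark advance more slowly, trading latency for fewer late events.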

Punctuated watermarks: the generation of a new watermark is triggered by certain special marker events in the data stream.
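As a hedged sketch, a punctuated generator can be modeled as follows: the watermark moves only when a marker event arrives. `PunctuatedWatermarkSketch`, its `Event` type and the `isMarker` flag are hypothetical; with Flink's `WatermarkGenerator` interface the equivalent pattern is to emit from `onEvent` and leave `onPeriodicEmit` empty.

```java
// Hypothetical punctuated watermark generator (times in ms).
final class PunctuatedWatermarkSketch {
    static final class Event {
        final long timestamp;
        final boolean isMarker; // true only for the special marker events

        Event(long timestamp, boolean isMarker) {
            this.timestamp = timestamp;
            this.isMarker = isMarker;
        }
    }

    private long currentWatermark = Long.MIN_VALUE;

    /** Returns the newly emitted watermark, or null if this event emits none. */
    Long onEvent(Event event) {
        // Ordinary events never move the watermark; markers move it only forward.
        if (event.isMarker && event.timestamp > currentWatermark) {
            currentWatermark = event.timestamp;
            return currentWatermark;
        }
        return null;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```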

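Although a watermark indicates that events earlier than it should no longer appear, in practice watermark generation algorithms rarely produce perfect watermarks, so receiving messages earlier than the watermark is unavoidable. These are the so-called late events. A late event is actually a special case of an out-of-order event: unlike ordinary out-of-order events, its disorder exceeds what the watermark anticipated, so its window has already closed before it arrives.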

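When a late event arrives, its window has already closed and emitted a result, so there are three ways to handle it:

1. Reactivate the closed window and recompute it to correct the result.
2. Collect the late events and process them separately.
3. Treat late events as erroneous messages and drop them.

Flink's default is option 3: late events are simply dropped. Options 1 and 2 are provided by Allowed Lateness and Side Output, respectively.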

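The Side Output mechanism puts late events into a separate branch of the stream. This branch is a by-product of the window computation that users can retrieve and handle specially.
The Allowed Lateness mechanism lets users set a maximum allowed lateness. Flink keeps a window's state after the window closes until the allowed lateness has passed; late events arriving during that period are not dropped and, by default, trigger a recomputation of the window. Keeping window state costs extra memory, and if the window computation uses the ProcessWindowFunction API, every late event may trigger a full recomputation of the window, which is expensive. So the allowed lateness should not be too long and late events should not be too numerous; otherwise, consider slowing down how fast the watermark advances or adjusting the algorithm.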
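The three outcomes for an element can be sketched in plain Java. This is a simplified model under stated assumptions, not Flink's implementation: window state is assumed to live until `maxTimestamp + allowedLateness`, timestamps are non-negative milliseconds small enough not to overflow, and `LatenessSketch` is a hypothetical name. In the DataStream API the corresponding settings are `allowedLateness(...)` and `sideOutputLateData(...)` on a windowed stream, with late records read back through `getSideOutput(...)`.

```java
// Hypothetical model of how an element is treated relative to its window (times in ms).
final class LatenessSketch {
    enum Outcome { BUFFERED, LATE_REFIRE, DROPPED_OR_SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long allowedLateness, long currentWatermark) {
        long maxTimestamp = windowEnd - 1;
        // Assumes non-negative values that do not overflow when added.
        long cleanupTime = maxTimestamp + allowedLateness;
        if (cleanupTime <= currentWatermark) {
            // Window state is gone: dropped by default, or routed to a side output.
            return Outcome.DROPPED_OR_SIDE_OUTPUT;
        }
        if (maxTimestamp <= currentWatermark) {
            // Window already fired but is within allowed lateness: it fires again.
            return Outcome.LATE_REFIRE;
        }
        return Outcome.BUFFERED; // window has not fired yet
    }
}
```

With an allowed lateness of 0, a window's state is cleaned up as soon as it fires, so every late element falls into the last category.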

Flink 1.12: Two-Stream Joins Explained

Two-stream joins are a frequently asked topic in Flink interviews.
Broadly speaking, there are only two kinds of join: Window Join and Interval Join.

Window Join can be further divided into three kinds according to the window type:

Tumbling Window Join

Sliding Window Join

Session Window Join

Window-based joins all rely on the window mechanism: data is first buffered in window state, and the join is performed when the window fires.

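An interval join also stores data in state before processing it; the difference is that data in its state expires, and the cleanup is triggered by the incoming data.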

01 Window Join

1.1 Tumbling Window Join

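When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to a JoinFunction or FlatJoinFunction. Because this behaves like an inner join, an element of one stream that has no element from the other stream in its tumbling window is not emitted.
As shown in the figure, we define a tumbling window of size 2 milliseconds, which results in windows of the form [0,1], [2,3], .... The figure shows the pairwise combinations of all elements in each window, which are passed on to the JoinFunction. Note that nothing is emitted in the tumbling window [6,7], because no elements exist in the green stream to be joined with the orange elements ⑥ and ⑦.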
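The inner-join semantics of a tumbling window join can be illustrated with an in-memory Java sketch. It only models the semantics: Flink buffers each window in state instead of scanning lists, and `TumblingWindowJoinSketch`, `Element` and the string output format are hypothetical. Windows are assumed to start at 0 with no offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of tumbling window join semantics.
final class TumblingWindowJoinSketch {
    static final class Element {
        final String key;
        final long timestamp; // ms
        final String value;

        Element(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Inner join: pairs every left/right element with the same key and the same window. */
    static List<String> join(List<Element> left, List<Element> right, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Element a : left) {
            for (Element b : right) {
                // Same window index means the same [k * size, (k + 1) * size) window.
                boolean sameWindow = Math.floorDiv(a.timestamp, windowSize)
                        == Math.floorDiv(b.timestamp, windowSize);
                if (a.key.equals(b.key) && sameWindow) {
                    out.add(a.value + "," + b.value);
                }
            }
        }
        return out;
    }
}
```

Elements without a partner in their window simply produce no pairs, which is why ⑥ and ⑦ yield nothing.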

1.2 Sliding Window Join

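When performing a sliding window join, all elements with a common key and a common sliding window are joined as pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Elements of one stream that have no elements from the other stream in the current sliding window are not emitted. Note that some elements might be joined in one sliding window but not in another.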

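In this example we use sliding windows of size 2 milliseconds that slide by 1 millisecond, resulting in the sliding windows [-1,0], [0,1], [1,2], [2,3], .... The joined elements below the x-axis are the ones passed to the JoinFunction for each sliding window. Here you can also see, for example, that orange ② is joined with green ③ in window [2,3], but is not joined with anything in window [1,2].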
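Which sliding windows contain an element can be computed as below. This is a sketch assuming half-open `[start, end)` windows aligned to 0 with no offset; the loop is modeled on the way Flink's sliding event-time assigner walks back from the latest window start, but `SlidingWindowAssignSketch` is a hypothetical name. For timestamp 2 with size 2 and slide 1 it returns `[2, 4)` and `[1, 3)`, i.e. the windows written [2,3] and [1,2] in the text.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sliding-window assignment: each window is {start, endExclusive} in ms.
final class SlidingWindowAssignSketch {
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is <= timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk back while the window [start, start + size) still contains the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

Each element belongs to `size / slide` windows when size is a multiple of slide, which is why the same element can be joined in one window and not in another.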

1.3 Session Window Join

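When performing a session window join, all elements with the same key that, when "combined", fulfill the session criteria are joined in pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Again this performs an inner join, so if a session window contains elements from only one stream, no output is emitted.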

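Here we define a session window join where sessions are separated by a gap of at least 1 ms. There are three sessions; in the first two, the joined elements from both streams are passed to the JoinFunction. In the third session there are no elements in the green stream, so ⑧ and ⑨ are not joined.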
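Session boundaries for one key can be sketched as follows, assuming each element opens a window `[t, t + gap)` and that windows which overlap or touch are merged (assumed here to match how Flink merges session windows). `SessionWindowSketch` is a hypothetical name. Once the sessions are known, the elements inside each session are paired exactly as in the tumbling case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical session-window merging for one key: each session is {start, endExclusive} in ms.
final class SessionWindowSketch {
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            long end = t + gap; // each element opens the window [t, t + gap)
            if (!result.isEmpty() && t <= result.get(result.size() - 1)[1]) {
                // Overlaps or touches the current session: extend it.
                long[] last = result.get(result.size() - 1);
                last[1] = Math.max(last[1], end);
            } else {
                result.add(new long[] {t, end}); // gap exceeded: start a new session
            }
        }
        return result;
    }
}
```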

02 Interval Join

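The window joins covered above must join inside a window. What if there is no window?
An interval join also joins two streams (stream A and stream B) on a common key,

and requires the timestamp of a stream B element to lie within a time interval relative to the timestamp of the stream A element: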

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]

or

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

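That is:

timestamp of the stream B element ≥ timestamp of the stream A element + lowerBound, and timestamp of the stream B element ≤ timestamp of the stream A element + upperBound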
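In the example above, we join two streams, "orange" and "green", with a lower bound of -2 milliseconds and an upper bound of +1 millisecond. By default these bounds are inclusive, but .lowerBoundExclusive() and .upperBoundExclusive() can be applied to change that behavior: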

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
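The bound check above can be expressed as a small Java predicate. It is a sketch for two elements that already share a key; `IntervalJoinCondition` and its parameters are hypothetical, with the two boolean flags standing in for `.lowerBoundExclusive()` and `.upperBoundExclusive()`. Timestamps and bounds are in milliseconds.

```java
// Hypothetical interval-join condition for a left element a and a right element b (ms).
final class IntervalJoinCondition {
    static boolean matches(long aTs, long bTs, long lowerBound, long upperBound,
                           boolean lowerExclusive, boolean upperExclusive) {
        long low = aTs + lowerBound;
        long high = aTs + upperBound;
        boolean aboveLow = lowerExclusive ? bTs > low : bTs >= low;
        boolean belowHigh = upperExclusive ? bTs < high : bTs <= high;
        return aboveLow && belowHigh;
    }
}
```

With the bounds from the figure (-2 ms, +1 ms, both inclusive), an orange element at 5 matches green elements at 3 through 6; marking the upper bound exclusive drops the match at 6.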

03 Code Demos

3.1 Two-Stream Join: Window Join

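Requirement: use two custom sources to simulate data, one producing order items and the other producing goods (product) data, and join the two streams with a window join.

Approach:

1. A window join first uses where and equalTo to specify the keys to join on. Here we pass method references so that elements of the two streams are joined on goodsId.

2. Set a 5-second tumbling window; elements of the two streams are only joined within the same 5-second window.

3. In the apply method, combine two elements of different types into an element of a new type.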


import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author SKY
 * Desc Demonstrates a Flink two-stream join (window join)
 */
public class JoinDemo01_WindowJoin {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        //goods stream
        DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
        //order item stream
        DataStreamSource<OrderItem> OrderItemDS = env.addSource(new OrderItemSource());
        //assign timestamps and watermarks (for simplicity, system time is used directly as event time)
        SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark());
        SingleOutputStreamOperator<OrderItem> OrderItemDSWithWatermark = OrderItemDS.assignTimestampsAndWatermarks(new OrderItemWatermark());


        //TODO 2.transformation (this is the key part)
        //Goods: goods id, goods name, goods price
        //OrderItem: order item id, goods id, quantity
        //join result: goods id, goods name, quantity, goods price * quantity
        DataStream<FactOrderItem> resultDS = goodsDSWithWatermark.join(OrderItemDSWithWatermark)
                .where(Goods::getGoodsId)
                .equalTo(OrderItem::getGoodsId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //<IN1, IN2, OUT>
                .apply(new JoinFunction<Goods, OrderItem, FactOrderItem>() {
                    @Override
                    public FactOrderItem join(Goods first, OrderItem second) throws Exception {
                        FactOrderItem result = new FactOrderItem();
                        result.setGoodsId(first.getGoodsId());
                        result.setGoodsName(first.getGoodsName());
                        result.setCount(new BigDecimal(second.getCount()));
                        result.setTotalMoney(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
                        return result;
                    }
                });


        //TODO 3.sink
        resultDS.print();

        //TODO 4.execute
        env.execute();
    }
    //Goods: goods id, goods name, goods price
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;
        public static List<Goods> GOODS_LIST;
        public static Random r;

        static  {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }
        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }
        public Goods() {
        }
        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //OrderItem: order item id, goods id, quantity
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //FactOrderItem (join result): goods id, goods name, quantity, goods price * quantity
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //Generates the goods stream in real time
    //Goods stream source (it plays the role of a dimension table)
    public static class GoodsSource extends RichSourceFunction<Goods> {
        //volatile: cancel() is called from a different thread than run()
        private volatile boolean isCancel;
        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }
        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            while(!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }
        @Override
        public void cancel() {
            isCancel = true;
        }
    }
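    //Generates the order item stream in real time
    //Order item stream source
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        //volatile: cancel() is called from a different thread than run()
        private volatile boolean isCancel;
        private Random r;
        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }
        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while(!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                //Also emit an item whose goodsId "111" matches no goods, so the inner join drops it.
                //A new object is used instead of mutating the record that was just emitted.
                OrderItem unmatched = new OrderItem();
                unmatched.setGoodsId("111");
                unmatched.setCount(orderItem.getCount());
                unmatched.setItemId(orderItem.getItemId());
                sourceContext.collect(unmatched);
                TimeUnit.SECONDS.sleep(1);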
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }
    //Watermark strategy; for learning and testing, system time is used directly as event time
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
    //Watermark strategy; for learning and testing, system time is used directly as event time
    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}

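Output (an order item appears several times because GoodsSource re-emits every product once per second, so each 5-second window contains several copies of the matching product and the inner join pairs the item with every copy; items with goodsId "111" never appear because no product has that ID):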

2> {"count":4,"goodsId":"6","goodsName":"Mate 40","totalMoney":26000}
2> {"count":4,"goodsId":"6","goodsName":"Mate 40","totalMoney":26000}
2> {"count":4,"goodsId":"6","goodsName":"Mate 40","totalMoney":26000}
2> {"count":4,"goodsId":"6","goodsName":"Mate 40","totalMoney":26000}
6> {"count":8,"goodsId":"1","goodsName":"小米12","totalMoney":39120}
6> {"count":1,"goodsId":"1","goodsName":"小米12","totalMoney":4890}
6> {"count":8,"goodsId":"1","goodsName":"小米12","totalMoney":39120}
4> {"count":1,"goodsId":"3","goodsName":"MacBookPro","totalMoney":15000}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
4> {"count":1,"goodsId":"3","goodsName":"MacBookPro","totalMoney":15000}
6> {"count":1,"goodsId":"1","goodsName":"小米12","totalMoney":4890}
4> {"count":1,"goodsId":"3","goodsName":"MacBookPro","totalMoney":15000}
8> {"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
4> {"count":1,"goodsId":"3","goodsName":"MacBookPro","totalMoney":15000}
6> {"count":8,"goodsId":"1","goodsName":"小米12","totalMoney":39120}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
8> {"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
4> {"count":1,"goodsId":"3","goodsName":"MacBookPro","totalMoney":15000}
6> {"count":1,"goodsId":"1","goodsName":"小米12","totalMoney":4890}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
6> {"count":8,"goodsId":"1","goodsName":"小米12","totalMoney":39120}
8> {"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
6> {"count":1,"goodsId":"1","goodsName":"小米12","totalMoney":4890}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
6> {"count":8,"goodsId":"1","goodsName":"小米12","totalMoney":39120}
8> {"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
6> {"count":1,"goodsId":"1","goodsName":"小米12","totalMoney":4890}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
8> {"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
2> {"count":8,"goodsId":"6","goodsName":"Mate 40","totalMoney":52000}
2> {"count":8,"goodsId":"6","goodsName":"Mate 40","totalMoney":52000}
2> {"count":8,"goodsId":"6","goodsName":"Mate 40","totalMoney":52000}
4> {"count":2,"goodsId":"3","goodsName":"MacBookPro","totalMoney":30000}
4> {"count":2,"goodsId":"3","goodsName":"MacBookPro","totalMoney":30000}
4> {"count":2,"goodsId":"3","goodsName":"MacBookPro","totalMoney":30000}
6> {"count":6,"goodsId":"1","goodsName":"小米12","totalMoney":29340}
6> {"count":2,"goodsId":"1","goodsName":"小米12","totalMoney":9780}
6> {"count":6,"goodsId":"1","goodsName":"小米12","totalMoney":29340}
6> {"count":2,"goodsId":"1","goodsName":"小米12","totalMoney":9780}
6> {"count":6,"goodsId":"1","goodsName":"小米12","totalMoney":29340}
4> {"count":2,"goodsId":"3","goodsName":"MacBookPro","totalMoney":30000}
2> {"count":8,"goodsId":"6","goodsName":"Mate 40","totalMoney":52000}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
2> {"count":8,"goodsId":"6","goodsName":"Mate 40","totalMoney":52000}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
4> {"count":2,"goodsId":"3","goodsName":"MacBookPro","totalMoney":30000}
6> {"count":2,"goodsId":"1","goodsName":"小米12","totalMoney":9780}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
6> {"count":6,"goodsId":"1","goodsName":"小米12","totalMoney":29340}
6> {"count":2,"goodsId":"1","goodsName":"小米12","totalMoney":9780}
6> {"count":6,"goodsId":"1","goodsName":"小米12","totalMoney":29340}
6> {"count":2,"goodsId":"1","goodsName":"小米12","totalMoney":9780}
8> {"count":10,"goodsId":"5","goodsName":"MeiZu One","totalMoney":32000}
8> {"count":10,"goodsId":"5","goodsName":"MeiZu One","totalMoney":32000}
6> {"count":2,"goodsId":"1","goodsName":"小米12","totalMoney":9780}
1> {"count":1,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":9800}
1> {"count":1,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":9800}
4> {"count":4,"goodsId":"3","goodsName":"MacBookPro","totalMoney":60000}
1> {"count":1,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":9800}
6> {"count":2,"goodsId":"1","goodsName":"小米12","totalMoney":9780}

3.2 Two-Stream Join: Interval Join

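Approach:

1. Key both streams by goodsId with keyBy, then join them with intervalJoin.

2. An interval join needs to know which time range of stream B each element of stream A is matched against. Here the lower bound is -2 seconds and the upper bound is +1 second, both inclusive (the default): each Goods element is joined with the OrderItem elements whose timestamps lie between 2 seconds before and 1 second after it.

3. In process, two elements with the same key are combined into a new FactOrderItem object.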


import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author SKY
 * Desc Demonstrates a Flink two-stream join (interval join)
 */
public class JoinDemo02_IntervalJoin {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        //goods stream
        DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
        //order item stream
        DataStreamSource<OrderItem> OrderItemDS = env.addSource(new OrderItemSource());
        //assign timestamps and watermarks (for simplicity, system time is used directly as event time)
        SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark());
        SingleOutputStreamOperator<OrderItem> OrderItemDSWithWatermark = OrderItemDS.assignTimestampsAndWatermarks(new OrderItemWatermark());


        //TODO 2.transformation (this is the key part)
        //Goods: goods id, goods name, goods price
        //OrderItem: order item id, goods id, quantity
        //join result: goods id, goods name, quantity, goods price * quantity
        SingleOutputStreamOperator<FactOrderItem> resultDS = goodsDSWithWatermark.keyBy(Goods::getGoodsId)
                .intervalJoin(OrderItemDSWithWatermark.keyBy(OrderItem::getGoodsId))
                //join conditions:
                // condition 1. the keys (goodsId) must be equal
                // condition 2. Goods timestamp - 2s <= OrderItem timestamp <= Goods timestamp + 1s
                .between(Time.seconds(-2), Time.seconds(1))
                //ProcessJoinFunction<IN1, IN2, OUT>
                .process(new ProcessJoinFunction<Goods, OrderItem, FactOrderItem>() {
                    @Override
                    public void processElement(Goods left, OrderItem right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                        FactOrderItem result = new FactOrderItem();
                        result.setGoodsId(left.getGoodsId());
                        result.setGoodsName(left.getGoodsName());
                        result.setCount(new BigDecimal(right.getCount()));
                        result.setTotalMoney(new BigDecimal(right.getCount()).multiply(left.getGoodsPrice()));
                        out.collect(result);
                    }
                });

        //TODO 3.sink
        resultDS.print();

        //TODO 4.execute
        env.execute();
    }
    //Goods: goods id, goods name, goods price
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;
        public static List<Goods> GOODS_LIST;
        public static Random r;

        static  {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }
        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }
        public Goods() {
        }
        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //OrderItem: order item id, goods id, quantity
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //FactOrderItem (join result): goods id, goods name, quantity, goods price * quantity
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //Generates the goods stream in real time
    //Goods stream source (it plays the role of a dimension table)
    public static class GoodsSource extends RichSourceFunction<Goods> {
        //volatile: cancel() is called from a different thread than run()
        private volatile boolean isCancel;
        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }
        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            while(!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }
        @Override
        public void cancel() {
            isCancel = true;
        }
    }
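    //Generates the order item stream in real time
    //Order item stream source
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        //volatile: cancel() is called from a different thread than run()
        private volatile boolean isCancel;
        private Random r;
        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }
        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while(!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                //Also emit an item whose goodsId "111" matches no goods, so the join drops it.
                //A new object is used instead of mutating the record that was just emitted.
                OrderItem unmatched = new OrderItem();
                unmatched.setGoodsId("111");
                unmatched.setCount(orderItem.getCount());
                unmatched.setItemId(orderItem.getItemId());
                sourceContext.collect(unmatched);
                TimeUnit.SECONDS.sleep(1);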
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }
    //Watermark strategy; for learning and testing, system time is used directly as event time
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
    //Watermark strategy; for learning and testing, system time is used directly as event time
    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}

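Output (an order item can match several copies of the same product, because GoodsSource re-emits every product each second and the join interval spans several seconds):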

6> {"count":3,"goodsId":"1","goodsName":"小米12","totalMoney":14670}
3> {"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}
6> {"count":3,"goodsId":"1","goodsName":"小米12","totalMoney":14670}
1> {"count":9,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":88200}
1> {"count":9,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":88200}
2> {"count":7,"goodsId":"6","goodsName":"Mate 40","totalMoney":45500}
1> {"count":9,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":88200}
2> {"count":7,"goodsId":"6","goodsName":"Mate 40","totalMoney":45500}
4> {"count":3,"goodsId":"3","goodsName":"MacBookPro","totalMoney":45000}
4> {"count":9,"goodsId":"3","goodsName":"MacBookPro","totalMoney":135000}
4> {"count":3,"goodsId":"3","goodsName":"MacBookPro","totalMoney":45000}
2> {"count":4,"goodsId":"6","goodsName":"Mate 40","totalMoney":26000}
4> {"count":9,"goodsId":"3","goodsName":"MacBookPro","totalMoney":135000}
2> {"count":4,"goodsId":"6","goodsName":"Mate 40","totalMoney":26000}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
2> {"count":4,"goodsId":"6","goodsName":"Mate 40","totalMoney":26000}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
3> {"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}
8> {"count":3,"goodsId":"5","goodsName":"MeiZu One","totalMoney":9600}
4> {"count":9,"goodsId":"3","goodsName":"MacBookPro","totalMoney":135000}
3> {"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}
