An Introduction to IntervalJoin in Flink

Posted by 月疯


The IntervalJoin operator

Interval join: one stream joins the other stream's data from a time range relative to each of its elements. The operator turns two KeyedStreams into a DataStream; within the given time bounds (inclusive by default), which behave like a per-element window, the two KeyedStreams are joined on the specified key. Every pair of events that satisfies the join condition is brought together, and what to do with the pair is up to the user.
1. Join condition: key1 == key2 && e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound (spelled out as a predicate below)
2. Typical scenario: pull related, grouped data from a bounded time range together into one wide record
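Written as code, and using a hypothetical Event POJO with a String key and an epoch-millisecond timestamp (not part of Flink's API, just for illustration), the condition amounts to this minimal sketch:

    // Sketch of the interval-join condition; e1 is from the left stream, e2 from the right.
    // Bounds are in milliseconds and inclusive by default.
    static boolean joins(Event e1, Event e2, long lowerBoundMs, long upperBoundMs) {
        return e1.key.equals(e2.key)
                && e1.timestamp + lowerBoundMs <= e2.timestamp
                && e2.timestamp <= e1.timestamp + upperBoundMs;
    }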

Syntax:

leftKeyedStream
    .intervalJoin(rightKeyedStream)
    // time interval: set the lower and upper bounds
    .between(Time.minutes(-10), Time.seconds(0))
    // optional: exclude the lower bound
    .lowerBoundExclusive()
    // optional: exclude the upper bound
    .upperBoundExclusive()
    // user-defined ProcessJoinFunction that handles each joined pair (see the skeleton below)
    .process(ProcessJoinFunction)
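The process() call takes an implementation of ProcessJoinFunction<IN1, IN2, OUT>. A minimal skeleton, shown here with hypothetical Tuple2<String, Long> input types purely to illustrate the generic signature, could look like this:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.util.Collector;

    // IN1 = left element type, IN2 = right element type, OUT = output type
    public class MyProcessJoinFunction extends ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String> {
        @Override
        public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right,
                                   Context ctx, Collector<String> out) throws Exception {
            // Invoked once for every (left, right) pair that satisfies the key and time conditions
            out.collect(left + " <-> " + right);
        }
    }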

Notes on this operator:
1. Both streams are buffered in internal state. When a leftElement arrives, the rightElements within the corresponding time range are looked up in state and the ProcessJoinFunction is invoked for each match;
2. Time interval: by default, a leftElement is joined with the rightElements whose timestamps fall within [leftElementEventTime + lowerBound, leftElementEventTime + upperBound];
3. Example: if leftElementEventTime = 2019-11-16 17:30:00, lowerBound = -10 minutes and upperBound = 0, then this leftElement is joined, per key, with the rightElements in [2019-11-16 17:20:00, 2019-11-16 17:30:00];
4. IntervalJoin currently supports event time only;
5. With large data volumes, consider using the RocksDBStateBackend (see the configuration sketch after this list).
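For point 5, a minimal configuration sketch, assuming the flink-statebackend-rocksdb dependency is on the classpath; the checkpoint URI is a placeholder, not something from the original post:

    // import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Keep the interval-join buffers in RocksDB (spilled to local disk) instead of the JVM heap.
    // The checkpoint URI below is a placeholder; point it at your own durable storage.
    env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));
    env.enableCheckpointing(60 * 1000); // checkpoint every 60 seconds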

Demo:

package Flink_API;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

public class TestInterViewJoin {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the event-time timestamps carried by the records themselves
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        // 1. First stream: the user's browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. Second stream: the user's click events
        DataStream<UserClickLog> clickStream = getUserClickLogDataStream(env);

        // Print both input streams
        browseStream.print();
        clickStream.print();

        // Core logic: IntervalJoin over the two streams -- each user's click joins
        // that user's browse events from the last 10 minutes.
        // clickStream is the left stream, browseStream is the right stream.
        KeyedStream<UserClickLog, String> userClickLogStringKeyedStream = clickStream.keyBy(new KeySelector<UserClickLog, String>() {
            @Override
            public String getKey(UserClickLog userClickLog) throws Exception {
                return userClickLog.userID;
            }
        });
        KeyedStream<UserBrowseLog, String> userBrowseLogStringKeyedStream1 = browseStream.keyBy(new KeySelector<UserBrowseLog, String>() {
            @Override
            public String getKey(UserBrowseLog userBrowseLog) throws Exception {
                return userBrowseLog.userID;
            }
        });
        // Each click joins that user's browse events from the preceding 10 minutes
        DataStream<String> processData = userClickLogStringKeyedStream.intervalJoin(userBrowseLogStringKeyedStream1)
                // lower bound: 10 minutes back; upper bound: the click's own event time
                .between(Time.minutes(-10), Time.seconds(0))
                .process(new ProcessJoinFunction<UserClickLog, UserBrowseLog, String>() {
                    // When a left element arrives, the right elements within the time range are read
                    // from state and this method is invoked once per joined pair
                    @Override
                    public void processElement(UserClickLog left, UserBrowseLog right, Context context, Collector<String> collector) throws Exception {
                        collector.collect(left + "<IntervalJoin>" + right);
                    }
                });
        processData.print();

        // Submit the job
        env.execute("TestInterViewJoin");
    }

    private static DataStream<UserClickLog> getUserClickLogDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic1", new SimpleStringSchema(), consumerProperties));

        DataStream<UserClickLog> processData = dataStreamSource.process(new ProcessFunction<String, UserClickLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserClickLog> collector) throws Exception {
                try {
                    UserClickLog clickLog = com.alibaba.fastjson.JSON.parseObject(s, UserClickLog.class);
                    if (clickLog != null) {
                        collector.collect(clickLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserClickLog JSON: " + e.getMessage());
                }
            }
        });
        // Assign event-time timestamps and watermarks (zero allowed out-of-orderness)
        return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClickLog>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(UserClickLog userClickLog) {
                DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                DateTime dateTime = DateTime.parse(userClickLog.getEventTime(), dateTimeFormatter);
                // Return the timestamp as epoch milliseconds (13 digits)
                return dateTime.getMillis();
            }
        });
    }

    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserBrowseLog JSON: " + e.getMessage());
                }
            }
        });
        // Assign event-time timestamps and watermarks (zero allowed out-of-orderness)
        return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(UserBrowseLog userBrowseLog) {
                DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                DateTime dateTime = DateTime.parse(userBrowseLog.getEventTime(), dateTimeFormatter);
                // Return the timestamp as epoch milliseconds (13 digits)
                return dateTime.getMillis();
            }
        });
    }

    // Browse event
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
    // Click event
    public static class UserClickLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String pageID;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getPageID() {
            return pageID;
        }

        public void setPageID(String pageID) {
            this.pageID = pageID;
        }

        @Override
        public String toString() {
            return "UserClickLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", pageID='" + pageID + '\'' +
                    '}';
        }
    }
}
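For reference, hypothetical input records for the two Kafka topics (one JSON object per message; the field names follow the POJOs in the demo, the values are made up for illustration):

    browse_topic  (UserBrowseLog): {"userID":"user_1","eventTime":"2019-11-16 17:25:00","eventType":"browse","productID":"product_1","productPrice":10}
    browse_topic1 (UserClickLog):  {"userID":"user_1","eventTime":"2019-11-16 17:30:00","eventType":"click","pageID":"page_1"}

With between(Time.minutes(-10), Time.seconds(0)), the click at 17:30:00 joins the browse event at 17:25:00, since both records share the same userID and the browse timestamp falls inside [17:20:00, 17:30:00].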