An Introduction to Dual-Stream Joins in Flink

Posted by 月疯


In Flink, dual-stream joins fall into two types: the window join and the interval join. Window joins can be further divided by window type into three kinds: tumbling, sliding, and session window joins.

Window-type joins rely on the window mechanism: data is first buffered in window state, and the join is executed when the window fires. An interval join also stores data in state before processing; the difference is that interval-join state has an expiry mechanism, and stale data is cleaned up as event time (driven by incoming data and its watermarks) advances.
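To make the interval-join matching rule concrete, here is a minimal, Flink-free sketch in plain Java (an illustration of the semantics only, not Flink's implementation): each left element at time t matches right elements with the same key whose timestamps fall in [t + lowerBound, t + upperBound].

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of interval-join semantics. Elements are [key, timestampMillis] pairs.
// In real Flink, right-side elements are buffered in keyed state and dropped
// once no future left element's interval can reach them; here we just scan.
public class IntervalJoinSketch {
    public static List<String> intervalJoin(List<long[]> left, List<long[]> right,
                                            long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (long[] l : left) {
            for (long[] r : right) {
                // Same key, and the right timestamp lies in [l.ts + lower, l.ts + upper]
                if (l[0] == r[0]
                        && r[1] >= l[1] + lowerBound
                        && r[1] <= l[1] + upperBound) {
                    out.add(l[1] + "<Join>" + r[1]);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> left = Arrays.asList(new long[]{1, 1000}, new long[]{1, 9000});
        List<long[]> right = Arrays.asList(new long[]{1, 2000}, new long[]{1, 20000});
        // Bounds of [-5 s, +5 s] around each left timestamp
        System.out.println(intervalJoin(left, right, -5000, 5000));
        // prints [1000<Join>2000]: only right@2000 falls within 1000 ± 5000
    }
}
```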

The join operator can implement tumbling, sliding, and session window joins.

Tumbling window join:

Code:

package Flink_API;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

// The join operator of a window join: get each user's browse and click events at each
// instant, i.e. output a user's record for an instant only when both a browse and a
// click exist for it.
//        Conclusions:
//        1. join only returns matched pairs; if nothing in the window matches, nothing is output.
//        2. join emits every matched pair within the window.
//        3. Data outside the window is never matched.
public class TestJoin {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink should use the timestamp carried by the data itself (event time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        // 1. First stream: the users' browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. Second stream: the users' click events
        DataStream<UserClickLog> clickStream = getUserClickLogDataStream(env);

        // Print both inputs
        browseStream.print();
        clickStream.print();

        // Core dual-stream join logic:
        // browseStream (left) joined with clickStream (right)
        browseStream.join(clickStream)
                .where(new KeySelector<UserBrowseLog, String>() {
                    @Override
                    public String getKey(UserBrowseLog userBrowseLog) throws Exception {
                        return userBrowseLog.getUserID() + "_" + userBrowseLog.getEventTime();
                    }
                })
                .equalTo(new KeySelector<UserClickLog, String>() {
                    @Override
                    public String getKey(UserClickLog userClickLog) throws Exception {
                        return userClickLog.getUserID() + "_" + userClickLog.getEventTime();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<UserBrowseLog, UserClickLog, Object>() {
                    @Override
                    public Object join(UserBrowseLog left, UserClickLog right) throws Exception {
                        System.out.println(left + "<Inner Join>" + right);
                        return left + "<Inner Join>" + right;
                    }
                });

        // Submit the job
        env.execute("TestJoin");
    }

    private static DataStream<UserClickLog> getUserClickLogDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic1", new SimpleStringSchema(), consumerProperties));

        DataStream<UserClickLog> processData = dataStreamSource.process(new ProcessFunction<String, UserClickLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserClickLog> collector) throws Exception {
                try {
                    UserClickLog clickLog = com.alibaba.fastjson.JSON.parseObject(s, UserClickLog.class);
                    if (clickLog != null) {
                        collector.collect(clickLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserClickLog JSON: " + e.getMessage());
                }
            }
        });
        // Assign timestamps and watermarks
        return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClickLog>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(UserClickLog userClickLog) {
                DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                DateTime dateTime = DateTime.parse(userClickLog.getEventTime(), dateTimeFormatter);
                // Epoch timestamp in milliseconds (13 digits)
                return dateTime.getMillis();
            }
        });
    }

    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserBrowseLog JSON: " + e.getMessage());
                }
            }
        });
        // Assign timestamps and watermarks
        return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(UserBrowseLog userBrowseLog) {
                DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                DateTime dateTime = DateTime.parse(userBrowseLog.getEventTime(), dateTimeFormatter);
                // Epoch timestamp in milliseconds (13 digits)
                return dateTime.getMillis();
            }
        });
    }

    // Browse event POJO
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
    // Click event POJO
    public static class UserClickLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String pageID;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getPageID() {
            return pageID;
        }

        public void setPageID(String pageID) {
            this.pageID = pageID;
        }

        @Override
        public String toString() {
            return "UserClickLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", pageID='" + pageID + '\'' +
                    '}';
        }
    }
}
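For intuition on when the join above fires: a tumbling event-time window of size 10 s covers [start, start + 10000) ms, where, for a zero offset, start = timestamp - (timestamp % 10000). Two records can join only when their keys are equal and both timestamps land in the same such range. A minimal plain-Java sketch of this assignment (an illustration under that zero-offset assumption, not Flink's code):

```java
// Sketch: tumbling-window assignment for event-time windows.
// A record with timestamp ts belongs to the window starting at ts - (ts % size);
// two records join only when both land in the same window.
public class TumblingWindowSketch {
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    static boolean sameWindow(long tsA, long tsB, long sizeMillis) {
        return windowStart(tsA, sizeMillis) == windowStart(tsB, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 s, as in the code above
        // 00:00:03 and 00:00:07 both fall into the [00:00:00, 00:00:10) window
        System.out.println(sameWindow(3_000, 7_000, size));   // prints true
        // 00:00:07 and 00:00:12 fall into different windows, so they never join
        System.out.println(sameWindow(7_000, 12_000, size));  // prints false
    }
}
```

This is why conclusion 3 above holds: even records whose keys match are dropped from the pairing if a window boundary separates their timestamps.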

Maven (pom.xml):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>Flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.9.0</flink.version>
            <java.version>1.8</java.version>
            <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
<!--https://mvnrepository.com/artifact/org.apache.flink/flink-java-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
<!--https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
        <build>
            <pluginManagement>
                <plugins>
                    <!-- Java compiler plugin -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.6.0</version>
                        <configuration>
                            <!-- Target JDK version for compilation -->
                            <source>${java.version}</source>
                            <target>${java.version}</target>
                            <encoding>UTF-8</encoding>
                        </configuration>
                    </plugin>
                    <!-- Assembly plugin: builds a fat jar containing all dependencies -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <version>2.6</version>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                            <archive>
                                <manifest>
                                    <!-- Optionally set the jar's entry class -->
                                    <mainClass>Flink_Stream.FlinkStream</mainClass>
                                </manifest>
                            </archive>
                        </configuration>
                        <executions>
                            <execution>
                                <id>make-assembly</id>
                                <phase>package</phase>
                                <goals>
                                    <goal>single</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>

                </plugins>
            </pluginManagement>
        </build>
</project>
