Flink的双流join介绍
Posted 月疯
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的双流join介绍相关的知识,希望对你有一定的参考价值。
在flink中,双流join主要分为2种类型:window join和interval join。window join又可以根据窗口的类型分为3种:基于滚动窗口、滑动窗口和会话窗口的双流join;
window类型的join都是利用window的机制,先将数据缓存在window state中,当窗口触发计算时再执行join;
interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠新到达的数据触发过期数据的清理。
通过join算子可以分别实现基于滚动窗口、滑动窗口和会话窗口的双流join:
滚动窗口join:
代码:
package Flink_API;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Properties;
//window join中的join算子:获取每个用户每个时刻的浏览和点击。即浏览和点击都不为空才输出该用户在当前时刻的信息。
// 结论:
// 1、join只返回匹配到的数据对。若在window中没有能够与之匹配的数据,则不会有输出。
// 2、join会输出window中所有的匹配数据对。
// 3、不在window内的数据不会被匹配到。
public class TestJoin
public static void main(String[] args) throws Exception
//创建运行环境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
//Flink是以数据自带的时间戳字段为准
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置并行度
env.setParallelism(1);
//1、获取第一个流,获取用户的浏览信息
DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
//2、获取用户的点击信息
DataStream<UserClickLog> clickStream = getUserClickLogDataStream(env);
//打印结果
browseStream.print();
clickStream.print();
//核心:双流join的逻辑
//browseStream(左流)关联clickStream(右流)
browseStream.join(clickStream)
.where(new KeySelector<UserBrowseLog, String>()
@Override
public String getKey(UserBrowseLog userBrowseLog) throws Exception
return userBrowseLog.getUserID()+"_"+userBrowseLog.getEventTime();
)
.equalTo(new KeySelector<UserClickLog, String>()
@Override
public String getKey(UserClickLog userClickLog) throws Exception
return userClickLog.getUserID()+"_"+userClickLog.getEventTime();
)
.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
.apply(new JoinFunction<UserBrowseLog, UserClickLog, Object>()
@Override
public Object join(UserBrowseLog left, UserClickLog right) throws Exception
System.out.print(left + "<Inner Join>" +right);
return left + "<Inner Join>" +right;
);
//程序的入口类
env.execute("TestJoin");
private static DataStream<UserClickLog> getUserClickLogDataStream(StreamExecutionEnvironment env)
Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.severs","page01:9002");
consumerProperties.setProperty("grop.id","browsegroup");
DataStreamSource<String> dataStreamSource=env.addSource(new FlinkKafkaConsumer010<String>("browse_topic1", (KeyedDeserializationSchema<String>) new SimpleStringSchema(),consumerProperties));
DataStream<UserClickLog> processData=dataStreamSource.process(new ProcessFunction<String, UserClickLog>()
@Override
public void processElement(String s, Context context, Collector<UserClickLog> collector) throws Exception
try
UserClickLog browseLog = com.alibaba.fastjson.JSON.parseObject(s,UserClickLog.class);
if(browseLog !=null)
collector.collect(browseLog);
catch(Exception e)
System.out.print("解析Json——UserBrowseLog异常:"+e.getMessage());
);
//设置watermark
return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClickLog>(Time.seconds(0))
@Override
public long extractTimestamp(UserClickLog userBrowseLog)
DateTimeFormatter dateTimeFormatter= DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
DateTime dateTime=DateTime.parse(userBrowseLog.getEventTime(),dateTimeFormatter);
//用数字表示时间戳,单位是ms,13位
return dateTime.getMillis();
);
private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env)
Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.severs","page01:9001");
consumerProperties.setProperty("grop.id","browsegroup");
DataStreamSource<String> dataStreamSource=env.addSource(new FlinkKafkaConsumer010<String>("browse_topic", (KeyedDeserializationSchema<String>) new SimpleStringSchema(),consumerProperties));
DataStream<UserBrowseLog> processData=dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>()
@Override
public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception
try
UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s,UserBrowseLog.class);
if(browseLog !=null)
collector.collect(browseLog);
catch(Exception e)
System.out.print("解析Json——UserBrowseLog异常:"+e.getMessage());
);
//设置watermark
return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(0))
@Override
public long extractTimestamp(UserBrowseLog userBrowseLog)
DateTimeFormatter dateTimeFormatter= DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
DateTime dateTime=DateTime.parse(userBrowseLog.getEventTime(),dateTimeFormatter);
//用数字表示时间戳,单位是ms,13位
return dateTime.getMillis();
);
//浏览类
public static class UserBrowseLog implements Serializable
private String userID;
private String eventTime;
private String eventType;
private String productID;
private Integer productPrice;
public String getUserID()
return userID;
public void setUserID(String userID)
this.userID = userID;
public String getEventTime()
return eventTime;
public void setEventTime(String eventTime)
this.eventTime = eventTime;
public String getEventType()
return eventType;
public void setEventType(String eventType)
this.eventType = eventType;
public String getProductID()
return productID;
public void setProductID(String productID)
this.productID = productID;
public Integer getProductPrice()
return productPrice;
public void setProductPrice(Integer productPrice)
this.productPrice = productPrice;
@Override
public String toString()
return "UserBrowseLog" +
"userID='" + userID + '\\'' +
", eventTime='" + eventTime + '\\'' +
", eventType='" + eventType + '\\'' +
", productID='" + productID + '\\'' +
", productPrice=" + productPrice +
'';
//点击类
public static class UserClickLog implements Serializable
private String userID;
private String eventTime;
private String eventType;
private String pageID;
public String getUserID()
return userID;
public void setUserID(String userID)
this.userID = userID;
public String getEventTime()
return eventTime;
public void setEventTime(String eventTime)
this.eventTime = eventTime;
public String getEventType()
return eventType;
public void setEventType(String eventType)
this.eventType = eventType;
public String getPageID()
return pageID;
public void setPageID(String pageID)
this.pageID = pageID;
@Override
public String toString()
return "UserClickLog" +
"userID='" + userID + '\\'' +
", eventTime='" + eventTime + '\\'' +
", eventType='" + eventType + '\\'' +
", pageID='" + pageID + '\\'' +
'';
maven:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>Flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.9.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- Add <scope>provided</scope> when submitting to a cluster that already ships Flink. -->
        </dependency>

        <!-- JSON parsing of the Kafka payloads. 1.2.47 is affected by a publicly known autoType
             bypass (remote code execution when parsing untrusted JSON); 1.2.83 is the patched
             1.2.x release. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <!-- Connector version must match the Flink core version; a 1.3.x connector is not
             compatible with Flink 1.9. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- NOTE(review): pinned to 1.7.2, which differs from flink.version. In the visible
             sources it is only referenced for the shaded Joda-Time classes
             (org.apache.flink.table.shaded.org.joda.time); consider removing it once nothing
             imports them. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <!-- Plugins are declared directly under <plugins> (not inside <pluginManagement>) so the
             assembly execution below actually runs during `mvn package`. -->
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <!-- JDK source/target level -->
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <!-- Builds an additional jar that bundles all dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Entry class of the jar (optional). NOTE(review): this points to
                                 Flink_Stream.FlinkStream; use Flink_API.TestJoin to run the join example. -->
                            <mainClass>Flink_Stream.FlinkStream</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
以上是关于Flink的双流join介绍的主要内容,如果未能解决你的问题,请参考以下文章
2021年大数据Flink(四十五):扩展阅读 双流Join