Flink's Union and Connect Operators: Merging Streams
Posted by 月疯
The Union operator
Union merges multiple streams into one; the resulting stream contains every element from all of the input streams. The one restriction is that all streams being merged must have the same data type. The merged stream is often keyed and windowed afterwards (a short windowed sketch follows the full example below). The operator is essentially unrelated to Join.
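Before the full Kafka-based example below, here is a minimal, self-contained sketch showing that union is variadic: one call can merge any number of streams, as long as they share an element type. The class name UnionSketch and the toy in-memory sources are only illustrative assumptions, not part of the original example.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Three toy streams of the same element type (String).
        DataStream<String> s1 = env.fromElements("a", "b");
        DataStream<String> s2 = env.fromElements("c");
        DataStream<String> s3 = env.fromElements("d", "e");
        // union accepts any number of same-typed streams in one call.
        DataStream<String> merged = s1.union(s2, s3);
        merged.print();
        env.execute("UnionSketch");
    }
}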
package Flink_API;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Properties;
public class TestUnion {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamp carried in the data itself (event time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse JSON into UserBrowseLog: " + e.getMessage());
                }
            }
        }).setParallelism(2);

        DataStream<UserBrowseLog> processData1 = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse JSON into UserBrowseLog: " + e.getMessage());
                }
            }
        }).setParallelism(2);

        // Union merges the two streams of the same type
        DataStream<UserBrowseLog> unionStream = processData.union(processData1);
        unionStream.map(new MapFunction<UserBrowseLog, String>() {
            @Override
            public String map(UserBrowseLog userBrowseLog) throws Exception {
                return userBrowseLog.getUserID();
            }
        }).print();

        // Submit the job
        env.execute("TestUnion");
    }
    // Browse-event POJO
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }
        public Integer getProductPrice() { return productPrice; }
        public void setProductPrice(Integer productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}
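As noted above, the merged stream is typically keyed and windowed afterwards. The sketch below illustrates that pattern under stated assumptions: the class name UnionWindowSketch and the toy Tuple2 sources are hypothetical, and a count window is used only so this bounded demo actually emits output.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Two toy streams of (userID, count) pairs.
        DataStream<Tuple2<String, Integer>> s1 =
                env.fromElements(Tuple2.of("user_1", 1), Tuple2.of("user_2", 1));
        DataStream<Tuple2<String, Integer>> s2 =
                env.fromElements(Tuple2.of("user_1", 1));
        // Merge the streams, key by the user field, window, and aggregate.
        s1.union(s2)
          .keyBy(0)            // key by the first tuple field (userID)
          .countWindow(2)      // fires as soon as a key has collected 2 elements
          .sum(1)              // sum the second tuple field
          .print();
        env.execute("UnionWindowSketch");
    }
}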
The Connect operator:
Connect is similar to Union, but it can only connect two streams, the two streams may have different data types, and different processing logic is applied to each of them. In the resulting ConnectedStreams<T, R>, T is the element type of the first stream and R is the element type of the second.
Note: this operator is also essentially unrelated to Join. In practice, connect is commonly used together with a broadcast stream (BroadcastStream), for example to push configuration or rules to every parallel instance; a short sketch of that pattern follows.
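The following is only a hedged sketch of that broadcast pattern, not the example from this post (which uses a plain CoMapFunction below). The class name ConnectBroadcastSketch, the state name "config-state", and the toy in-memory sources are assumptions; the calls are Flink's broadcast state API (available since 1.5).

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ConnectBroadcastSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A main data stream and a low-throughput "config" stream (toy elements).
        DataStream<String> events = env.fromElements("event-1", "event-2");
        DataStream<String> config = env.fromElements("rule-A");
        // Broadcast state descriptor shared by both sides of the connected stream.
        final MapStateDescriptor<String, String> desc =
                new MapStateDescriptor<>("config-state", Types.STRING, Types.STRING);
        BroadcastStream<String> configBroadcast = config.broadcast(desc);
        events.connect(configBroadcast)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Read the latest broadcast rule (may be null before any config arrives).
                        String rule = ctx.getBroadcastState(desc).get("rule");
                        out.collect(value + " handled with rule=" + rule);
                    }
                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                        // Every parallel instance receives the config element and stores it.
                        ctx.getBroadcastState(desc).put("rule", value);
                    }
                })
                .print();
        env.execute("ConnectBroadcastSketch");
    }
}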
package Flink_API;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Properties;
public class TextConnect {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamp carried in the data itself (event time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        // 1. First stream: the user's browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. Second stream: the user's click events
        DataStream<UserClickLog> clickStream = getUserClickLogDataStream(env);

        // Print both input streams
        browseStream.print();
        clickStream.print();

        // Connect the two streams
        ConnectedStreams<UserBrowseLog, UserClickLog> connectStream = browseStream.connect(clickStream);
        DataStream<String> resData = connectStream.map(new CoMapFunction<UserBrowseLog, UserClickLog, String>() {
            @Override
            public String map1(UserBrowseLog userBrowseLog) throws Exception {
                return userBrowseLog.getProductID();
            }

            @Override
            public String map2(UserClickLog userClickLog) throws Exception {
                return userClickLog.getUserID();
            }
        });
        resData.print();

        env.execute("TextConnect");
    }
    private static DataStream<UserClickLog> getUserClickLogDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic1", new SimpleStringSchema(), consumerProperties));

        DataStream<UserClickLog> processData = dataStreamSource.process(new ProcessFunction<String, UserClickLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserClickLog> collector) throws Exception {
                try {
                    UserClickLog clickLog = com.alibaba.fastjson.JSON.parseObject(s, UserClickLog.class);
                    if (clickLog != null) {
                        collector.collect(clickLog);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse JSON into UserClickLog: " + e.getMessage());
                }
            }
        });

        // Assign timestamps and watermarks (zero allowed out-of-orderness)
        return processData.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<UserClickLog>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(UserClickLog userClickLog) {
                        DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                        DateTime dateTime = DateTime.parse(userClickLog.getEventTime(), dateTimeFormatter);
                        // The timestamp is a 13-digit epoch value in milliseconds
                        return dateTime.getMillis();
                    }
                });
    }
    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse JSON into UserBrowseLog: " + e.getMessage());
                }
            }
        });

        // Assign timestamps and watermarks (zero allowed out-of-orderness)
        return processData.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(UserBrowseLog userBrowseLog) {
                        DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                        DateTime dateTime = DateTime.parse(userBrowseLog.getEventTime(), dateTimeFormatter);
                        // The timestamp is a 13-digit epoch value in milliseconds
                        return dateTime.getMillis();
                    }
                });
    }
    // Browse-event POJO
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }
        public Integer getProductPrice() { return productPrice; }
        public void setProductPrice(Integer productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
    // Click-event POJO
    public static class UserClickLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String pageID;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getPageID() { return pageID; }
        public void setPageID(String pageID) { this.pageID = pageID; }

        @Override
        public String toString() {
            return "UserClickLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", pageID='" + pageID + '\'' +
                    '}';
        }
    }
}