Flink's Union and Connect Operators: Merging Streams

Posted by 月疯


The union operator

Union merges multiple streams; the resulting stream contains all elements from every input stream. It has one restriction: all of the merged streams must have exactly the same element type. The operator is often used together with windows. Despite the name, it has essentially nothing to do with join.
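The semantics can be seen in miniature without any Flink dependency. The sketch below is not from the original post; it is a plain-Java stand-in for `DataStream.union`: every element from every input is kept, with no deduplication and no ordering guarantee, and the single generic parameter is what forces all inputs to share one element type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnionSketch {
    // Toy stand-in for DataStream<T>.union(DataStream<T>...):
    // the merged result keeps all elements from every input,
    // with no deduplication; one type parameter T covers all inputs.
    static <T> List<T> union(List<T> first, List<T> second) {
        List<T> merged = new ArrayList<>(first);
        merged.addAll(second);
        return merged;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("u1", "u2");
        List<String> b = Arrays.asList("u3");
        System.out.println(union(a, b)); // [u1, u2, u3]
    }
}
```

Trying to pass a `List<String>` and a `List<Integer>` to this helper fails to compile, which is the same constraint Flink enforces on `union` at the type level.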

package Flink_API;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

public class TestUnion {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamp carried in the data itself (event time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse JSON into UserBrowseLog: " + e.getMessage());
                }
            }
        }).setParallelism(2);
        DataStream<UserBrowseLog> processData1 = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse JSON into UserBrowseLog: " + e.getMessage());
                }
            }
        }).setParallelism(2);

        // union can merge multiple streams of the same element type
        DataStream<UserBrowseLog> unionStream = processData.union(processData1);
        unionStream.map(new MapFunction<UserBrowseLog, String>() {
            @Override
            public String map(UserBrowseLog userBrowseLog) throws Exception {
                return userBrowseLog.getUserID();
            }
        }).print();
        // Submit the job
        env.execute("TestUnion");
    }


    // Browse event POJO
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}

The connect operator:

Similar to union, but connect joins exactly two streams, and the two streams may have different element types; a separate processing function is applied to each stream. In ConnectedStreams<T, R>, T is the element type of the first stream and R is the element type of the second.

Note: like union, this operator has essentially nothing to do with join; connect is typically used together with a broadcast stream (BroadcastStream), for example to distribute configuration.
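This, too, can be modeled in miniature without Flink. The sketch below is a hypothetical, stdlib-only stand-in (not from the original post) for what `ConnectedStreams<T, R>.map(CoMapFunction<T, R, O>)` does: the two inputs keep their own element types and their own mapping functions, but both feed a single output of one common type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ConnectSketch {
    // Toy model of ConnectedStreams<T, R>.map(CoMapFunction<T, R, O>):
    // map1 handles elements of the first stream (type T), map2 handles
    // elements of the second stream (type R); both produce output type O.
    static <T, R, O> List<O> coMap(List<T> first, List<R> second,
                                   Function<T, O> map1, Function<R, O> map2) {
        List<O> out = new ArrayList<>();
        for (T t : first) out.add(map1.apply(t));
        for (R r : second) out.add(map2.apply(r));
        return out;
    }

    public static void main(String[] args) {
        List<Integer> prices = Arrays.asList(100, 250); // first stream: Integer
        List<String> users = Arrays.asList("u7");       // second stream: String
        System.out.println(coMap(prices, users,
                p -> "price:" + p,
                u -> "user:" + u)); // [price:100, price:250, user:u7]
    }
}
```

Unlike this toy, Flink interleaves the two inputs as elements arrive rather than draining one before the other, but the typing story is the same: two input types, one output type.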

package Flink_API;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

public class TextConnect {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamp carried in the data itself (event time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);
        // 1. Get the first stream: user browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. Get the second stream: user click events
        DataStream<UserClickLog> clickStream = getUserClickLogDataStream(env);

        // Print both inputs
        browseStream.print();
        clickStream.print();
        // Connect the two streams
        ConnectedStreams<UserBrowseLog, UserClickLog> connectStream = browseStream.connect(clickStream);
        DataStream<String> resData = connectStream.map(new CoMapFunction<UserBrowseLog, UserClickLog, String>() {
            @Override
            public String map1(UserBrowseLog userBrowseLog) throws Exception {
                return userBrowseLog.getProductID();
            }

            @Override
            public String map2(UserClickLog userClickLog) throws Exception {
                return userClickLog.getUserID();
            }
        });
        resData.print();
        env.execute("TextConnect");
    }


    private static DataStream<UserClickLog> getUserClickLogDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic1", new SimpleStringSchema(), consumerProperties));

        DataStream<UserClickLog> processData = dataStreamSource.process(new ProcessFunction<String, UserClickLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserClickLog> collector) throws Exception {
                try {
                    UserClickLog clickLog = com.alibaba.fastjson.JSON.parseObject(s, UserClickLog.class);
                    if (clickLog != null) {
                        collector.collect(clickLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse JSON into UserClickLog: " + e.getMessage());
                }
            }
        });
        // Assign timestamps and watermarks
        return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClickLog>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(UserClickLog userClickLog) {
                DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                DateTime dateTime = DateTime.parse(userClickLog.getEventTime(), dateTimeFormatter);
                // Epoch timestamp in milliseconds (13 digits)
                return dateTime.getMillis();
            }
        });
    }

    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse JSON into UserBrowseLog: " + e.getMessage());
                }
            }
        });
        // Assign timestamps and watermarks
        return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(UserBrowseLog userBrowseLog) {
                DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                DateTime dateTime = DateTime.parse(userBrowseLog.getEventTime(), dateTimeFormatter);
                // Epoch timestamp in milliseconds (13 digits)
                return dateTime.getMillis();
            }
        });
    }

    // Browse event POJO
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
    // Click event POJO
    public static class UserClickLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String pageID;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getPageID() {
            return pageID;
        }

        public void setPageID(String pageID) {
            this.pageID = pageID;
        }

        @Override
        public String toString() {
            return "UserClickLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", pageID='" + pageID + '\'' +
                    '}';
        }
    }
}

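The extractTimestamp() implementations above parse the eventTime string with the shaded Joda-Time classes bundled with the Flink table jar. On Java 8+, the same conversion can be sketched with java.time instead; this helper is illustrative, not part of the original code, and it pins the zone to UTC for determinism, whereas DateTime.parse above uses the JVM default zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EventTimeSketch {
    // Hypothetical java.time equivalent of the Joda-based extractTimestamp():
    // parse an event-time string like "2021-01-01 00:00:00" into epoch
    // milliseconds (a 13-digit value). Zone fixed to UTC here; the Joda
    // version in the post implicitly uses the JVM default zone.
    static long toEpochMillis(String eventTime) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return LocalDateTime.parse(eventTime, fmt)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2021-01-01 00:00:00")); // 1609459200000
    }
}
```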