Introduction to Flink's CoGroup Operator

Posted 月疯

CoGroup is the lower-level operator behind Join; in other words, the Join operator is itself implemented on top of the CoGroup operator.

CoGroup operates, within the same window, on the two groups of elements that share the same key. It is more general than Join and can be used to implement Inner Join, Left Join, and Right Join. Its behaviour is essentially the same as Join, with one difference: even when a newly arrived record has no matching data from the other stream in the window, the record can still be emitted. The operator can only be used on windows. For a plain Inner Join, though, Join is recommended, because its execution strategy is optimized and therefore more efficient.
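
For reference, the following is a minimal side-by-side sketch of the two APIs (the class name, the stream parameters, and the assumption that elements are "key,value" strings are invented for illustration): the call chains are identical, but Join hands a JoinFunction one matched pair at a time, while CoGroup hands a CoGroupFunction both groups for a key in a window, including empty ones, which is exactly what makes the left/right-join emulation possible.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class JoinVsCoGroupSketch {

    // Both streams are assumed to carry "key,value" strings; the key is the part before the comma.
    static final KeySelector<String, String> BY_KEY = new KeySelector<String, String>() {
        @Override
        public String getKey(String value) {
            return value.split(",")[0];
        }
    };

    // Join: the JoinFunction is called once per matched pair; unmatched elements are dropped.
    static DataStream<String> joinExample(DataStream<String> left, DataStream<String> right) {
        return left.join(right)
                .where(BY_KEY).equalTo(BY_KEY)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<String, String, String>() {
                    @Override
                    public String join(String l, String r) {
                        return l + "<Join>" + r;
                    }
                });
    }

    // CoGroup: the CoGroupFunction is called once per key and window with both groups,
    // even when one side is empty, so unmatched elements can still be emitted.
    static DataStream<String> coGroupExample(DataStream<String> left, DataStream<String> right) {
        return left.coGroup(right)
                .where(BY_KEY).equalTo(BY_KEY)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new CoGroupFunction<String, String, String>() {
                    @Override
                    public void coGroup(Iterable<String> ls, Iterable<String> rs, Collector<String> out) {
                        for (String l : ls) {
                            for (String r : rs) {
                                out.collect(l + "<CoGroup>" + r);
                            }
                        }
                    }
                });
    }
}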

Scenario: for each user, obtain the browse and click events at each instant, emulating inner-, left-, and right-join behaviour.
Conclusion: even when no matching data exists in the window, CoGroup still emits output.
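
In the demo below, both streams are keyed on userID + "_" + eventTime and assigned to a 10-second tumbling event-time window, so a browse record and a click record only match when both fields are identical and the two events fall into the same window. As an illustration, a pair of hypothetical messages like the following (field values invented) would match; the browse record would be published to the browse_topic topic and the click record to browse_topic1:

{"userID":"user_1","eventTime":"2021-01-01 10:00:01","eventType":"browse","productID":"product_1","productPrice":10}
{"userID":"user_1","eventTime":"2021-01-01 10:00:01","eventType":"click","pageID":"page_1"}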

Here is a demo:

package Flink_API;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

public class TestConectGroup {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamps carried in the data itself (event time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        // 1. Get the first stream: the user's browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. Get the user's click events
        DataStream<UserClickLog> clickStream = getUserClickLogDataStream(env);

        // Print the input streams
        browseStream.print();
        clickStream.print();

        // Core part: use CoGroup to implement the three kinds of join
        // browseStream (left stream) is co-grouped with clickStream (right stream)
        browseStream.coGroup(clickStream)
                .where(new KeySelector<UserBrowseLog, String>() {
                    @Override
                    public String getKey(UserBrowseLog userBrowseLog) throws Exception {
                        return userBrowseLog.getUserID() + "_" + userBrowseLog.getEventTime();
                    }
                })
                .equalTo(new KeySelector<UserClickLog, String>() {
                    @Override
                    public String getKey(UserClickLog userClickLog) throws Exception {
                        return userClickLog.getUserID() + "_" + userClickLog.getEventTime();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(10))) // tumbling window
                .apply(new InnerJoin());

        env.execute("TestConectGroup");
    }
    // Emulate Inner Join with CoGroup: for each user and instant, output the browse
    // and click events only when both sides are non-empty.
    public static class InnerJoin implements CoGroupFunction<UserBrowseLog, UserClickLog, String> {

        @Override
        public void coGroup(Iterable<UserBrowseLog> left, Iterable<UserClickLog> right, Collector<String> collector) throws Exception {
            // Output only when both groups contain elements for the same key
            for (UserBrowseLog userBrowseLog : left) {
                for (UserClickLog clickLog : right) {
                    collector.collect(userBrowseLog + "<Inner Join>" + clickLog);
                }
            }
        }
    }
    // Emulate Left Join with CoGroup: output every browse event; a matching click is
    // appended if present, otherwise "null" is emitted for the right side.
    public static class LeftJoinFunction implements CoGroupFunction<UserBrowseLog, UserClickLog, String> {

        @Override
        public void coGroup(Iterable<UserBrowseLog> left, Iterable<UserClickLog> right, Collector<String> collector) throws Exception {
            for (UserBrowseLog userBrowseLog : left) {
                boolean noElements = true;
                for (UserClickLog userClickLog : right) {
                    noElements = false;
                    collector.collect(userBrowseLog + "<Left Join>" + userClickLog);
                }
                if (noElements) {
                    collector.collect(userBrowseLog + "<Left Join>" + "null");
                }
            }
        }
    }
    // Emulate Right Join with CoGroup: output every click event; a matching browse event
    // is prepended if present, otherwise "null" is emitted for the left side.
    public static class RightJoinFunction implements CoGroupFunction<UserBrowseLog, UserClickLog, String> {

        @Override
        public void coGroup(Iterable<UserBrowseLog> left, Iterable<UserClickLog> right, Collector<String> collector) throws Exception {
            for (UserClickLog userClickLog : right) {
                boolean noElement = true;
                for (UserBrowseLog userBrowseLog : left) {
                    noElement = false;
                    collector.collect(userBrowseLog + "<Right Join>" + userClickLog);
                }
                if (noElement) {
                    collector.collect("null" + "<Right Join>" + userClickLog);
                }
            }
        }
    }

    private static DataStream<UserClickLog> getUserClickLogDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic1", new SimpleStringSchema(), consumerProperties));

        DataStream<UserClickLog> processData = dataStreamSource.process(new ProcessFunction<String, UserClickLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserClickLog> collector) throws Exception {
                try {
                    UserClickLog clickLog = com.alibaba.fastjson.JSON.parseObject(s, UserClickLog.class);
                    if (clickLog != null) {
                        collector.collect(clickLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserClickLog JSON: " + e.getMessage());
                }
            }
        });
        // Assign timestamps and watermarks
        return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClickLog>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(UserClickLog userClickLog) {
                DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                DateTime dateTime = DateTime.parse(userClickLog.getEventTime(), dateTimeFormatter);
                // Epoch timestamp in milliseconds (13 digits)
                return dateTime.getMillis();
            }
        });
    }

    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserBrowseLog JSON: " + e.getMessage());
                }
            }
        });
        // Assign timestamps and watermarks
        return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(UserBrowseLog userBrowseLog) {
                DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                DateTime dateTime = DateTime.parse(userBrowseLog.getEventTime(), dateTimeFormatter);
                // Epoch timestamp in milliseconds (13 digits)
                return dateTime.getMillis();
            }
        });
    }

    // Browse event POJO
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
    // Click event POJO
    public static class UserClickLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String pageID;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getPageID() {
            return pageID;
        }

        public void setPageID(String pageID) {
            this.pageID = pageID;
        }

        @Override
        public String toString() {
            return "UserClickLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", pageID='" + pageID + '\'' +
                    '}';
        }
    }
}
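
The demo wires in InnerJoin. To observe the left-join or right-join output instead, only the apply call in main needs to change; everything else stays the same:

        .apply(new LeftJoinFunction());   // or: .apply(new RightJoinFunction())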
