大数据(9f)Flink窗口函数练习:计算PV和UV

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9f)Flink窗口函数练习:计算PV和UV相关的知识,希望对你有一定的参考价值。

文章目录

概述

需求

计算每小时每个页面的 PV 和 UV

数据样本(设备ID,页面ID,时间戳;时间戳单位为秒,每行一个JSON对象)

{"device_id": "d1", "page_id": "p1", "timestamp": 3600}
{"device_id": "d1", "page_id": "p2", "timestamp": 3601}
{"device_id": "d2", "page_id": "p2", "timestamp": 3602}
{"device_id": "d1", "page_id": "p3", "timestamp": 3603}
{"device_id": "d2", "page_id": "p3", "timestamp": 3604}
{"device_id": "d3", "page_id": "p3", "timestamp": 3610}
{"device_id": "d3", "page_id": "p3", "timestamp": 3610}

{"device_id": "d11", "page_id": "p1", "timestamp": 7200}
{"device_id": "d11", "page_id": "p2", "timestamp": 7201}
{"device_id": "d12", "page_id": "p2", "timestamp": 7202}
{"device_id": "d11", "page_id": "p3", "timestamp": 7203}
{"device_id": "d12", "page_id": "p3", "timestamp": 7204}
{"device_id": "d13", "page_id": "p3", "timestamp": 7210}
{"device_id": "d11", "page_id": "p1", "timestamp": 7210}

{"device_id": "d1", "page_id": "p1", "timestamp": 10800}
{"device_id": "d1", "page_id": "p2", "timestamp": 10801}
{"device_id": "d2", "page_id": "p2", "timestamp": 10802}
{"device_id": "d1", "page_id": "p3", "timestamp": 10803}
{"device_id": "d2", "page_id": "p3", "timestamp": 10811}
{"device_id": "d3", "page_id": "p3", "timestamp": 10811}

环境和依赖

本地开发环境:WIN10+JDK1.8+IDEA+Maven3.6.3

<!-- Build properties: compiler level and the dependency versions referenced below via ${...} -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JSON parsing -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <!-- Lombok: generates JavaBean boilerplate -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>

PV计算代码

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Scanner;

public class PvCount 
    public static void main(String[] args) throws Exception 
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
        //加入自定义数据源
        DataStreamSource<String> d = env.addSource(new MySource());
        //水位线策略
        WatermarkStrategy<PV> w = WatermarkStrategy.<PV>forMonotonousTimestamps()
                .withTimestampAssigner((SerializableTimestampAssigner<PV>) (e, recordTimestamp) -> e.timestamp * 1000L);
        //WatermarkStrategy<PV> w = WatermarkStrategy.<PV>forBoundedOutOfOrderness(Duration.ofSeconds(2))
        //        .withTimestampAssigner((SerializableTimestampAssigner<PV>) (e, r) -> e.timestamp * 1000L);
        //########################################## JSON解析 #############################################
        SingleOutputStreamOperator<Tuple2<String, Long>> s = d
                //1、过滤非JSON
                .filter(PvCount::isValidJSON)
                //2、String转JSON
                .map(JSONObject::parseObject)
                //3、JSON转JavaBean
                .map(j -> j.toJavaObject(PV.class))
                //4、过滤空pageId
                .filter(j -> j.getPageId() != null)
                //5、确定水位线
                .assignTimestampsAndWatermarks(w)
                //6、记数
                .map(j -> Tuple2.of(j.getPageId(), 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //######################################## 计算每小时PV ###########################################
        //s.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).sum(1).print("总PV");
        //################################### 计算每小时每个页面PV ##########################################
        s
                //7、按页面ID分区
                .keyBy(t -> t.f0)
                //8、开启1小时的滚动窗口
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .allowedLateness(Time.seconds(1)) //允许迟到
                //9、求和
                .sum(1)
                //10、输出(假设写到数据库)
                .addSink(new RichSinkFunction<Tuple2<String, Long>>() 
                    @Override
                    public void invoke(Tuple2<String, Long> value, Context context) 
                        //允许迟到的情况下,当迟到数据来了后,要求更新数据库
                        long windowEnd = context.timestamp();
                        long windowStart = windowEnd - 3599999;
                        System.out.println(windowStart + "~" + windowEnd + ":" + value);
                    

                    @Override
                    public void open(Configuration parameters) System.out.println("假设创建数据库连接");

                    @Override
                    public void close() System.out.println("假设关闭数据库连接");
                );
        //执行
        env.execute();
    

    /** 判断字符串是否能转换成合法的JSON */
    public static boolean isValidJSON(String string) 
        try 
            JSONObject.parseObject(string);
            return true;
         catch (JSONException e) 
            return false;
        
    

    /** 统计PageView的JavaBean */
    @Data
    @AllArgsConstructor
    public static class PV 
        String deviceId;
        String pageId;
        Long timestamp;
    

    /** 自定义数据源 */
    public static class MySource implements SourceFunction<String> 
        public MySource() 

        @Override
        public void run(SourceContext<String> sc) 
            Scanner scanner = new Scanner(System.in);
            while (true) 
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) break;
                if (!str.equals("")) sc.collect(str); 
            
            scanner.close();
        

        @Override
        public void cancel() 
    

并行度设1,关闭允许迟到,测试结果如下

UV计算代码

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.Scanner;

public class UvCount 
    public static void main(String[] args) throws Exception 
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
        //加入自定义数据源
        DataStreamSource<String> d = env.addSource(new MySource());
        //水位线策略
        WatermarkStrategy<PV> w = WatermarkStrategy.<PV>forMonotonousTimestamps()
                .withTimestampAssigner((SerializableTimestampAssigner<PV>) (e, recordTimestamp) -> e.timestamp * 1000L);
        //########################################## JSON解析 #############################################
        SingleOutputStreamOperator<PV大数据(9d)Flink流处理核心编程练习-计算PV和UV

大数据(9d)Flink流处理核心编程练习:计算PV和UV

大数据(9f)Flink状态编程

大数据(9f)Flink状态编程

大数据(9f)Flink富函数RichFunction

大数据(9f)Flink富函数RichFunction