大数据(9f)Flink窗口函数练习:计算PV和UV
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9f)Flink窗口函数练习:计算PV和UV相关的知识,希望对你有一定的参考价值。
文章目录
概述
需求
计算每小时每个页面的 PV 和 UV
数据样本(每行一条JSON;字段依次为:设备ID(用于区分用户)、页面ID、时间戳(单位:秒))
{"device_id": "d1", "page_id": "p1", "timestamp": 3600}
{"device_id": "d1", "page_id": "p2", "timestamp": 3601}
{"device_id": "d2", "page_id": "p2", "timestamp": 3602}
{"device_id": "d1", "page_id": "p3", "timestamp": 3603}
{"device_id": "d2", "page_id": "p3", "timestamp": 3604}
{"device_id": "d3", "page_id": "p3", "timestamp": 3610}
{"device_id": "d3", "page_id": "p3", "timestamp": 3610}
{"device_id": "d11", "page_id": "p1", "timestamp": 7200}
{"device_id": "d11", "page_id": "p2", "timestamp": 7201}
{"device_id": "d12", "page_id": "p2", "timestamp": 7202}
{"device_id": "d11", "page_id": "p3", "timestamp": 7203}
{"device_id": "d12", "page_id": "p3", "timestamp": 7204}
{"device_id": "d13", "page_id": "p3", "timestamp": 7210}
{"device_id": "d11", "page_id": "p1", "timestamp": 7210}
{"device_id": "d1", "page_id": "p1", "timestamp": 10800}
{"device_id": "d1", "page_id": "p2", "timestamp": 10801}
{"device_id": "d2", "page_id": "p2", "timestamp": 10802}
{"device_id": "d1", "page_id": "p3", "timestamp": 10803}
{"device_id": "d2", "page_id": "p3", "timestamp": 10811}
{"device_id": "d3", "page_id": "p3", "timestamp": 10811}
环境和依赖
本地开发环境:WIN10+JDK1.8+IDEA+Maven3.6.3
<!-- 配置 -->
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.14.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<fastjson.version>2.0.19</fastjson.version>
<lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JSON解析 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <!-- 简化JavaBean书写 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>
PV计算代码
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Scanner;
public class PvCount
public static void main(String[] args) throws Exception
//创建执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
//加入自定义数据源
DataStreamSource<String> d = env.addSource(new MySource());
//水位线策略
WatermarkStrategy<PV> w = WatermarkStrategy.<PV>forMonotonousTimestamps()
.withTimestampAssigner((SerializableTimestampAssigner<PV>) (e, recordTimestamp) -> e.timestamp * 1000L);
//WatermarkStrategy<PV> w = WatermarkStrategy.<PV>forBoundedOutOfOrderness(Duration.ofSeconds(2))
// .withTimestampAssigner((SerializableTimestampAssigner<PV>) (e, r) -> e.timestamp * 1000L);
//########################################## JSON解析 #############################################
SingleOutputStreamOperator<Tuple2<String, Long>> s = d
//1、过滤非JSON
.filter(PvCount::isValidJSON)
//2、String转JSON
.map(JSONObject::parseObject)
//3、JSON转JavaBean
.map(j -> j.toJavaObject(PV.class))
//4、过滤空pageId
.filter(j -> j.getPageId() != null)
//5、确定水位线
.assignTimestampsAndWatermarks(w)
//6、记数
.map(j -> Tuple2.of(j.getPageId(), 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));
//######################################## 计算每小时PV ###########################################
//s.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).sum(1).print("总PV");
//################################### 计算每小时每个页面PV ##########################################
s
//7、按页面ID分区
.keyBy(t -> t.f0)
//8、开启1小时的滚动窗口
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.seconds(1)) //允许迟到
//9、求和
.sum(1)
//10、输出(假设写到数据库)
.addSink(new RichSinkFunction<Tuple2<String, Long>>()
@Override
public void invoke(Tuple2<String, Long> value, Context context)
//允许迟到的情况下,当迟到数据来了后,要求更新数据库
long windowEnd = context.timestamp();
long windowStart = windowEnd - 3599999;
System.out.println(windowStart + "~" + windowEnd + ":" + value);
@Override
public void open(Configuration parameters) System.out.println("假设创建数据库连接");
@Override
public void close() System.out.println("假设关闭数据库连接");
);
//执行
env.execute();
/** 判断字符串是否能转换成合法的JSON */
public static boolean isValidJSON(String string)
try
JSONObject.parseObject(string);
return true;
catch (JSONException e)
return false;
/** 统计PageView的JavaBean */
@Data
@AllArgsConstructor
public static class PV
String deviceId;
String pageId;
Long timestamp;
/** 自定义数据源 */
public static class MySource implements SourceFunction<String>
public MySource()
@Override
public void run(SourceContext<String> sc)
Scanner scanner = new Scanner(System.in);
while (true)
String str = scanner.nextLine().trim();
if (str.equals("STOP")) break;
if (!str.equals("")) sc.collect(str);
scanner.close();
@Override
public void cancel()
并行度设1,关闭允许迟到,测试结果如下
UV计算代码
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.Scanner;
public class UvCount
public static void main(String[] args) throws Exception
//创建执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
//加入自定义数据源
DataStreamSource<String> d = env.addSource(new MySource());
//水位线策略
WatermarkStrategy<PV> w = WatermarkStrategy.<PV>forMonotonousTimestamps()
.withTimestampAssigner((SerializableTimestampAssigner<PV>) (e, recordTimestamp) -> e.timestamp * 1000L);
//########################################## JSON解析 #############################################
SingleOutputStreamOperator<PV大数据(9d)Flink流处理核心编程练习-计算PV和UV