大数据(9d)Flink流处理核心编程练习:计算PV和UV
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9d)Flink流处理核心编程练习:计算PV和UV相关的知识,希望对你有一定的参考价值。
文章目录
概述
- 本地开发环境(WIN10+IDEA)
(本文代码可作为Flink之Transform练习模板,在两行#####################################注释之间修改业务逻辑)
- 计算
PV:每个页面的浏览量
UV:每个页面的访客数(按设备ID去重)
数据样本
{"device_id": "d1", "page_id": "p1", "timestamp": 1666578240}
{"device_id": "d1", "page_id": "p2", "timestamp": 1666578241}
{"device_id": "d2", "page_id": "p2", "timestamp": 1666578242}
{"device_id": "d1", "page_id": "p3", "timestamp": 1666578243}
{"device_id": "d2", "page_id": "p3", "timestamp": 1666578244}
{"device_id": "d3", "page_id": "p3", "timestamp": 1666578245}
代码
pom.xml
<!-- Configuration -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<!-- Versions below are ${property} placeholders that Maven resolves from <properties> above;
     without the braces Maven would treat them as literal text. -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <!-- SLF4J binding for a log4j-1.x-compatible backend (configured via log4j.properties).
         In SLF4J 2.x, slf4j-log4j12 is only a relocation stub that points at slf4j-reload4j,
         so depend on slf4j-reload4j directly. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-reload4j</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <!-- Routes Log4j 2 API calls to SLF4J -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- JSON parsing -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <!-- Less JavaBean boilerplate; Lombok is a compile-time annotation processor, so it is
         not needed on the runtime classpath -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
log4j.properties
# Root logger: only ERROR and above, written to the "stdout" appender defined below
# (presumably so framework logging does not drown out the job's printed results).
log4j.rootLogger=error, stdout
# Console appender (log4j 1.x API).
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# %-4r elapsed ms, [%t] thread, %-5p level, %c logger name, %x NDC, %m message, %n newline
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
PV计算
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Scanner;
public class PvCount
public static void main(String[] args) throws Exception
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//加入自定义数据源
env.addSource(new MySource())
//###################################业务逻辑########################################
//1、过滤非JSON
.filter(PvCount::isValidJSON)
//2、String转JSON
.map(JSONObject::parseObject)
//3、JSON转JavaBean
.map(s -> s.toJavaObject(PV.class))
//4、过滤空pageId
.filter(s -> s.getPageId() != null)
//5、统计每个页面的浏览量
.map(s -> Tuple2.of(s.getPageId(), 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(s -> s.f0)
.sum(1)
//################################### 业务逻辑 ########################################
.print();
//执行
env.execute();
/** 判断字符串是否能转换成合法的JSON */
public static boolean isValidJSON(String string)
try
JSONObject.parseObject(string);
return true;
catch (JSONException e)
return false;
/** 统计PageView的JavaBean */
@Data
@AllArgsConstructor
public static class PV
String deviceId;
String pageId;
Long timestamp;
/** 自定义数据源 */
public static class MySource implements SourceFunction<String>
public MySource()
@Override
public void run(SourceContext<String> sc)
Scanner scanner = new Scanner(System.in);
while (true)
String str = scanner.nextLine().trim();
if (str.equals("STOP")) break;
if (!str.equals("")) sc.collect(str);
scanner.close();
@Override
public void cancel()
Run结果
{"device_id": "d1", "page_id": "p1", "timestamp": 1666578240}
(p1,1)
{"device_id": "d1", "page_id": "p2", "timestamp": 1666578241}
(p2,1)
{"device_id": "d2", "page_id": "p2", "timestamp": 1666578242}
(p2,2)
{"device_id": "d1", "page_id": "p3", "timestamp": 1666578243}
(p3,1)
{"device_id": "d2", "page_id": "p3", "timestamp": 1666578244}
(p3,2)
{"device_id": "d3", "page_id": "p3", "timestamp": 1666578245}
(p3,3)
UV计算
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Scanner;
public class UvCount
public static void main(String[] args) throws Exception
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//加入自定义数据源
env.addSource(new MySource())
//过滤非JSON数据
.filter(UvCount::isValidJSON)
//String=>JSON=>JavaBean
.map(s -> JSONObject.parseObject(s).toJavaObject(PV.class))
//过滤空ID
.filter(s -> (s.getDeviceId() != null && s.getPageId() != null))
//按key的hash来分区
.keyBy(PV::getPageId)
//按pageId分组,按deviceId去重
.process(new KeyedProcessFunction<String, PV, Tuple2<String, Integer>>()
final HashMap<String, HashSet<String>> m = new HashMap<>();
@Override
public void processElement(PV p, Context c, Collector<Tuple2<String, Integer>> o)
String pageId = p.getPageId();
HashSet<String> s;
if (m.containsKey(pageId))
s = m.get(pageId);
else
s = new HashSet<>();
s.add(p.getDeviceId());
m.put(pageId, s);
o.collect(Tuple2.of(pageId, s.size()));
)
.print();
//执行
env.execute();
/** 判断字符串是否能转换成合法的JSON */
public static boolean isValidJSON(String string)
try
JSONObject.parseObject(string);
return true;
catch (JSONException e)
return false;
/** 统计PageView的JavaBean */
@Data
@AllArgsConstructor
public static class PV
String deviceId;
String pageId;
Long timestamp;
/** 自定义数据源 */
public static class MySource implements SourceFunction<String>
public MySource()
@Override
public void run(SourceContext<String> sc)
Scanner scanner = new Scanner(System.in);
while (true)
String str = scanner.nextLine().trim();
if (str.equals("STOP")) break;
if (!str.equals("")) sc.collect(str);
scanner.close();
@Override
public void cancel()
Run结果
"device_id": "d1"大数据(9d)Flink流处理核心编程