大数据(9d)Flink流处理核心编程练习:计算PV和UV

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9d)Flink流处理核心编程练习:计算PV和UV相关的知识,希望对你有一定的参考价值。

文章目录

概述

  • 本地开发环境(WIN10+IDEA)
    (本文代码可作为Flink之Transform练习模板,在#####################################之间修改业务逻辑)
  • 计算
    PV:每个页面的浏览量
    UV:每个页面的访客数(按设备ID去重)

数据样本

"device_id": "d1", "page_id": "p1", "timestamp": 1666578240
"device_id": "d1", "page_id": "p2", "timestamp": 1666578241
"device_id": "d2", "page_id": "p2", "timestamp": 1666578242
"device_id": "d1", "page_id": "p3", "timestamp": 1666578243
"device_id": "d2", "page_id": "p3", "timestamp": 1666578244
"device_id": "d3", "page_id": "p3", "timestamp": 1666578245

代码

pom.xml

<!-- 配置 -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- JSON解析 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <!-- 简化JavaBean书写 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>

log4j.properties

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

PV计算

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Scanner;

public class PvCount 
    public static void main(String[] args) throws Exception 
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //加入自定义数据源
        env.addSource(new MySource())
        //###################################业务逻辑########################################
            //1、过滤非JSON
            .filter(PvCount::isValidJSON)
            //2、String转JSON
            .map(JSONObject::parseObject)
            //3、JSON转JavaBean
            .map(s -> s.toJavaObject(PV.class))
            //4、过滤空pageId
            .filter(s -> s.getPageId() != null)
            //5、统计每个页面的浏览量
            .map(s -> Tuple2.of(s.getPageId(), 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG))
            .keyBy(s -> s.f0)
            .sum(1)
        //################################### 业务逻辑 ########################################
        .print();
        //执行
        env.execute();
    

    /** 判断字符串是否能转换成合法的JSON */
    public static boolean isValidJSON(String string) 
        try 
            JSONObject.parseObject(string);
            return true;
         catch (JSONException e) 
            return false;
        
    

    /** 统计PageView的JavaBean */
    @Data
    @AllArgsConstructor
    public static class PV 
        String deviceId;
        String pageId;
        Long timestamp;
    

    /** 自定义数据源 */
    public static class MySource implements SourceFunction<String> 
        public MySource() 

        @Override
        public void run(SourceContext<String> sc) 
            Scanner scanner = new Scanner(System.in);
            while (true) 
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) break;
                if (!str.equals("")) sc.collect(str);
            
            scanner.close();
        

        @Override
        public void cancel() 
    

Run结果

"device_id": "d1", "page_id": "p1", "timestamp": 1666578240
(p1,1)
"device_id": "d1", "page_id": "p2", "timestamp": 1666578241
(p2,1)
"device_id": "d2", "page_id": "p2", "timestamp": 1666578242
(p2,2)
"device_id": "d1", "page_id": "p3", "timestamp": 1666578243
(p3,1)
"device_id": "d2", "page_id": "p3", "timestamp": 1666578244
(p3,2)
"device_id": "d3", "page_id": "p3", "timestamp": 1666578245
(p3,3)

UV计算

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Scanner;

public class UvCount 
    public static void main(String[] args) throws Exception 
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //加入自定义数据源
        env.addSource(new MySource())
            //过滤非JSON数据
            .filter(UvCount::isValidJSON)
            //String=>JSON=>JavaBean
            .map(s -> JSONObject.parseObject(s).toJavaObject(PV.class))
            //过滤空ID
            .filter(s -> (s.getDeviceId() != null && s.getPageId() != null))
            //按key的hash来分区
            .keyBy(PV::getPageId)
            //按pageId分组,按deviceId去重
            .process(new KeyedProcessFunction<String, PV, Tuple2<String, Integer>>() 
                final HashMap<String, HashSet<String>> m = new HashMap<>();
                @Override
                public void processElement(PV p, Context c, Collector<Tuple2<String, Integer>> o) 
                    String pageId = p.getPageId();
                    HashSet<String> s;
                    if (m.containsKey(pageId)) 
                        s = m.get(pageId);
                     else 
                        s = new HashSet<>();
                    
                    s.add(p.getDeviceId());
                    m.put(pageId, s);
                    o.collect(Tuple2.of(pageId, s.size()));
                
            )
        .print();
        //执行
        env.execute();
    

    /** 判断字符串是否能转换成合法的JSON */
    public static boolean isValidJSON(String string) 
        try 
            JSONObject.parseObject(string);
            return true;
         catch (JSONException e) 
            return false;
        
    

    /** 统计PageView的JavaBean */
    @Data
    @AllArgsConstructor
    public static class PV 
        String deviceId;
        String pageId;
        Long timestamp;
    

    /** 自定义数据源 */
    public static class MySource implements SourceFunction<String> 
        public MySource() 

        @Override
        public void run(SourceContext<String> sc) 
            Scanner scanner = new Scanner(System.in);
            while (true) 
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) break;
                if (!str.equals("")) sc.collect(str);
            
            scanner.close();
        

        @Override
        public void cancel() 
    

Run结果

"device_id": "d1"大数据(9d)Flink流处理核心编程

大数据(9d)Flink流处理核心编程

从Storm到Flink:大数据处理的开源系统及编程模型(文末福利)

大数据框架—Flink与Beam

大数据计算引擎之Flink Flink CEP复杂事件编程

从Storm到Flink:大数据处理的开源系统及编程模型