❤️电商用户行为分析-FlinkJava重写版本,内附具体代码❤️,可以直接学习使用❤️建议收藏!

Posted 爱小可爱的IT白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了❤️电商用户行为分析-FlinkJava重写版本,内附具体代码❤️,可以直接学习使用❤️建议收藏!相关的知识,希望对你有一定的参考价值。

前言

近些年,随着对实时数据需求越来越高,掀起了一波学习Flink的热潮,本文借鉴于尚硅谷大数据实战_电商用户行为分析(项目开发实战)学习,原始项目使用Scala,本文尝试用Java对项目进行重写,也会结合官方文档,介绍一些api的用处。话不多说,直接开始我们今天的正题:

项目整体介绍

项目主要模块

基于对电商用户行为数据的基本分类,我们可以发现主要有以下三个分析方向:

1.热门统计
利用用户的点击浏览行为,进行流量统计、近期热门商品统计等。

2.偏好统计
利用用户的偏好行为,比如收藏、喜欢、评分等,进行用户画像分析,给出个性化的商品推荐列表。

3.风险控制
利用用户的常规业务行为,比如登录、下单、支付等,分析数据,对异常情况进行报警提示。

本项目限于数据,我们只实现热门统计和风险控制中的部分内容,将包括以下五大模块:实时热门商品统计、实时流量统计、市场营销商业指标统计、恶意登录监控和订单支付失效监控,其中细分为以下9个具体指标:
由于对实时性要求较高,我们会用flink作为数据处理的框架。在项目中,我们将综合运用flink的各种API,基于EventTime去处理基本的业务需求,并且灵活地使用底层的processFunction,基于状态编程和CEP去处理更加复杂的情形。

数据源解析

行为数据UserBehavior

字段名数据类型说明
userIdLong加密后的用户ID
itemIdLong加密后的商品ID
categoryIdInt加密后的商品所属类别ID
behaviorString用户行为类型,包括(‘pv’, ‘’buy, ‘cart’, ‘fav’)
timestampLong行为发生的时间戳,单位秒

web日志数据

字段名数据类型说明
ipString访问的 IP
userIdLong访问的 user ID
eventTimeLong访问时间
methodString访问方法 GET/POST/PUT/DELETE
urlString访问的 url

热门时事商品统计

基本需求

统计近一小时热门商品,每五秒钟更新一次
热门数用浏览度pv来衡量

解决思路

过滤出用户行为中的pv
构建滑动窗口

按照商品id进行分区

.keyBy(“itemid”)

设置时间窗口

.timeWindow(Time.minutes(60),Time.minutes(5)) 滑动窗口
时间窗口左闭右开,同一份数据可以发送给满足条件的多份窗口

窗口聚合

.aggregate(new CountAgg(),new WindowResultFunction())
new CountAgg():定义聚合规则
new WindowResultFunction():定义输出的数据结构

实时流量统计–热门页面

基本需求

从web服务器日志中,统计实时热门访问页面
统计每分钟ip访问量,取出访问量最大的五个地址,每五秒更新一次

解决思路

将日志中的时间转换为时间戳
构建滑动窗口

市场营销分析–APP市场推广统计

基本需求

统计APP市场推广的数据指标
按照不同的推广渠道,分别统计数据

解决思路

通过滤过,按照不同渠道进行统计
自定义processFunction

市场营销分析–页面广告统计

基本需求

按照不同省份,统计每小时页面访问量,五秒钟统计一次
对于频繁的点击行为进行过滤,放入黑名单

解决思路

滑动窗口
利用processFunction进行黑名单过滤
其实需求的具体细节还有很多,代码实现中再展开

项目编写

热门商品统计

数据分析

// userID,itemId,categoryId,mode,timeStamp
543462,1715,1464116,pv,1511658000

定义数据输入输出的结构

// input structure
class UserBehavior {
    public long userId;
    public long itemId;
    public int categoryId;
    public String behavior;
    public long timeStamp;

    public UserBehavior() {
    }

    public UserBehavior(long userId, long itemId, int categoryId, String behavior, long timeStamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timeStamp = timeStamp;
    }

    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\\'' +
                ", timeStamp=" + timeStamp +
                '}';
    }
}

// output structure
class ItemViewCount{
    public long itemID;
    public long windowEnd;
    public long count;

    public ItemViewCount() {
    }

    public ItemViewCount(long itemID, long windowEnd, long count) {
        this.itemID = itemID;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    @Override
    public String toString() {
        return "ItemViewCount{" +
                "itemID=" + itemID +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }

WATERMARKS

为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。

时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 WatermarkGenerator 来配置 watermark 的生成方式。

使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。

// 设置水印,处理乱序数据
// 水印策略,有界无序,定义一个固定延迟事件
// 同时时间的语义,由我们对象中的timeStamp指定
SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));

AGGREGATEFUNCTION自定义聚合规则

AggregateFunction比ReduceFunction更加通用,它有三个参数:输入类型(IN),累加器类型(ACC)和输出类型(OUT)

class CountAgg implements AggregateFunction<UserBehavior,Long,Long> {

    // 定义初始值
    @Override
    public Long createAccumulator() {
        return null;
    }

    // 组内规则
    @Override
    public Long add(UserBehavior userBehavior, Long aLong) {
        return null;
    }

    // 返回值
    @Override
    public Long getResult(Long aLong) {
        return null;
    }

    // 组间规则
    @Override
    public Long merge(Long aLong, Long acc1) {
        return null;
    }
}

WINDOWFUNCTION自定义窗口处理元素的规则

@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY var1, W var2, Iterable<IN> var3, Collector<OUT> var4) throws Exception;
}

处理函数(PROCESSFUNCTIONS)

ProcessFunction 将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction 十分相似, 但是增加了 Timer。

这里展示了其中一种ProcessFunction。

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;

    public KeyedProcessFunction() {
    }

    public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;

    public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
    }
  ......
class TopNHotItems extends KeyedProcessFunction<Long,ItemViewCount,String> {

    public int n;
    // 定义一个状态变量 list state,用来保存所有的 ItemViewCont
    public ListState<ItemViewCount> itemState;

    public TopNHotItems(int n) {
        this.n = n;
    }

    // // 在执行processElement方法之前,会最先执行并且只执行一次 open 方法
    @Override
    public void open(Configuration parameters) throws Exception {
        itemState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("itemState",ItemViewCount.class));
    }

    @Override
    public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
        itemState.add(itemViewCount);
        // 注册 windowEnd+1 的 EventTime Timer, 延迟触发,当触发时,说明收齐了属于windowEnd窗口的所有商品数据,统一排序处理
        context.timerService().registerEventTimeTimer(itemViewCount.windowEnd+1);
    }

    // 定时器触发时,会执行这个方法
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // 已经收集到所有的数据,首先把所有的数据放到一个 List 中
        List<ItemViewCount> allItems = new ArrayList<>();
        Iterable<ItemViewCount> itemViewCounts = itemState.get();
        Iterator<ItemViewCount> iterator = itemViewCounts.iterator();
        int cnt=0;
        while (iterator.hasNext()) {
            if(cnt>=3) break;
            allItems.add(iterator.next());
            cnt++;
        }
        // 清除状态
        itemState.clear();
        // 按照 count 大小  倒序排序
        Collections.sort(allItems, new Comparator<ItemViewCount>() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                if(o1.count>o2.count) return -1;
                else if(o1.count==o2.count) return 0;
                else return 1;
            }
        });
        StringBuilder result = new StringBuilder();
        result.append("======================================================\\n");
        // 触发定时器时,我们多设置了1秒的延迟,这里我们将时间减去0.1获取到最精确的时间
        result.append("时间:").append(new Timestamp(timestamp - 1)).append("\\n");
        for(ItemViewCount elem:allItems) result.append(elem.toString());
        result.append("\\n");
        result.append("======================================================\\n");
        out.collect(result.toString());
    }
}

完整代码如下

注释还是写得很详细的,层层递进

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.*;

/**
 * @Description: 热门时事商品统计
 */

// input structure
class UserBehavior {
    public long userId;
    public long itemId;
    public int categoryId;
    public String behavior;
    public long timeStamp;

    public UserBehavior() {
    }

    public UserBehavior(long userId, long itemId, int categoryId, String behavior, long timeStamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timeStamp = timeStamp;
    }

    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\\'' +
                ", timeStamp=" + timeStamp +
                '}';
    }
}

// output structure
class ItemViewCount{
    public long itemID;
    public long windowEnd;
    public long count;

    public ItemViewCount() {
    }

    public ItemViewCount(long itemID, long windowEnd, long count) {
        this.itemID = itemID;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    @Override
    public String toString() {
        return "ItemViewCount{" +
                "itemID=" + itemID +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }
}


public class HotItems {

    public static void main(String[] args) throws Exception {

        // 定义流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(1);
        // 设置时间特征为事件事件
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // source
        // input data
        DataStreamSource<String> stringDataStreamSource = env.readTextFile("/UserBehavior.csv");

        // transform
        // 将原始数据变成UserBehavior类型
        SingleOutputStreamOperator<UserBehavior> userBehaviorStream = stringDataStreamSource.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] split = s.split(",");
                return new UserBehavior(LongFlink实战之电商用户行为实时分析

Flink实战之电商用户行为实时分析

数据仓库之电商数仓-- 1用户行为数据采集

MySQL礼品电商用户行为分析

电商用户消费行为数据分析

小红书分析报告