❤️电商用户行为分析-FlinkJava重写版本,内附具体代码❤️,可以直接学习使用❤️建议收藏!
Posted 爱小可爱的IT白
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了❤️电商用户行为分析-FlinkJava重写版本,内附具体代码❤️,可以直接学习使用❤️建议收藏!相关的知识,希望对你有一定的参考价值。
前言
近些年,随着对实时数据需求越来越高,掀起了一波学习Flink的热潮,本文借鉴于尚硅谷大数据实战_电商用户行为分析(项目开发实战)学习,原始项目使用Scala,本文尝试用Java对项目进行重写,也会结合官方文档,介绍一些api的用处。话不多说,直接开始我们今天的正题:
项目整体介绍
项目主要模块
基于对电商用户行为数据的基本分类,我们可以发现主要有以下三个分析方向:
1.热门统计
利用用户的点击浏览行为,进行流量统计、近期热门商品统计等。
2.偏好统计
利用用户的偏好行为,比如收藏、喜欢、评分等,进行用户画像分析,给出个性化的商品推荐列表。
3.风险控制
利用用户的常规业务行为,比如登录、下单、支付等,分析数据,对异常情况进行报警提示。
本项目限于数据,我们只实现热门统计和风险控制中的部分内容,将包括以下五大模块:实时热门商品统计、实时流量统计、市场营销商业指标统计、恶意登录监控和订单支付失效监控,其中细分为以下9个具体指标:
由于对实时性要求较高,我们会用flink作为数据处理的框架。在项目中,我们将综合运用flink的各种API,基于EventTime去处理基本的业务需求,并且灵活地使用底层的processFunction,基于状态编程和CEP去处理更加复杂的情形。
数据源解析
行为数据UserBehavior
字段名 | 数据类型 | 说明 |
---|---|---|
userId | Long | 加密后的用户ID |
itemId | Long | 加密后的商品ID |
categoryId | Int | 加密后的商品所属类别ID |
behavior | String | 用户行为类型,包括(‘pv’, ‘’buy, ‘cart’, ‘fav’) |
timestamp | Long | 行为发生的时间戳,单位秒 |
web日志数据
字段名 | 数据类型 | 说明 |
---|---|---|
ip | String | 访问的 IP |
userId | Long | 访问的 user ID |
eventTime | Long | 访问时间 |
method | String | 访问方法 GET/POST/PUT/DELETE |
url | String | 访问的 url |
热门时事商品统计
基本需求
统计近一小时热门商品,每五秒钟更新一次
热门数用浏览度pv来衡量
解决思路
过滤出用户行为中的pv
构建滑动窗口
按照商品id进行分区
.keyBy(“itemid”)
设置时间窗口
.timeWindow(Time.minutes(60),Time.minutes(5)) 滑动窗口
时间窗口左闭右开,同一份数据可以发送给满足条件的多份窗口
窗口聚合
.aggregate(new CountAgg(),new WindowResultFunction())
new CountAgg():定义聚合规则
new WindowResultFunction():定义输出的数据结构
实时流量统计–热门页面
基本需求
从web服务器日志中,统计实时热门访问页面
统计每分钟ip访问量,取出访问量最大的五个地址,每五秒更新一次
解决思路
将日志中的时间转换为时间戳
构建滑动窗口
市场营销分析–APP市场推广统计
基本需求
统计APP市场推广的数据指标
按照不同的推广渠道,分别统计数据
解决思路
通过滤过,按照不同渠道进行统计
自定义processFunction
市场营销分析–页面广告统计
基本需求
按照不同省份,统计每小时页面访问量,五秒钟统计一次
对于频繁的点击行为进行过滤,放入黑名单
解决思路
滑动窗口
利用processFunction进行黑名单过滤
其实需求的具体细节还有很多,代码实现中再展开
项目编写
热门商品统计
数据分析
// userID,itemId,categoryId,mode,timeStamp
543462,1715,1464116,pv,1511658000
定义数据输入输出的结构
// input structure
class UserBehavior {
public long userId;
public long itemId;
public int categoryId;
public String behavior;
public long timeStamp;
public UserBehavior() {
}
public UserBehavior(long userId, long itemId, int categoryId, String behavior, long timeStamp) {
this.userId = userId;
this.itemId = itemId;
this.categoryId = categoryId;
this.behavior = behavior;
this.timeStamp = timeStamp;
}
@Override
public String toString() {
return "UserBehavior{" +
"userId=" + userId +
", itemId=" + itemId +
", categoryId=" + categoryId +
", behavior='" + behavior + '\\'' +
", timeStamp=" + timeStamp +
'}';
}
}
// output structure
class ItemViewCount{
public long itemID;
public long windowEnd;
public long count;
public ItemViewCount() {
}
public ItemViewCount(long itemID, long windowEnd, long count) {
this.itemID = itemID;
this.windowEnd = windowEnd;
this.count = count;
}
@Override
public String toString() {
return "ItemViewCount{" +
"itemID=" + itemID +
", windowEnd=" + windowEnd +
", count=" + count +
'}';
}
WATERMARKS
为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。
时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 WatermarkGenerator 来配置 watermark 的生成方式。
使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。
// 设置水印,处理乱序数据
// 水印策略,有界无序,定义一个固定延迟事件
// 同时时间的语义,由我们对象中的timeStamp指定
SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));
AGGREGATEFUNCTION自定义聚合规则
AggregateFunction比ReduceFunction更加通用,它有三个参数:输入类型(IN),累加器类型(ACC)和输出类型(OUT)
class CountAgg implements AggregateFunction<UserBehavior,Long,Long> {
// 定义初始值
@Override
public Long createAccumulator() {
return null;
}
// 组内规则
@Override
public Long add(UserBehavior userBehavior, Long aLong) {
return null;
}
// 返回值
@Override
public Long getResult(Long aLong) {
return null;
}
// 组间规则
@Override
public Long merge(Long aLong, Long acc1) {
return null;
}
}
WINDOWFUNCTION自定义窗口处理元素的规则
@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
void apply(KEY var1, W var2, Iterable<IN> var3, Collector<OUT> var4) throws Exception;
}
处理函数(PROCESSFUNCTIONS)
ProcessFunction 将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction 十分相似, 但是增加了 Timer。
这里展示了其中一种ProcessFunction。
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public KeyedProcessFunction() {
}
public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;
public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
}
......
class TopNHotItems extends KeyedProcessFunction<Long,ItemViewCount,String> {
public int n;
// 定义一个状态变量 list state,用来保存所有的 ItemViewCont
public ListState<ItemViewCount> itemState;
public TopNHotItems(int n) {
this.n = n;
}
// // 在执行processElement方法之前,会最先执行并且只执行一次 open 方法
@Override
public void open(Configuration parameters) throws Exception {
itemState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("itemState",ItemViewCount.class));
}
@Override
public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
itemState.add(itemViewCount);
// 注册 windowEnd+1 的 EventTime Timer, 延迟触发,当触发时,说明收齐了属于windowEnd窗口的所有商品数据,统一排序处理
context.timerService().registerEventTimeTimer(itemViewCount.windowEnd+1);
}
// 定时器触发时,会执行这个方法
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 已经收集到所有的数据,首先把所有的数据放到一个 List 中
List<ItemViewCount> allItems = new ArrayList<>();
Iterable<ItemViewCount> itemViewCounts = itemState.get();
Iterator<ItemViewCount> iterator = itemViewCounts.iterator();
int cnt=0;
while (iterator.hasNext()) {
if(cnt>=3) break;
allItems.add(iterator.next());
cnt++;
}
// 清除状态
itemState.clear();
// 按照 count 大小 倒序排序
Collections.sort(allItems, new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
if(o1.count>o2.count) return -1;
else if(o1.count==o2.count) return 0;
else return 1;
}
});
StringBuilder result = new StringBuilder();
result.append("======================================================\\n");
// 触发定时器时,我们多设置了1秒的延迟,这里我们将时间减去0.1获取到最精确的时间
result.append("时间:").append(new Timestamp(timestamp - 1)).append("\\n");
for(ItemViewCount elem:allItems) result.append(elem.toString());
result.append("\\n");
result.append("======================================================\\n");
out.collect(result.toString());
}
}
完整代码如下
注释还是写得很详细的,层层递进
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.*;
/**
* @Description: 热门时事商品统计
*/
// input structure
class UserBehavior {
public long userId;
public long itemId;
public int categoryId;
public String behavior;
public long timeStamp;
public UserBehavior() {
}
public UserBehavior(long userId, long itemId, int categoryId, String behavior, long timeStamp) {
this.userId = userId;
this.itemId = itemId;
this.categoryId = categoryId;
this.behavior = behavior;
this.timeStamp = timeStamp;
}
@Override
public String toString() {
return "UserBehavior{" +
"userId=" + userId +
", itemId=" + itemId +
", categoryId=" + categoryId +
", behavior='" + behavior + '\\'' +
", timeStamp=" + timeStamp +
'}';
}
}
// output structure
class ItemViewCount{
public long itemID;
public long windowEnd;
public long count;
public ItemViewCount() {
}
public ItemViewCount(long itemID, long windowEnd, long count) {
this.itemID = itemID;
this.windowEnd = windowEnd;
this.count = count;
}
@Override
public String toString() {
return "ItemViewCount{" +
"itemID=" + itemID +
", windowEnd=" + windowEnd +
", count=" + count +
'}';
}
}
public class HotItems {
public static void main(String[] args) throws Exception {
// 定义流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(1);
// 设置时间特征为事件事件
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// source
// input data
DataStreamSource<String> stringDataStreamSource = env.readTextFile("/UserBehavior.csv");
// transform
// 将原始数据变成UserBehavior类型
SingleOutputStreamOperator<UserBehavior> userBehaviorStream = stringDataStreamSource.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String s) throws Exception {
String[] split = s.split(",");
return new UserBehavior(LongFlink实战之电商用户行为实时分析