Flink 统计页面点击量
Posted 小码农叔叔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 统计页面点击量相关的知识,希望对你有一定的参考价值。
前言
在一些大型的电商网站上,对于产品经理或线上推广的营销人员来说,经常需要对页面的点击量进行分析统计,从而为页面的广告投放做更精确的数据支撑;
在实际的业务场景中,大致是这样的一个流程,页面先做用户行为的日志数据埋点,然后由实时或准实时应用将行为数据经过ETL落盘(HDFS或大数据存储引擎),之后再由下游应用对这些行为日志的数据根据业务指标进行统计分析,输出并展示成相关的大屏或报表;
如下,为一个经过ETL之后的页面点击行为的CSV文件,对于每一行数据来说,按照逗号进行分割的话,从左到右,每个字段的含义依次表示:用户ID,广告ID,省份,城市,以及时间戳;
业务实现的需求:
- 从埋点日志中,统计每小时页面广告的点击量,5秒刷新一次,并按照不同省份进行划分;
- 对于“刷单”式的频繁点击行为进行过滤,并将该用户加入黑名单;
解决思路分析:
- 根据省份进行分组,创建长度为1小时、滑动距离为5秒的时间窗口进行统计;
- 利用用 process function 进行黑名单过滤,检测用户对同一广告(页面)的点击量,如果超过指定的上限,将用户信息以侧输出流输出到黑名单中(后续针对该用户进行相关的处理);
本例的需求主要有2个,下面就这两个需求分别进行实现
一、按时间窗口统计各省份广告点击量
1、定义CSV文件的值对象
public class AdClickEvent
private Long userId;
private Long adId;
private String province;
private String city;
private Long timestamp;
public AdClickEvent()
public AdClickEvent(Long userId, Long adId, String province, String city, Long timestamp)
this.userId = userId;
this.adId = adId;
this.province = province;
this.city = city;
this.timestamp = timestamp;
public Long getUserId()
return userId;
public void setUserId(Long userId)
this.userId = userId;
public Long getAdId()
return adId;
public void setAdId(Long adId)
this.adId = adId;
public String getProvince()
return province;
public void setProvince(String province)
this.province = province;
public String getCity()
return city;
public void setCity(String city)
this.city = city;
public Long getTimestamp()
return timestamp;
public void setTimestamp(Long timestamp)
this.timestamp = timestamp;
@Override
public String toString()
return "AdClickEvent" +
"userId=" + userId +
", adId=" + adId +
", province='" + province + '\\'' +
", city='" + city + '\\'' +
", timestamp=" + timestamp +
'';
2、输出的结果值对象
public class AddCountResult
private String province;
private String windowEnd;
private Long count;
public AddCountResult(String province, String windowEnd, Long count)
this.province = province;
this.windowEnd = windowEnd;
this.count = count;
public AddCountResult()
public String getProvince()
return province;
public void setProvince(String province)
this.province = province;
public String getWindowEnd()
return windowEnd;
public void setWindowEnd(String windowEnd)
this.windowEnd = windowEnd;
public Long getCount()
return count;
public void setCount(Long count)
this.count = count;
@Override
public String toString()
return "AddCountResult" +
"province='" + province + '\\'' +
", windowEnd='" + windowEnd + '\\'' +
", count=" + count +
'';
3、统计输出核心代码
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class AdvertiseClickAgg
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//时间语义设置
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//从文件中读取数据
String path = "E:\\\\code-self\\\\flink_study\\\\src\\\\main\\\\resources\\\\AdClickLog.csv";
//读取数据
SingleOutputStreamOperator<AdClickEvent> adClickEventStream = env.readTextFile(path)
.map(line ->
String[] fields = line.split(",");
return new AdClickEvent(new Long(fields[0]), new Long(fields[1]), fields[2], fields[3], new Long(fields[4]));
).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdClickEvent>()
@Override
public long extractAscendingTimestamp(AdClickEvent element)
return element.getTimestamp() * 1000L;
);
//基于省份分组,进行开窗聚合
SingleOutputStreamOperator<AddCountResult> addCountResultStream = adClickEventStream.keyBy(AdClickEvent::getProvince)
.timeWindow(Time.hours(1), Time.minutes(5))
.aggregate(new AdCountAgg(), new AdCountRes());
addCountResultStream.print();
env.execute("add click count by province");
public static class AdCountRes implements WindowFunction<Long,AddCountResult,String,TimeWindow>
@Override
public void apply(String province, TimeWindow window, Iterable<Long> input, Collector<AddCountResult> out) throws Exception
String windowEnd = new Timestamp(window.getEnd()).toString();
Long count = input.iterator().next();
out.collect(new AddCountResult(province,windowEnd,count));
public static class AdCountAgg implements AggregateFunction<AdClickEvent,Long,Long>
@Override
public Long createAccumulator()
return 0L;
@Override
public Long add(AdClickEvent adClickEvent, Long aLong)
return aLong + 1;
@Override
public Long getResult(Long aLong)
return aLong;
@Override
public Long merge(Long aLong, Long acc1)
return aLong + acc1;
在这段代码中,由于我们的需求是按照窗口进行区域广告数据的统计,因此肯定要用到WindowFunction窗口函数 与 AggregateFunction 统计函数;
运行这段代码,观察控制台输出效果,这个效果即为程序中定义的,每个小时为一个统计,每5分钟滚动一次输出统计结果;
统计每个省用户广告点击量并过滤恶意点击的用户
在上一个需求的基础上,进一步有这样一个需求,即对那些恶意刷页面的用户,需要能够通过程序过滤这些用户,并以侧输出流的方式进行告警输出;
1、定义一个用于侧输出流告警的值对象
public class BlackListUserWarning
private Long userId;
private Long adId;
private String warningMsg;
public BlackListUserWarning(Long userId, Long adId, String warningMsg)
this.userId = userId;
this.adId = adId;
this.warningMsg = warningMsg;
public BlackListUserWarning()
public Long getUserId()
return userId;
public void setUserId(Long userId)
this.userId = userId;
public Long getAdId()
return adId;
public void setAdId(Long adId)
this.adId = adId;
public String getWarningMsg()
return warningMsg;
public void setWarningMsg(String warningMsg)
this.warningMsg = warningMsg;
@Override
public String toString()
return "BlackListUserWarning" +
"userId=" + userId +
", adId=" + adId +
", warningMsg='" + warningMsg + '\\'' +
'';
2、核心代码块
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.io.IOException;
import java.sql.Timestamp;
public class AdvertiseClickAggBlackIp
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//时间语义设置
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//从文件中读取数据
String path = "E:\\\\code-self\\\\flink_study\\\\src\\\\main\\\\resources\\\\AdClickLog.csv";
//读取数据
SingleOutputStreamOperator<AdClickEvent> adClickEventStream = env.readTextFile(path)
.map(line ->
String[] fields = line.split(",");
return new AdClickEvent(new Long(fields[0]), new Long(fields[1]), fields[2], fields[3], new Long(fields[4]));
).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdClickEvent>()
@Override
public long extractAscendingTimestamp(AdClickEvent element)
return element.getTimestamp() * 1000L;
);
SingleOutputStreamOperator<AdClickEvent> filterAddClickStream = adClickEventStream
.keyBy("userId", "adId")
.process(new FilterBalckIpListUser(100));
//基于省份分组,进行开窗聚合
SingleOutputStreamOperator<AddCountResult> addCountResultStream = filterAddClickStream.keyBy(AdClickEvent::getProvince)
.timeWindow(Time.hours(1), Time.minutes(5))
.aggregate(new AdCountAgg(), new AdCountRes());
addCountResultStream.print("主流输出");
filterAddClickStream.getSideOutput(new OutputTag<BlackListUserWarning>("blacklist")).print("侧输出流");
env.execute("add click count by province");
public static class FilterBalckIpListUser extends KeyedProcessFunction<Tuple,AdClickEvent,AdClickEvent>
private Integer countUpperBound;
public FilterBalckIpListUser(Integer countUpperBound)
this.countUpperBound=countUpperBound;
//保存用户点击某个广告的次数
ValueState<Long> countState;
//保存当前用户之前是否存在黑名单里面了
ValueState<Boolean> isSentState;
@Override
以上是关于Flink 统计页面点击量的主要内容,如果未能解决你的问题,请参考以下文章
百度统计 可以统计页面上的JS的弹出窗的打开次数吗? 怎么实现?
在电商行业,采用flink进行热门实时流量统计,流量PV和UV分析市场营销分析恶意登录监控订单支付实时监控等场景的解决方案