Spark Streaming实时计算海量用户UV
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming实时计算海量用户UV相关的知识,希望对你有一定的参考价值。
提出需求实时统计业务系统(web,APP之类)的访问人数,即所谓UV,或者DAU指标.
这个需求怕是流计算最最最常见的需求了.
计算UV的关键点就在于去重,即同一个人访问两次是只计一个UV的.在离线计算中统计UV比较容易想到的方法就是用group或distinct机制来去重.但是在实时计算场景,还用group就不太科学了,一个是全量数据的group是比较费时的,第二个是全量数据的group是很费内存和CPU的.特别是当用户量巨大的时候,还要做到秒级更新就更难了.
总结起来,需求就是:海量用户场景UV实时计算.
分享之前我还是要推荐下我自己创建的大数据学习交流Qun531629188
无论是大牛还是想转行想学习的大学生
小编我都挺欢迎,今天的已经资讯上传到群文件,不定期分享干货,
接受挑战
不难发现,问题的主要难点就是去重.
Spark Streaming目前没有给出内置方案(这个其实可以有),但是海量数据去重问题早就有解决办法了. 所以Spark Streaming程序完全可以利用其他系统的现有方案解决去重问题,比如Redis.
Redis的海量去重计数方案
Bitmap方案
所谓的Bitmap就是用一个bit位来标记某个元素对应的Value,比如ID为2的用户,就用第2个bit位来表示,然后用该位的值来表示该用户是否访问过.如果要计算UV,那就只要数一下有多少个1就行啦.
假设我们有40亿用户,使用Bitmap需要2^32个bit位,算下来也就500M左右.
你可能没想到的是,Redis中最常用的数据结构string,就可以实现bitmap算法.
Redis提供了如下命令
1 2 3 4 5 6 7 | <span class="hljs-comment"><span class="hljs-comment">// 插入</span></span> setbit key offset value <span class="hljs-comment"><span class="hljs-comment">//获取</span></span> getbit key offset <span class="hljs-comment"><span class="hljs-comment">//计数</span></span> BITCOUNT key <span class="hljs-selector-attr"><span class="hljs-selector-attr">[start]</span></span> <span class="hljs-selector-attr"><span class="hljs-selector-attr">[end]</span></span>
|
这里offset最大值就是2^32. 比如ID为2的用户,可以setbit uv 2 1,来记录. 最后要计算UV,就直接 BITCOUNT uv. 这步计数非常快,复杂度是O(1).
HyperLogLog方案
若要计算很多页面的UV,用bitmap还是比较费空间的,N个页面就得有N个500M.这时候HyperLogLog结构就是一个比较好的选择.
Redis 在 2.8.9 版本添加了 HyperLogLog 结构。 Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。 在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基 数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。 但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
也就是说HyperLogLog是一种基数统计算法,计算结果是近似值, 12 KB 内存就可以计算2^64 个不同元素的基数.
Redis命令如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>> <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFADD</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span> "<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span>"
1) (<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 1
<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>> <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFADD</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span> "<span class="hljs-selector-tag"><span class="hljs-selector-tag">mongodb</span></span>"
1) (<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 1
<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>> <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFADD</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span> "<span class="hljs-selector-tag"><span class="hljs-selector-tag">mysql</span></span>"
1) (<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 1
<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>> <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFCOUNT</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span>
(<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 3
|
代码实现
下面给出HyperLogLog方案的参考实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 | stream.foreachRDD { rdd => <span class="hljs-comment"><span class="hljs-comment">//统计人数</span></span> rdd.foreachPartition { partition => <span class="hljs-comment"><span class="hljs-comment">//从分区所属executor的redis线程池获取一个连接.</span></span> <span class="hljs-keyword">val</span> redis = <span class="hljs-type">RedisUtil</span>.getRedis partition.<span class="hljs-keyword">foreach</span> { <span class="hljs-keyword"><span class="hljs-keyword">case</span></span> (date, userId) => <span class="hljs-comment"><span class="hljs-comment">//统计当前userId</span></span> redis.pfadd(<span class="hljs-string">s<span class="hljs-string">"uv:</span><span class="hljs-subst"><span class="hljs-string">$date</span></span><span class="hljs-string">"</span></span>, userId) } redis.close() } }
|
关于Redis的连接,如果是用java或scala可以使用JedisPool,注意处理序列化即可.
以上是关于Spark Streaming实时计算海量用户UV的主要内容,如果未能解决你的问题,请参考以下文章
Streaming30分钟概览Spark Streaming 实时计算
.Spark Streaming(上)--实时流计算Spark Streaming原理介