Spark Streaming实时计算海量用户UV

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming实时计算海量用户UV相关的知识,希望对你有一定的参考价值。

提出需求

实时统计业务系统(web,APP之类)的访问人数,即所谓UV,或者DAU指标.

这个需求怕是流计算最最最常见的需求了.

计算UV的关键点就在于去重,即同一个人访问两次是只计一个UV的.在离线计算中统计UV比较容易想到的方法就是用group或distinct机制来去重.但是在实时计算场景,还用group就不太科学了,一个是全量数据的group是比较费时的,第二个是全量数据的group是很费内存和CPU的.特别是当用户量巨大的时候,还要做到秒级更新就更难了.

总结起来,需求就是:海量用户场景UV实时计算.

分享之前我还是要推荐下我自己创建的大数据学习交流Qun531629188


无论是大牛还是想转行想学习的大学生


小编我都挺欢迎,今天的已经资讯上传到群文件,不定期分享干货,

接受挑战

不难发现,问题的主要难点就是去重.

Spark Streaming目前没有给出内置方案(这个其实可以有),但是海量数据去重问题早就有解决办法了. 所以Spark Streaming程序完全可以利用其他系统的现有方案解决去重问题,比如Redis.

Redis的海量去重计数方案

Bitmap方案

所谓的Bitmap就是用一个bit位来标记某个元素对应的Value,比如ID为2的用户,就用第2个bit位来表示,然后用该位的值来表示该用户是否访问过.如果要计算UV,那就只要数一下有多少个1就行啦.

假设我们有40亿用户,使用Bitmap需要2^32个bit位,算下来也就500M左右.

你可能没想到的是,Redis中最常用的数据结构string,就可以实现bitmap算法.

Redis提供了如下命令

1

2

3

4

5

6

7

<span class="hljs-comment"><span class="hljs-comment">// 插入</span></span>

setbit key offset value

<span class="hljs-comment"><span class="hljs-comment">//获取</span></span>

getbit key offset

<span class="hljs-comment"><span class="hljs-comment">//计数</span></span>

BITCOUNT key <span class="hljs-selector-attr"><span class="hljs-selector-attr">[start]</span></span> <span class="hljs-selector-attr"><span class="hljs-selector-attr">[end]</span></span>

 

这里offset最大值就是2^32. 比如ID为2的用户,可以setbit uv 2 1,来记录. 最后要计算UV,就直接 BITCOUNT uv. 这步计数非常快,复杂度是O(1).

HyperLogLog方案

若要计算很多页面的UV,用bitmap还是比较费空间的,N个页面就得有N个500M.这时候HyperLogLog结构就是一个比较好的选择.

Redis 在 2.8.9 版本添加了 HyperLogLog 结构。 Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。 在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基 数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。 但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。

也就是说HyperLogLog是一种基数统计算法,计算结果是近似值, 12 KB 内存就可以计算2^64 个不同元素的基数.

Redis命令如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

 

<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>&gt; <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFADD</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span> "<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span>"

 

1) (<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 1

 

<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>&gt; <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFADD</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span> "<span class="hljs-selector-tag"><span class="hljs-selector-tag">mongodb</span></span>"

 

1) (<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 1

 

<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>&gt; <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFADD</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span> "<span class="hljs-selector-tag"><span class="hljs-selector-tag">mysql</span></span>"

 

1) (<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 1

 

<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>&gt; <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFCOUNT</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span>

 

(<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 3

 

 


代码实现

下面给出HyperLogLog方案的参考实现:

1

2

3

4

5

6

7

8

9

10

11

12

13

stream.foreachRDD { rdd =&gt;

<span class="hljs-comment"><span class="hljs-comment">//统计人数</span></span>

rdd.foreachPartition { partition =&gt;

<span class="hljs-comment"><span class="hljs-comment">//从分区所属executor的redis线程池获取一个连接.</span></span>

<span class="hljs-keyword">val</span> redis = <span class="hljs-type">RedisUtil</span>.getRedis

partition.<span class="hljs-keyword">foreach</span> { <span class="hljs-keyword"><span class="hljs-keyword">case</span></span> (date, userId) =&gt;

<span class="hljs-comment"><span class="hljs-comment">//统计当前userId</span></span>

redis.pfadd(<span class="hljs-string">s<span class="hljs-string">"uv:</span><span class="hljs-subst"><span class="hljs-string">$date</span></span><span class="hljs-string">"</span></span>, userId)

}

redis.close()

}

}

 

关于Redis的连接,如果是用java或scala可以使用JedisPool,注意处理序列化即可.


以上是关于Spark Streaming实时计算海量用户UV的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming实时计算框架介绍

Streaming30分钟概览Spark Streaming 实时计算

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark 实践——基于 Spark Streaming 的实时日志分析系统

Spark Streaming和Kafka集成深入浅出

Spark 实战, :使用 Kafka 和 Spark Streaming 构建实时数据处理系统