实时流计算Spark StreamingKafkaRedisExactly-once实时去重

Posted 西安大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实时流计算Spark StreamingKafkaRedisExactly-once实时去重相关的知识,希望对你有一定的参考价值。

本文想记录和表达的东西挺多的,一时想不到什么好的标题,所以就用上面的关键字作为标题了。


在实时流式计算中,最重要的是在任何情况下,消息不重复、不丢失,即Exactly-once。本文以Kafka–>Spark Streaming–>Redis为例,一方面说明一下如何做到Exactly-once,另一方面说明一下我是如何计算实时去重指标的。


1. 关于数据源



数据源是文本格式的日志,由nginx产生,存放于日志服务器上。在日志服务器上部署Flume Agent,使用TAILDIR Source和Kafka Sink,将日志采集到Kafka进行临时存储。日志格式如下:


2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=DEIBAH&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=GLLIEG&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJMEC&siteid=8

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HMGBDE&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJFLA&siteid=4

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=JCEBBC&siteid=9

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=KJLAKG&siteid=8

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=FHEIKI&siteid=3

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IGIDLB&siteid=3

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IIIJCD&siteid=5


日志是由测试程序模拟产生的,字段之间由|~|分隔。


2. 实时计算需求



分天、分小时PV;

分天、分小时、分网站(siteid)PV;

分天 UV;


3. Spark Streaming消费Kafka数据



http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html


在Spark Streaming中消费Kafka数据,保证Exactly-once的核心有三点:

使用Direct方式连接Kafka;自己保存和维护Offset;更新Offset和计算在同一事务中完成;


后面的Spark Streaming程序(文章结尾),主要有以下步骤:

  1. 启动后,先从Redis中获取上次保存的Offset,Redis中的key为”topic_partition”,即每个分区维护一个Offset;

  2. 使用获取到的Offset,创建DirectStream;

  3. 在处理每批次的消息时,利用Redis的事务机制,确保在Redis中指标的计算和Offset的更新维护,在同一事务中完成。只有这两者同步,才能真正保证消息的Exactly-once。


  
    
    
  
  1. ./spark-submit \

  2. --class com.lxw1234.spark.TestSparkStreaming \

  3. --master local[2] \

  4. --conf spark.streaming.kafka.maxRatePerPartition=20000 \

  5. --jars /data1/home/dmp/lxw/realtime/commons-pool2-2.3.jar,\

  6. /data1/home/dmp/lxw/realtime/jedis-2.9.0.jar,\

  7. /data1/home/dmp/lxw/realtime/kafka-clients-0.11.0.1.jar,\

  8. /data1/home/dmp/lxw/realtime/spark-streaming-kafka-0-10_2.11-2.2.1.jar \

  9. /data1/home/dmp/lxw/realtime/testsparkstreaming.jar \

  10. --executor-memory 4G \

  11. --num-executors 1



在启动Spark Streaming程序时候,有个参数最好指定:

spark.streaming.kafka.maxRatePerPartition=20000(每秒钟从topic的每个partition最多消费的消息条数)


如果程序第一次运行,或者因为某种原因暂停了很久重新启动时候,会积累很多消息,如果这些消息同时被消费,很有可能会因为内存不够而挂掉,因此,需要根据实际的数据量大小,以及批次的间隔时间来设置该参数,以限定批次的消息量。


如果该参数设置20000,而批次间隔时间未10秒,那么每个批次最多从Kafka中消费20万消息。


4. Redis中的数据模型



  • 分小时、分网站PV

普通K-V结构,计算时候使用incr命令递增,

Key为 “site_pv_网站ID_小时”,

如:site_pv_9_2018-02-21-00、site_pv_10_2018-02-21-01

该数据模型用于计算分网站的按小时及按天PV。

  • 分小时PV

普通K-V结构,计算时候使用incr命令递增,

Key为“pv_小时”,如:pv_2018-02-21-14、pv_2018-02-22-03

该数据模型用于计算按小时及按天总PV。

  • 分天UV

Set结构,计算时候使用sadd命令添加,

Key为”uv_天”,如:uv_2018-02-21、uv_2018-02-20

该数据模型用户计算按天UV(获取时候使用SCARD命令获取Set元素个数)

 

注:这些Key对应的时间,均由实际消息中的第一个字段(时间)而定。


5. 故障恢复



如果Spark Streaming程序因为停电、网络等意外情况终止而需要恢复,则直接重启即可;


如果因为其他原因需要重新计算某一时间段的消息,可以先删除Redis中对应时间段内的Key,然后从原始日志中截取该时间段内的消息,当做新消息添加至Kafka,由Spark Streaming程序重新消费并进行计算;


6. 附程序



依赖jar包:

commons-pool2-2.3.jar

jedis-2.9.0.jar

kafka-clients-0.11.0.1.jar

spark-streaming-kafka-0-10_2.11-2.2.1.jar


InternalRedisClient (Redis链接池)

TestSparkStreaming


鉴于篇幅原因,代码请点击原文链接,到原文中查看。




以上是关于实时流计算Spark StreamingKafkaRedisExactly-once实时去重的主要内容,如果未能解决你的问题,请参考以下文章

聊聊批计算、流计算、Hadoop、Spark、Storm、Flink等等

实时流计算Spark StreamingKafkaRedisExactly-once实时去重

Spark Streaming

实时数据流计算引擎Flink和Spark剖析

面试官:Spark也能做流计算,为什么还要优先用Flink?

Spark 实战, :使用 Kafka 和 Spark Streaming 构建实时数据处理系统