spark开启推测机制后数据源是kafka,出现重复消费数据怎么解决
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark开启推测机制后数据源是kafka,出现重复消费数据怎么解决相关的知识,希望对你有一定的参考价值。
因为 推测机制会在其他机器去启动同一个task,难免会有数据重复消费
参考技术A 自己维护offset提交,任务正常执行再提交Spark中使用kafka
在SparkStreaming流式计算时我们面对的数据量通常是很大的,这时我们需要一个能提供高吞吐,高可用的队列作为我们流式计算的数据源。kafka是我们的不二选择,前面的文章中我们也介绍了kafka的使用,今天我们就来看看kafka如何跟我们的SparkStreaming结合。
用Spark Streaming流式处理kafka中的数据,通常是先把数据拉取过来后,转换为Spark Streaming中的Dstream。接收数据的方式有两种:
利用Receiver接收数据
直接从kafka的broker读取数据
今天我们先介绍第一种方式(Receiver)~
Receiver方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
注意
(1)Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。
(2)一个DStream对应一个Receiver
(3)Receiver方式会单独占用至少一个core
(4)Receiver会与Executro建立连接,将接收到的数据进行分发。
代码
下面就带大家看看如何使用SparkStreaming从kafka中读取数据,并使用wordcount
引入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.1</version>
</dependency>
Spark Streaming
object Kafka2Streaming {
val updateFunc = (iter:Iterator[(String,Seq[Int],Option[Int])])=>{
iter.map(t=>{
(t._1,t._2.sum+t._3.getOrElse(0))
})
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Kafka2Streaming")
val sc = new SparkContext(conf)
sc.setCheckpointDir("hdfs://server1:9000/bigdata/stateful")
// 创建ssc也支持传入SparkConf,这是设置checkpoint就只能使用ssc设置,并且方法为ssc.checkpoint(path)
val ssc = new StreamingContext(sc,Seconds(5))
// 通过receiver接收数据,一个receiver中可以有多个线程用于接收数据。
val dstream = KafkaUtils.createStream(ssc, "server1:2181", "group1", Map("kafkaspark"->3), StorageLevel.MEMORY_AND_DISK_SER)
val result = dstream.mapPartitions(iter=>{
iter.flatMap(_._2.split(" ")).map((_,1))
}).updateStateByKey(updateFunc,new HashPartitioner(sc.defaultParallelism),true)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
注意点
使用receiver为了确保数据不丢失,可以使用checkpoint,使用SparkContext或StreamingContext设置checkpointDir
缺点
receiver模式下会造成一定的资源浪费
使用checkpoint保存状态,如果需要升级程序,则会导致checkpoint无法使用
当receiver接受数据速率大于处理数据速率,导致数据积压,最终可能会导致程序挂掉。
以上是关于spark开启推测机制后数据源是kafka,出现重复消费数据怎么解决的主要内容,如果未能解决你的问题,请参考以下文章