spark开启推测机制后数据源是kafka,出现重复消费数据怎么解决

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark开启推测机制后数据源是kafka,出现重复消费数据怎么解决相关的知识,希望对你有一定的参考价值。

因为 推测机制会在其他机器去启动同一个task,难免会有数据重复消费

参考技术A 自己维护offset提交,任务正常执行再提交

Spark中使用kafka

在SparkStreaming流式计算时我们面对的数据量通常是很大的,这时我们需要一个能提供高吞吐,高可用的队列作为我们流式计算的数据源。kafka是我们的不二选择,前面的文章中我们也介绍了kafka的使用,今天我们就来看看kafka如何跟我们的SparkStreaming结合。

用Spark Streaming流式处理kafka中的数据,通常是先把数据拉取过来后,转换为Spark Streaming中的Dstream。接收数据的方式有两种:

  1. 利用Receiver接收数据

  2. 直接从kafka的broker读取数据

今天我们先介绍第一种方式(Receiver)~

Receiver方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

注意

(1)Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。

(2)一个DStream对应一个Receiver

(3)Receiver方式会单独占用至少一个core

(4)Receiver会与Executro建立连接,将接收到的数据进行分发。

代码

下面就带大家看看如何使用SparkStreaming从kafka中读取数据,并使用wordcount

引入依赖

        <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.1</version>
</dependency>

Spark Streaming

object Kafka2Streaming {

val updateFunc = (iter:Iterator[(String,Seq[Int],Option[Int])])=>{
iter.map(t=>{
(t._1,t._2.sum+t._3.getOrElse(0))
})
}

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Kafka2Streaming")

val sc = new SparkContext(conf)
sc.setCheckpointDir("hdfs://server1:9000/bigdata/stateful")

// 创建ssc也支持传入SparkConf,这是设置checkpoint就只能使用ssc设置,并且方法为ssc.checkpoint(path)
val ssc = new StreamingContext(sc,Seconds(5))

// 通过receiver接收数据,一个receiver中可以有多个线程用于接收数据。
val dstream = KafkaUtils.createStream(ssc, "server1:2181", "group1", Map("kafkaspark"->3), StorageLevel.MEMORY_AND_DISK_SER)

val result = dstream.mapPartitions(iter=>{
iter.flatMap(_._2.split(" ")).map((_,1))
}).updateStateByKey(updateFunc,new HashPartitioner(sc.defaultParallelism),true)

result.print()

ssc.start()
ssc.awaitTermination()

}

}

注意点

  1. 使用receiver为了确保数据不丢失,可以使用checkpoint,使用SparkContext或StreamingContext设置checkpointDir

缺点

  1. receiver模式下会造成一定的资源浪费

  2. 使用checkpoint保存状态,如果需要升级程序,则会导致checkpoint无法使用

  3. 当receiver接受数据速率大于处理数据速率,导致数据积压,最终可能会导致程序挂掉。


以上是关于spark开启推测机制后数据源是kafka,出现重复消费数据怎么解决的主要内容,如果未能解决你的问题,请参考以下文章

spark推测执行 优点 缺点

spark推测执行 优点 缺点

利用feign的重试机制刷新过期的请求Token

Spark依赖包加载顺序

spark streaming 读取kafka两种方式的区别

spark-streaming读kafka数据到hive遇到的问题