Spark Streaming的Exactly-One的事务处理和不重复输出详解

Posted snail_gesture

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming的Exactly-One的事务处理和不重复输出详解相关的知识,希望对你有一定的参考价值。

本篇博文组织形式如下:
一:Exactly-One的事务处理
二:输出不重复

一:Exactly-One的事务处理
一:Exactly-One的事务处理
1. 什么是事务处理:
a) 能够处理且只被处理一次。例如,银行转账,A转给B,A有且仅转一次。
b) 能够输出,且只能够输出一次。而B接收转账,且直接收一次。

2.  事务处理会不会失败呢?

可能性不大,Spark是批处理的方式来进行流处理Batch Interval的方式,Spark应用程序启动的时候为我们分配了资源,而且在计算的时候也会动态的分配资源。

3.  WAL机制:

写文件的时候,先通过WAL写入到文件系统中,然后再通过Executor存储到内存或者磁盘中。但是假设数据没有写成功的话,后面一定不会存储到Executor中,这样的话,Executor此时就不会向Driver汇报,那么这些数据一定不会被计算。因此,WAL也并不是一定就可以保证数据安全。

4.  Executor接收数据是一条一条接收的,Receiver会将数据在内存中积累到一定程度的时候才会写入到WAL或者说写入到磁盘中。但是如果还没有积累到一定程度,Receiver崩溃了咋办?

5.  InputDStream的真正产生是在:Driver端产生的。Receiver不断的接收数据,Receiver为了保证安全性,他会不断的通过容错的方式进行处理(把数据写进磁盘,写进内存同时有副本的方式,或者说WAL),

StreamingContext:第一获取数据,第二产生作业。

6.  假设数据崩溃的话,如何处理?

a) Driver端的数据恢复:直接Driver的checkpoint的文件系统中将数据读取出来,而在内部其实是重新启动SparkContext,从新构建StreamingContext,恢复出元数据,再次产生RDD,再次提交到Spark集群。
b) Receiver的重新恢复:Receiver在以前数据的基础上继续接收数据,曾经接收到的数据,通过WAL机制从磁盘中恢复回来。

数据不断的流进Executor,Receiver不断的接收数据,为了保证数据的安全性,他会不断的通过容错的方式进行处理,具体做法是:将数据写入到磁盘,内存,同时以副本的方式,或者说WAL。

Exactly Once事务处理:数据接收基于Kafka
1、 数据的零丢失:必须有可要的数据来源和可靠的Receiver,且整个应用程序的metadata必须进行checkpoint,且通过WAL来保证数据安全,包括接收的数据和元数据本身,实际生产环境中数据来源一般都是Kafka,Receiver接收到来自于Kafka中的数据,默认存储的话是MEMONY_AND_DISK_2.默认在执行计算的时候,他必须完成两台机器的容错之后,他才开始真正的进行计算。Receiver在接收数据如果崩溃的话,这个时候不会有数据丢失,此时没有完成默认副本的复制,Receiver恢复之后就可以重新接收。
2、 Spark Streaming1.3的时候,为了避免WAL的性能损失和实现Exactly Once而提供了Kafka Direct API,把Kafka作为文件存储系统!!!
3、 Kafka是消息中间件,可以动态的接收数据,然后Spark Streaming又可以直接使用Direct API的方式直接操作Kafka,此时把Kafka作为文件存储系统,此时兼具流和文件系统的特性,直接对Kafka操作,Kafka又可以将数据存储一段时间,所以此时操作的时候直接操作Kafka数据中的Offset,这就可以确保数据肯定不会丢失,至此Spark Streaming + Kafka就构建了完美的流处理事件
(1. 数据不需要拷贝副本,
2. 不需要WAL,因此没有性能损耗。
3. Kafka比HDFS高效很多,因为Kafka中采用MEMORY COPY的方式)所有的Executor通过Kafka API直接消费数据
如何解决不会重复读取数据的问题?直接关系Offset。所以不会重复消费数据,事务实现了。

Driver默认的容错机制是:Checkpoint,一般都Checkpoint到HDFS上,因为HDFS天生是有副本。Driver端如果失败的话,可以从Checkpoint端获取数据的metada信息。

二:输出不重复
思考:
数据可能在那些地方丢失?

  1. 数据丢失及其具体的解决方式:
    a) 在Receiver收到数据且通过Driver的调度Executor开始计算数据的时候如果Driver突然崩溃,则此时Executor会被Kill掉,那么Executor中的数据就会丢失,此时就必须通过例如WAL的方式让所有的数据都通过例如HDFS的方式首先进行安全性容错处理,此时如果Executor中的数据丢失的话就可以通过WAL恢复回来。
    b) Spark Streaming在1.3的时候为了避免WAL的性能损失,和实现Exactly Once而提供了Kafka Direct API,把Kafka作为文件存储系统。此时既具有流的优势和文件系统的优势,至此,Spark Streaming + Kafka就构成了完美的流处理世界!!!第一:数据不需要拷贝副本,第二:不需要进行WAL不会有性能损耗。第三:Kafka比HDFS高效很多,因为Kafka内部是采用copy_and_memory的方式。所有的Executor通过Kafka API直接消费数据。所以直接管理Offset,所以也不会重复消费数据。这个时候就可以保证数据一定会被处理,且被处理一次,事务实现了。

  2. 数据重复读取的情况:
    a) 在Receiver收到数据且保存到了HDFS等持久化引擎但是没有来得及updateOffsets,此时Receiver崩溃重新启动就会通过管理Kafka的Zookeeper中的元数据再次重复读取数据。但是此时Spark Streaming认为是成功的,Kafka认为是失败的(因为没有更新offset到Zookeeper中),此时就导致了数据重新消费的情况。

  3. 通过WAL的方式弊端性能损失?

    1. 通过WAL的方式弊端是会极大的损伤Spark Streaming中Receivers接收数据的性能。Receiver接收Kafka的数据方式在实际的企业中使用不是那么多,一般都是直接使用Kafka读取数据。
    2. 如果通过Kafka作为数据来源的话,Kafka中有数据,然后Receiver接受的时候又会有数据副本,这个时候其实是存储浪费。怎么解决?因为基于Zookeeper方式的话可以直接访问元数据信息,因此在处理的时候就可以将数据写入到内存数据库中,在处理的时候就去查该数据是否被处理过,如果处理过那就跳过即可。

关于Spark Streaming数据输出多次重写及其解决方案:
1. 为什么会有这个问题?因为Spark Streaming在计算的时候基于Spark Core,Spark Core天生会做一下事情导致Spark Streaming的结果(部分)重复输出:

            Task重试;
            慢任务推测;
            Stage重复;
            Job重试;
2.  具体解决方案:

设置允许失败次数。Spark.task.maxFailures次数为1
设置spark.speculation为关闭状态(因为慢任务推测其实非常消耗性能,所以关闭后可以显著的提高Spark Streaming处理性能)。
Spark Streaming on Kafka的话,Job失败后可以设置auto.offset.reset为”largest”的方式;这样就会自动进行恢复。

Spark Streaming基于Spark Core会天然的进行Task重试和Stage重试,
最后再次说明:可以通过Transform和foreachRDD基于业务逻辑代码进行逻辑控制来实现数据的不重复消费和输出不重复。这两个方法类似于Spark Streaming的后门,可以做任意想象的控制。

总结:

以上是关于Spark Streaming的Exactly-One的事务处理和不重复输出详解的主要内容,如果未能解决你的问题,请参考以下文章

spark streaming kafka example

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行

Spark Streaming带状态更新