SparkStreaming使用checkpoint存在的问题及解决方案

Posted 匠心数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkStreaming使用checkpoint存在的问题及解决方案相关的知识,希望对你有一定的参考价值。

sparkstreaming关于偏移量的管理

技术分享图片

  1. 在 Direct DStream初始化的时候,需要指定一个包含每个topic的每个分区的offset用于让Direct DStream从指定位置读取数据。
    • offsets就是步骤4中所保存的offsets位置
  2. 读取并处理消息
  3. 处理完之后存储结果数据
    • 用虚线圈存储和提交offset只是简单强调用户可能会执行一系列操作来满足他们更加严格的语义要求。这包括幂等操作和通过原子操作的方式存储offset。
  4. 最后,将offsets保存在外部持久化数据库如 HBase, Kafka, HDFS, and ZooKeeper中

SparkStreaming使用checkpoint存在的问题

SparkStreaming在处理kafka中的数据时,存在一个kafka offset的管理问题:

  • 官方的解决方案是checkpoint:
    • checkpoint是对sparkstreaming运行过程中的元数据和
      每次rdds的数据状态保存到一个持久化系统中,当然这里面也包含了offset,一般是HDFS,S3,如果程序挂了,或者集群挂了,下次启动仍然能够从checkpoint中恢复,从而做到生产环境的7*24高可用。如果checkpoint存储做hdfs中,会带来小文件的问题。

但是checkpoint的最大的弊端在于,一旦你的流式程序代码或配置改变了,或者更新迭代新功能了,这个时候,你先停旧的sparkstreaming程序,然后新的程序打包编译后执行运行,会出现两种情况:

  • (1)启动报错,反序列化异常
  • (2)启动正常,但是运行的代码仍然是上一次的程序的代码。

为什么会出现上面的两种情况?

  • 这是因为checkpoint第一次持久化的时候会把整个相关的jar给序列化成一个二进制文件,每次重启都会从里面恢复,但是当你新的
    程序打包之后序列化加载的仍然是旧的序列化文件,这就会导致报错或者依旧执行旧代码。有的同学可能会说,既然如此,直接把上次的checkpoint删除了,不就能启动了吗? 确实是能启动,但是一旦你删除了旧的checkpoint,新启动的程序,只能从kafka的smallest或者largest的偏移量消费,默认是从最新的,如果是最新的,而不是上一次程序停止的那个偏移量
    就会导致有数据丢失,如果是老的,那么就会导致数据重复。不管怎么样搞,都有问题。
    https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#upgrading-application-code

针对这种问题,spark官网给出了2种解决办法:

(1)旧的不停机,新的程序继续启动,两个程序并存一段时间消费。 评价:仍然有丢重复消费的可能

(2)停机的时候,记录下最后一次的偏移量,然后新恢复的程序读取这个偏移量继续工作,从而达到不丢消息。 评价:官网没有给出具体怎么操作,只是给了个思路:自己存储offsets,

  • Your own data store

For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you’re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.

#Java
// Th#e details depend on your data store, but the general idea looks like this

// begin from the the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  
  Object results = yourCalculation(rdd);

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
});

思路就在这段伪代码中:数据存储支持事务,在事务中更新结果和偏移量,确认偏移量正确更新。

 // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction

SparkStreaming管理kafka中offsets的几种方式

SparkStreaming管理kafka中offsets,就是将offsets采用某种数据格式存储在某个地方,一般有如下几种方式:

1. 存储在kafka

Apache Spark 2.1.x以及spark-streaming-kafka-0-10使用新的的消费者API即异步提交API。你可以在你确保你处理后的数据已经妥善保存之后使用commitAsync API(异步提交 API)来向Kafka提交offsets。新的消费者API会以消费者组id作为唯一标识来提交offsets

将offsets提交到Kafka中

stream.foreachRDD { rdd =>

  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

}

注意: commitAsync()是Spark Streaming集成kafka-0-10版本中的,在Spark文档提醒到它仍然是个实验性质的API并且存在修改的可能性

2. 存储在zookeeper

kafka消费者的偏移量本身就是存储在zookeeper中,在sparkstreaming中,需要在启动时,显示的指定从zookeeper中读取偏移量即可,参考代码如下:

step1: 初始化Zookeeper connection来从Zookeeper中获取offsets



val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)

val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)

Method for retrieving the last offsets stored in ZooKeeper of the consumer group and topic list.

def readOffsets(topics: Seq[String], groupId:String):

 Map[TopicPartition, Long] = {

 val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]

 val partitionMap = zkUtils.getPartitionsForTopics(topics)

 // /consumers/<groupId>/offsets/<topic>/

 partitionMap.foreach(topicPartitions => {

   val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)

   topicPartitions._2.foreach(partition => {

     val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition

     try {

       val offsetStatTuple = zkUtils.readData(offsetPath)

       if (offsetStatTuple != null) {

         LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)

         topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),

           offsetStatTuple._1.toLong)

       }

     } catch {

       case e: Exception =>

         LOGGER.warn("retrieving offset details - no previous node exists:" + " {}, topic: {}, partition: {}, node path: {}", Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)

         topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)

     }

   })

 })

 topicPartOffsetMap.toMap

}

step2: 使用获取到的offsets来初始化Kafka Direct DStream

val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets))
  • 用于将 可恢复的偏移量 持久化到zookeeper的方法。
#注意: Kafka offset在ZooKeeper中的存储路径为/consumers/[groupId]/offsets/topic/[partitionId], 存储的值为offset

def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {

 offsets.foreach(or => {

   val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic);

   val acls = new ListBuffer[ACL]()

   val acl = new ACL

   acl.setId(ANYONE_ID_UNSAFE)

   acl.setPerms(PERMISSIONS_ALL)

   acls += acl

   val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition;

   val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset

   zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/"

     + or.partition, offsetVal + "", JavaConversions.bufferAsJavaList(acls))

   LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)

 })

}

3. 存储在hbase

  • DDL: 30天过期
create ‘stream_kafka_offsets‘, {NAME=>‘offsets‘, TTL=>2592000}
  • RowKey Layout
row:              <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
column family:    offsets
qualifier:        <PARTITION_ID>
value:            <OFFSET_ID>

For each batch of messages, saveOffsets() function is used to persist last read offsets for a given kafka topic in HBase.对每一个批次的消息,使用saveOffsets()将从指定topic中读取的offsets保存到HBase中

/*
 Save offsets for each batch into HBase
*/
def saveOffsets(TOPIC_NAME:String,GROUP_ID:String,offsetRanges:Array[OffsetRange],
                hbaseTableName:String,batchTime: org.apache.spark.streaming.Time) ={
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.addResource("src/main/resources/hbase-site.xml")
  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" +String.valueOf(batchTime.milliseconds)
  val put = new Put(rowKey.getBytes)
  for(offset <- offsetRanges){
    put.addColumn(Bytes.toBytes("offsets"),Bytes.toBytes(offset.partition.toString),
          Bytes.toBytes(offset.untilOffset.toString))
  }
  table.put(put)
  conn.close()
}

在执行streaming任务之前,首先会使用getLastCommittedOffsets()来从HBase中读取上一次任务结束时所保存的offsets。该方法将采用常用方案来返回kafka topic分区offsets。

情形1:Streaming任务第一次启动,从zookeeper中获取给定topic的分区数,然后将每个分区的offset都设置为0,并返回。

情形2:一个运行了很长时间的streaming任务停止并且给定的topic增加了新的分区,处理方式是从zookeeper中获取给定topic的分区数,对于所有老的分区,offset依然使用HBase中所保存,对于新的分区则将offset设置为0。

情形3:Streaming任务长时间运行后停止并且topic分区没有任何变化,在这个情形下,直接使用HBase中所保存的offset即可。

在Spark Streaming应用启动之后如果topic增加了新的分区,那么应用只能读取到老的分区中的数据,新的是读取不到的。所以如果想读取新的分区中的数据,那么就得重新启动Spark Streaming应用。

/* Returns last committed offsets for all the partitions of a given topic from HBase in  
following  cases.
*/
    
def getLastCommittedOffsets(TOPIC_NAME:String,GROUP_ID:String,hbaseTableName:String,
zkQuorum:String,zkRootDir:String,sessionTimeout:Int,connectionTimeOut:Int):Map[TopicPartition,Long] ={
 
  val hbaseConf = HBaseConfiguration.create()
  val zkUrl = zkQuorum+"/"+zkRootDir
  val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl,
                                                sessionTimeout,connectionTimeOut)
  val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false)
  val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME
                                                 )).get(TOPIC_NAME).toList.head.size
  zkClientAndConnection._1.close()
  zkClientAndConnection._2.close()
 
  //Connect to HBase to retrieve last committed offsets
  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" +
                                              String.valueOf(System.currentTimeMillis())
  val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
  val scan = new Scan()
  val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(
                                                   stopRow.getBytes).setReversed(true))
  val result = scanner.next()
  var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
  if (result != null){
  //If the result from hbase scanner is not null, set number of partitions from hbase 
  to the  number of cells
    hbaseNumberOfPartitionsForTopic = result.listCells().size()
  }

val fromOffsets = collection.mutable.Map[TopicPartition,Long]()
 
  if(hbaseNumberOfPartitionsForTopic == 0){
    // initialize fromOffsets to beginning
    for (partition <- 0 to zKNumberOfPartitionsForTopic-1){
      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)
    }
  } else if(zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic){
  // handle scenario where new partitions have been added to existing kafka topic
    for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1){
      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
                                        Bytes.toBytes(partition.toString)))
      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)
    }
    for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic-1){
      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)
    }
  } else {
  //initialize fromOffsets from last run
    for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1 ){
      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
                                        Bytes.toBytes(partition.toString)))
      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)
    }
  }
  scanner.close()
  conn.close()
  fromOffsets.toMap
}

当我们获取到offsets之后我们就可以创建一个Kafka Direct DStream

val fromOffsets= getLastCommittedOffsets(topic,consumerGroupID,hbaseTableName,zkQuorum,

                                   zkKafkaRootDir,zkSessionTimeOut,zkConnectionTimeOut)

val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,

                           Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))

在完成本批次的数据处理之后调用saveOffsets()保存offsets.

/*
For each RDD in a DStream apply a map transformation that processes the message.
*/

inputDStream.foreachRDD((rdd,batchTime) => {

  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  offsetRanges.foreach(offset => println(offset.topic,offset.partition, offset.fromOffset,

                        offset.untilOffset))

  val newRDD = rdd.map(message => processMessage(message))

  newRDD.count()

  saveOffsets(topic,consumerGroupID,offsetRanges,hbaseTableName,batchTime)

})

参考代码:https://github.com/gdtm86/spark-streaming-kafka-cdh511-testing

总结

综上所述,推荐使用zk维护offsets。

参考文献


tips:本文属于自己学习和实践过程的记录,很多图和文字都粘贴自网上文章,没有注明引用请包涵!如有任何问题请留言或邮件通知,我会及时回复。





以上是关于SparkStreaming使用checkpoint存在的问题及解决方案的主要内容,如果未能解决你的问题,请参考以下文章

spark streaming可以用于批处理吗

SparkStreaming基础

Kafka集成SparkStreaming

SparkStreaming(源码阅读十二)

flume kafka和sparkstreaming整合

如何在idea里面直接运行spark streaming程序