spark流检查点恢复非常非常慢
Posted
技术标签:
【中文标题】spark流检查点恢复非常非常慢【英文标题】:spark streaming checkpoint recovery is very very slow 【发布时间】:2016-11-18 08:05:00 【问题描述】: 目标:通过 Spark 流从 Kinesis 读取数据并以 Parquet 格式将数据存储到 S3。 情况: 应用程序最初运行良好,批量运行 1 小时,处理时间平均不到 30 分钟。出于某种原因,假设应用程序崩溃了,我们尝试从检查点重新启动。现在处理需要永远并且不会向前推进。 我们尝试以 1 分钟的批处理间隔测试相同的东西,处理运行良好,批处理完成需要 1.2 分钟。当我们从检查点恢复时,每批大约需要 15 分钟。 注意事项: 我们使用 s3 作为检查点 使用 1 个执行器,每个执行器有 19g 内存和 3 个核心附上截图:
首次运行 - 检查点恢复之前
尝试从检查点恢复:
Config.scala
object Config
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)
val checkpointDirectory = sc.hadoopConfiguration.get("checkpointDirectory")
// sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))
val numStreams = 2
def getSparkContext(): SparkContext =
this.sc
def getSqlContext(): HiveContext =
this.sqlContext
S3Basin.scala
object S3Basin
def main(args: Array[String]): Unit =
Kinesis.startStreaming(s3basinFunction _)
def s3basinFunction(streams : DStream[Array[Byte]]): Unit =
streams.foreachRDD(jsonRDDRaw =>
println(s"Old partitions $jsonRDDRaw.partitions.length")
val jsonRDD = jsonRDDRaw.coalesce(10,true)
println(s"New partitions $jsonRDD.partitions.length")
if(!jsonRDD.isEmpty())
val sqlContext = SQLContext.getOrCreate(jsonRDD.context)
sqlContext.read.json(jsonRDD.map(f=>
val str = new String(f)
if(str.startsWith("\"message\""))
str.substring(11,str.indexOf("@version")-2)
else
str
)).registerTempTable("events")
sqlContext.sql(
"""
|select
|to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
|hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
|*
|from events
""".stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)
sqlContext.dropTempTable("events")
)
Kinesis.scala
object Kinesis
def functionToCreateContext(streamFunc: (DStream[Array[Byte]]) => Unit): StreamingContext =
val streamingContext = new StreamingContext(Config.sc, Minutes(Config.sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))) // new context
streamingContext.checkpoint(Config.checkpointDirectory) // set checkpoint directory
val sc = Config.getSparkContext
var awsCredentails : BasicAWSCredentials = null
val kinesisClient = if(Config.useIAMInstanceRole)
new AmazonKinesisClient()
else
awsCredentails = new BasicAWSCredentials(sc.hadoopConfiguration.get("kinesis.awsAccessKeyId"),sc.hadoopConfiguration.get("kinesis.awsSecretAccessKey"))
new AmazonKinesisClient(awsCredentails)
val endpointUrl = sc.hadoopConfiguration.get("kinesis.endpointUrl")
val appName = sc.hadoopConfiguration.get("kinesis.appName")
val streamName = sc.hadoopConfiguration.get("kinesis.streamName")
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
val batchInterval = Minutes(sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))
// Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
// on sequence number of records that have been received. Same as batchInterval for this
// example.
val kinesisCheckpointInterval = batchInterval
// Get the region name from the endpoint URL to save Kinesis Client Library metadata in
// DynamoDB of the same region as the Kinesis stream
val regionName = sc.hadoopConfiguration.get("kinesis.regionName")
val kinesisStreams = (0 until Config.numStreams).map i =>
println(s"creating stream for $i")
if(Config.useIAMInstanceRole)
KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
else
KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2,awsCredentails.getAWSAccessKeyId,awsCredentails.getAWSSecretKey)
val unionStreams = streamingContext.union(kinesisStreams)
streamFunc(unionStreams)
streamingContext
def startStreaming(streamFunc: (DStream[Array[Byte]]) => Unit) =
val sc = Config.getSparkContext
if(sc.defaultParallelism < Config.numStreams+1)
throw new Exception(s"Number of shards = $Config.numStreams , number of processor = $sc.defaultParallelism")
val streamingContext = StreamingContext.getOrCreate(Config.checkpointDirectory, () => functionToCreateContext(streamFunc))
// sys.ShutdownHookThread
// println("Gracefully stopping Spark Streaming Application")
// streamingContext.stop(true, true)
// println("Application stopped greacefully")
//
//
streamingContext.start()
streamingContext.awaitTermination()
DAG
【问题讨论】:
图片很棒,但我们可以看看你的代码吗?最好是你的 Spark DAG。 @YuvalItzchakov 添加了代码和 DAG 这是迄今为止我在 SO 上看到的最有趣的帖子。感谢所有的截图=D @Kristian 关于我在哪里可以找到更多信息的任何线索? @amit_kumar 我看图片没问题。这可能是您的本地浏览器问题。回到问题上来,您是否刚刚尝试查看您的服务器是否正在交换?即使有 19G 的内存,它也可能会做一些意想不到的事情。 【参考方案1】:提出了一个 Jira 问题:https://issues.apache.org/jira/browse/SPARK-19304
问题是因为我们每次迭代读取的数据多于所需数据,然后丢弃了这些数据。这可以通过对getResults
aws 调用添加限制来避免。
修复:https://github.com/apache/spark/pull/16842
【讨论】:
@interfector 你可能对这个感兴趣 修复它的程度很棒@gaurav-shah !!【参考方案2】:重新启动失败的驱动程序时,会发生以下情况:
-
恢复计算 – 检查点信息用于
重新启动驱动程序,重建上下文并重新启动所有
接收者。
恢复块元数据 – 将要恢复的所有块的元数据
继续处理所必需的将被恢复。
重新生成不完整的作业 - 对于处理的批次
由于失败而没有完成,RDDs和相应的
使用恢复的块元数据重新生成作业。
读取保存在日志中的块 – 执行这些作业时,
块数据直接从预写日志中读取。这恢复
可靠地保存到日志中的所有必要数据。
重新发送未确认的数据 – 未保存到的缓冲数据
失败时的日志将由源再次发送。作为
接收方尚未确认。
由于所有这些步骤都是在驱动程序中执行的,因此您的批次 0 事件需要很长时间。这应该在第一批发生,然后一切都会正常。
参考here。
【讨论】:
正确,我们知道它只发生在需要通过检查点恢复的批次上(不仅仅是第一个),我们还发现它发生在驱动程序上,但是如何解决这个问题是问题 Checkpointing真的很慢,你试过KryoSerialization吗。还可以考虑使用 Datasets databricks.com/blog/2016/01/04/…,它具有更快的 ser/desr 和 Encoders。 慢是相对的,检查点需要将数据序列化到由接收者完成的检查点位置(额外工作)。恢复后重新计算对数据的所有操作,这可能涉及网络流量,因为现在数据源是您的检查点位置。在 SPARK-JIRA issues.apache.org/jira/browse/… 上将您的号码与此 cmets 匹配 我们不需要在 1.5 spark 的 Kinesis 上启用 WAL,它可以使用 Kinesis 作为源并从中恢复。检查点只包含元数据,编写起来并不昂贵。没有 ser/de ,没有数据写入。只有元数据issues.apache.org/jira/browse/SPARK-9215 @GauravShah 您是否创建了 jira 问题?我也很有趣关注这张票。我有流式传输选项卡,在重新启动流式传输后出现 45,并且在 1 小时后批处理需要越来越长的时间导致 oom。因为当我没有从检查点重新启动时我还没有 oom,所以我想知道引擎盖内有什么附加内容。【参考方案3】:我之前也遇到过类似的问题,我的应用程序越来越慢。
使用rdd后尝试释放内存,调用rdd.unpersist()
https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#unpersist(boolean)
或spark.streaming.backpressure.enabled
到true
http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval
http://spark.apache.org/docs/latest/streaming-programming-guide.html#requirements
另外,请检查您的 locality
设置,可能有太多数据移动。
【讨论】:
应用程序仅在恢复中花费时间,而不是在常规处理中。如果我的内存不足,rdd.unpersist
会有所帮助,但事实并非如此。如果我不能像数据传入一样快地使用数据,那么背压很有用,但我实际上可以做到。以上是关于spark流检查点恢复非常非常慢的主要内容,如果未能解决你的问题,请参考以下文章
为啥 spark 无法使用 getOrCreate 从检查点恢复