Spark Streaming:如何定期刷新缓存的 RDD?

Posted

技术标签:

【中文标题】Spark Streaming:如何定期刷新缓存的 RDD?【英文标题】:Spark Streaming: How to periodically refresh cached RDD? 【发布时间】:2016-06-05 04:39:07 【问题描述】:

在我的 Spark 流应用程序中,我想根据从后端 (ElasticSearch) 检索到的字典映射一个值。我想定期刷新字典,以防它在后端更新。它类似于 Logstash 翻译过滤器的定期刷新功能。我如何使用 Spark 实现这一点(例如,以某种方式每 30 秒取消一次 RDD)?

【问题讨论】:

【参考方案1】:

我发现这样做的最佳方法是重新创建 RDD 并维护对它的可变引用。 Spark Streaming 的核心是基于 Spark 的调度框架。我们可以搭载调度程序来定期刷新 RDD。为此,我们使用仅为刷新操作安排的空 DStream:

def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data
val dstream = ??? // our data stream

// a dstream of empty data
val refreshDstream = new  ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval))

var referenceData = getData()
referenceData.cache()
refreshDstream.foreachRDD_ => 
    // evict the old RDD from memory and recreate it
    referenceData.unpersist(true)
    referenceData = getData()
    referenceData.cache()


val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...

过去,我也尝试过仅将 cache()unpersist() 交错而没有结果(它只刷新一次)。重新创建 RDD 会删除所有沿袭并提供新数据的干净加载。

【讨论】:

ConstantInputDStream 有 java 替代品吗? 是否保证 referenceData 刷新作业(由 getData() 触发)总是在安排 businessDataDStream 作业之前发生?我们是否会出现在调度 rdd.join(referenceData) 作业时发生 referenceData 刷新的场景?? @maasg 如何安排getData() 通话?从问题,every 30 seconds? @maasg 下面的会中断吗?为您的方法 +100!【参考方案2】:

步骤:

    在开始流式传输之前缓存一次 一段时间后清除缓存(此处示例为 30 分钟)

可选:通过 spark 修复 Hive 表可以添加到 init 中。

spark.sql("msck 修复表表名")

import java.time.LocalDateTime

var caching_data = Data.init()

caching_data.persist()

var currTime = LocalDateTime.now()

var cacheClearTime = currTime.plusMinutes(30) // Put your time in Units

DStream.foreachRDD(rdd => if (rdd.take(1).length > 0) 
  //Clear and Cache again
  currTime = LocalDateTime.now()
  val dateDiff = cacheClearTime.isBefore(currTime)
  if (dateDiff) 
    caching_data.unpersist(true) //blocking unpersist on boolean = true
    caching_data = Data.init()
    caching_data.persist()
    currTime = LocalDateTime.now()
    cacheClearTime = currTime.plusMinutes(30)
  
)

【讨论】:

以上是关于Spark Streaming:如何定期刷新缓存的 RDD?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Spark Structured Streaming 刷新 Hive/Impala 表?

spark streaming基础知识1

Spark Streaming发行版笔记16:数据清理内幕彻底解密

慕课网实战Spark Streaming实时流处理项目实战笔记二十之铭文升级版

Spark Streaming实时处理应用

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN