刷新缓存的数据框?
Posted
技术标签:
【中文标题】刷新缓存的数据框?【英文标题】:Refresh cached dataframe? 【发布时间】:2016-12-28 05:00:25 【问题描述】:我们有一个小蜂巢表(大约 50000 条记录),每天更新一次。
我们为该表缓存了一个数据帧,并且正在与 Spark 流数据连接。 base hive 中加载新数据时如何刷新数据框?
DataFrame tempApp = hiveContext.table("emp_data");
//Get Max Load-Date
Date max_date = max_date = tempApp.select(max("load_date")).collect()[0].getDate(0);
//Get data for latest date and cache. This will be used to join with stream data.
DataFrame emp= hiveContext.table("emp_data").where("load_date='" + max_date + "'").cache();
// Get message from Kafka Stream
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(....);
JavaDStream<MobileFlowRecord> rddMobileFlorRecs = messages.map(Record::parseFromMessage);
kafkaRecs.foreachRDD(rdd->DataFrame recordDataFrame = hiveContext.createDataFrame(rdd, Record.class);
DataFrame joinedDataSet = recordDataFrame.join(emp,
recordDataFrame.col("application").equalTo(app.col("emp_id"));
joinedDataSet. <Do furthur processing>
);
【问题讨论】:
查看此链接。 ***.com/questions/66911985/… 【参考方案1】:您可以手动完成。像这样的:
DataFrame refresh(DataFrame orig)
if (orig != null)
orig.unpersist();
DataFrame res = get the dataframe as you normally would
res.cache()
return res
现在每天调用一次,或者在您希望刷新时调用:
DataFrame join_df = refresh(join_df)
这基本上做的是取消保留(删除缓存)以前的版本,读取新版本,然后缓存它。所以在实践中,数据框被刷新了。
您应该注意,数据帧仅在刷新后第一次使用后才会保留在内存中,因为缓存是惰性的。
【讨论】:
【参考方案2】:如果不再使用 RDD 或 Dataframe,则自动生成 unpersist
火花。为了知道 RDD 或 Dataframe 是否被缓存,您可以进入 Spark UI --> Storage 选项卡并查看内存详细信息。您可以使用df.unpersist()
或sqlContext.uncacheTable("sparktable")
uncacheTable APi 从内存中删除df 或表。此选项在新的SparksessionAPi 中不可用,但始终存在向后兼容性。 Spark 为延迟评估而设计,除非并且直到您说出任何操作,否则它不会将任何数据加载或处理到 RDD 或 DataFrame 中。
所以为您执行join
后,为您的数据框执行unpersist()。这将提高性能并解决您的问题。
Databricks
【讨论】:
我怀疑我是否理解您的解决方案。缓存和取消缓存数据集可能会解决问题,但它破坏了缓存的目的,因为缓存仅对一次迭代有效。我添加了示例代码以进行更多说明。其次,我测试过,每次迭代的缓存和取消缓存都会增加大约 3 秒的延迟。想知道是否有其他方法可以实现这一目标?以上是关于刷新缓存的数据框?的主要内容,如果未能解决你的问题,请参考以下文章