我们如何在 Spark 结构化流 2.4.4 中缓存/持久化数据集

Posted

技术标签:

【中文标题】我们如何在 Spark 结构化流 2.4.4 中缓存/持久化数据集【英文标题】:How do we cache / persist dataset in spark structured streaming 2.4.4 【发布时间】:2020-01-17 05:53:19 【问题描述】:

我想在一个计算数据集上写三个单独的输出,为此我必须缓存/持久化我的第一个数据集,否则它将计算第一个数据集三倍,这会增加我的计算时间。

例如

FirstDataset // Get data from kafka;

SecondDataset = FirstDataSet.mapPartitions(Some Calculations);

ThirdDataset = SecondDataset.mapPartitions(Some Calculations);

现在我想过滤我的 ThirdDataset 并输出过滤后的数据集,用于具有不同逻辑的三种不同条件。

ThirdDataset.filter(**Condition1**).writeStream().foreach(**SOMECALCULATIONS1**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

ThirdDataset.filter(**Condition2**).writeStream().foreach(**SOMECALCULATIONS2**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

ThirdDataset.filter(**Condition3**).writeStream().foreach(**SOMECALCULATIONS3**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

现在对于每个 writestream ThirdDataset 正在计算,如果我缓存 ThirdDataset 那么它不会计算三次。

但是当我执行ThirdDataset.cache() 时,我会出现以下错误,

线程“主”org.apache.spark.sql.AnalysisException 中的异常: 必须使用流式源执行查询 writeStream.start();;

谁能推荐我。

【问题讨论】:

【参考方案1】:

使用 foreachbatch 接收器并在数据帧/数据集上进行缓存!

【讨论】:

【参考方案2】:

缓存对流数据集没有意义。

SPARK-20865

您可能需要改变方法。

类似

ThirdDataset.writeStream().foreach(**SOMECALCULATIONS BASED ON CONDITION**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

cache on streaming datasets fails

【讨论】:

以上是关于我们如何在 Spark 结构化流 2.4.4 中缓存/持久化数据集的主要内容,如果未能解决你的问题,请参考以下文章

spark结构化流如何计算水印

如何在 Spark 结构化流中获取书面记录的数量?

Spark结构化流检查点大小巨大

如何在 Spark 结构化流中指定 deltalake 表的位置?

如何在 Spark 结构化流中保存通过水印丢弃的记录

如何将 Spark 结构化流数据写入 REST API?