我们如何在 Spark 结构化流 2.4.4 中缓存/持久化数据集
Posted
技术标签:
【中文标题】我们如何在 Spark 结构化流 2.4.4 中缓存/持久化数据集【英文标题】:How do we cache / persist dataset in spark structured streaming 2.4.4 【发布时间】:2020-01-17 05:53:19 【问题描述】:我想在一个计算数据集上写三个单独的输出,为此我必须缓存/持久化我的第一个数据集,否则它将计算第一个数据集三倍,这会增加我的计算时间。
例如
FirstDataset // Get data from kafka;
SecondDataset = FirstDataSet.mapPartitions(Some Calculations);
ThirdDataset = SecondDataset.mapPartitions(Some Calculations);
现在我想过滤我的 ThirdDataset 并输出过滤后的数据集,用于具有不同逻辑的三种不同条件。
ThirdDataset.filter(**Condition1**).writeStream().foreach(**SOMECALCULATIONS1**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
ThirdDataset.filter(**Condition2**).writeStream().foreach(**SOMECALCULATIONS2**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
ThirdDataset.filter(**Condition3**).writeStream().foreach(**SOMECALCULATIONS3**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
现在对于每个 writestream ThirdDataset 正在计算,如果我缓存 ThirdDataset 那么它不会计算三次。
但是当我执行ThirdDataset.cache()
时,我会出现以下错误,
线程“主”org.apache.spark.sql.AnalysisException 中的异常: 必须使用流式源执行查询 writeStream.start();;
谁能推荐我。
【问题讨论】:
【参考方案1】:使用 foreachbatch 接收器并在数据帧/数据集上进行缓存!
【讨论】:
【参考方案2】:缓存对流数据集没有意义。
SPARK-20865
您可能需要改变方法。
类似
ThirdDataset.writeStream().foreach(**SOMECALCULATIONS BASED ON CONDITION**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
cache on streaming datasets fails
【讨论】:
以上是关于我们如何在 Spark 结构化流 2.4.4 中缓存/持久化数据集的主要内容,如果未能解决你的问题,请参考以下文章