有没有一种很好的方法可以用换表加入火花流?

Posted

技术标签:

【中文标题】有没有一种很好的方法可以用换表加入火花流?【英文标题】:Is there a good way to join a stream in spark with a changing table? 【发布时间】:2018-08-23 15:42:04 【问题描述】:

我们的 Spark 环境: DataBricks 4.2(包括 Apache Spark 2.3.1、Scala 2.11)

我们努力实现的目标: 我们想用一些参考数据来丰富流数据,这些参考数据会定期更新。丰富是通过将流与参考数据连接来完成的。

我们实现了什么: 我们实现了两个火花作业(罐子): 第一个是使用

每小时更新一个 Spark 表 TEST_TABLE(我们称之为“参考数据”)
<dataset>.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE")

然后打电话给spark.catalog.refreshTable("TEST_TABLE")

第二项工作(我们称之为流式数据)是使用 Spark 结构化流式传输读取一些数据,使用 DataFrame.transform() 将其与表 TEST_TABLE 连接并将其写入另一个系统。 我们在.transform() 调用的函数中使用spark.read.table(“TEST_TABLE”) 读取参考数据,因此我们获得了表中的最新值。不幸的是,每次第一个应用程序更新表时,第二个应用程序都会崩溃。 Log4j 输出中显示以下消息:

18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/ part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748

我们还尝试在读取表之前使缓存无效,但这会降低性能并且应用程序仍然崩溃。 我们怀疑根本过程是参考数据集的惰性评估(它仍然“指向”旧数据,不再存在)。

您有什么建议我们可以做些什么来防止这个问题,或者用动态参考数据加入流的最佳方法是什么?

【问题讨论】:

如果你点缓存它应该回到源代码。刷新数据有多大? 不要缓存我的意思 【参考方案1】:

加入参考数据;不要缓存它,这可以确保您找到源代码。查找由主键 + 计数器表示的最新版本数据,其中该计数器最接近或等于您在 Streaming 应用程序中维护的计数器。每小时写入一次,追加所有仍然是最新的参考数据,但计数器增加;即新版本。在这里使用镶木地板。

【讨论】:

谢谢! :) 我们现在使用时间戳而不是计数器,但总的来说我们使用的是您的解决方案。我们的主要问题是在参考数据表中覆盖而不是附加数据。 由于不明确了解您的情况,我只是保持简单。 @BdLearner 当我写下我应该知道的答案时。它用于非可变环境中的版本控制,可能不适用于 Cassandra 的情况。【参考方案2】:

而不是加入表和流。您可以利用 spark 2.3.1 中提供的新功能,即连接两个流数据。 创建一个流而不是带有水印的表。

Watermarks: Watermarking in Structured Streaming is a way to limit state in all 
stateful streaming operations by specifying how much late data to consider. 
Specifically, a watermark is a moving threshold in event-time that trails behind the 
maximum event-time seen by the query in the processed data. The trailing gap (aka 
watermark delay) defines how long should the engine wait for late data to arrive and 
is specified in the query using withWatermark.

Refer databricks blog

【讨论】:

关于水印,假设我有交易流和另一个流,如果国家代码发生变化,如果我使用按键更新状态,则现在有国家尝试代码,我将获得密钥的最新代码,以便我可以加入最新交易流最新密钥,水印如何帮助我处理具有国家/地区代码的维度数据,所以我是否在维度表上无限期保留水印?就我而言,国家/地区代码每周更改一次。

以上是关于有没有一种很好的方法可以用换表加入火花流?的主要内容,如果未能解决你的问题,请参考以下文章

有人可以建议使用火花流进行日志分析的最佳方法吗

如何在火花流中刷新加载的数据帧内容?

重新启动火花流应用程序的最佳方法是啥?

有没有一种很好的方法来模拟 Go 中的“可能”或“选项”类型?

有没有一种很好的 LINQ 方法来做笛卡尔积?

每个微批次火花流中处理的总记录