Spark 流式传输 sourceArchiveDir 不会将文件移动到存档目录

Posted

技术标签:

【中文标题】Spark 流式传输 sourceArchiveDir 不会将文件移动到存档目录【英文标题】:Spark streaming sourceArchiveDir does not move file to archive dir 【发布时间】:2021-09-14 05:02:31 【问题描述】:

如何使用“sourceArchiveDir”和“cleanSource=archive”将源 CSV 文件移动到存档目录?我在代码下面运行,但它不会移动源文件,但是流处理工作正常,即将源文件内容打印到控制台。

import org.apache.spark.sql.streaming.OutputMode, Trigger

val inputPath = 
  "/<here is an absolute path to my project dir>/data/input/spark_full_delta/2021-06-21"

spark
.readStream
.format("csv")
.schema(jsonSchema)
.option("pathGlobFilter","customers_*2021-06-21.csv")
.option(
  "sourceArchiveDir", 
  "/<here is an absolute path to my project dir>/data/archive")
.option("cleanSource", "archive")
.option("latestFirst","false")
.option("spark.sql.streaming.fileSource.cleaner.numThreads", "2")
.option("header", "true")
.load(inputPath)
.withColumn("date", lit("2021-06-21"))
.writeStream
.outputMode(OutputMode.Append)
.trigger(Trigger.ProcessingTime("5 seconds"))
.format("console")
.start()

StructSchema 供参考:

scala> jsonSchema
res54: org.apache.spark.sql.types.StructType = StructType(
StructField(customerId,IntegerType,true), 
StructField(name,StringType,true), 
StructField(country,StringType,true), 
StructField(date,DateType,false))

文档参考:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#creating-streaming-dataframes-and-streaming-datasets。向下滚动到源表及其选项。

【问题讨论】:

【参考方案1】:

文件源存档基于更多内部 Spark 选项,您可以尝试更改(但您不必)以进行调试以加快源文件存档过程:

spark
.readStream
.format("csv")
.schema(jsonSchema)
// Number of log files after which all the previous files 
// are compacted into the next log file.
.option("spark.sql.streaming.fileSource.log.compactInterval","1")
// How long in milliseconds a file is guaranteed to 
// be visible for all readers.
.option("spark.sql.streaming.fileSource.log.cleanupDelay","1")
.option(
  "sourceArchiveDir", 
  "/<here is an absolute path to my project dir>/data/archive")
.option("cleanSource", "archive")
...

然后尝试将更多文件添加到源路径。 Spark 应该将已经看到的文件从以前的微批处理移动到“sourceArchiveDir”。

请注意,这两个选项(compactInterval、cleanupDelay)都是 Spark 内部选项,将来可能会更改,恕不另行通知。 Spark 3.2.0-SNAPSHOT 的默认值:

spark.sql.streaming.fileSource.log.compactInterval: 10
spark.sql.streaming.fileSource.log.cleanupDelay: 10 minutes

【讨论】:

以上是关于Spark 流式传输 sourceArchiveDir 不会将文件移动到存档目录的主要内容,如果未能解决你的问题,请参考以下文章

流式传输 Kmeans Spark JAVA

Spark 流式传输 Kafka 消息未使用

使用 Spark 流式传输的 Redshift

Twitter使用Spark流式传输

使用 Kafka 进行 Spark 流式传输 - createDirectStream 与 createStream

Spark 将数据流式传输到 S3