Spark streaming sourceArchiveDir does not move file to archive dir
Posted: 2021-09-14 05:02:31

How do I move source CSV files into an archive directory using "sourceArchiveDir" and "cleanSource=archive"? I am running the code below, but it does not move the source files. The stream processing itself works fine, i.e. the source file contents are printed to the console.
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val inputPath =
  "/<here is an absolute path to my project dir>/data/input/spark_full_delta/2021-06-21"

spark
  .readStream
  .format("csv")
  .schema(jsonSchema)
  .option("pathGlobFilter", "customers_*2021-06-21.csv")
  .option(
    "sourceArchiveDir",
    "/<here is an absolute path to my project dir>/data/archive")
  .option("cleanSource", "archive")
  .option("latestFirst", "false")
  .option("spark.sql.streaming.fileSource.cleaner.numThreads", "2")
  .option("header", "true")
  .load(inputPath)
  .withColumn("date", lit("2021-06-21"))
  .writeStream
  .outputMode(OutputMode.Append)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .format("console")
  .start()
jsonSchema for reference:
scala> jsonSchema
res54: org.apache.spark.sql.types.StructType = StructType(
StructField(customerId,IntegerType,true),
StructField(name,StringType,true),
StructField(country,StringType,true),
StructField(date,DateType,false))
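For completeness, the jsonSchema shown in the REPL output above could be defined like this (a sketch reconstructed from that output; it assumes the spark-sql dependency is on the classpath):

```scala
import org.apache.spark.sql.types._

// Matches the StructType printed in the REPL: three nullable fields
// plus a non-nullable "date" column.
val jsonSchema = StructType(Seq(
  StructField("customerId", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("country", StringType, nullable = true),
  StructField("date", DateType, nullable = false)
))
```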
Documentation reference: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#creating-streaming-dataframes-and-streaming-datasets. Scroll down to the File source row of the sources table and its options.
Answer 1:

File source archiving depends on some additional internal Spark options, which you can try changing (but do not have to) while debugging, to speed up the archiving of source files:
spark
  .readStream
  .format("csv")
  .schema(jsonSchema)
  // Number of log files after which all the previous files
  // are compacted into the next log file.
  .option("spark.sql.streaming.fileSource.log.compactInterval", "1")
  // How long in milliseconds a file is guaranteed to
  // be visible for all readers.
  .option("spark.sql.streaming.fileSource.log.cleanupDelay", "1")
  .option(
    "sourceArchiveDir",
    "/<here is an absolute path to my project dir>/data/archive")
  .option("cleanSource", "archive")
  ...
Then try adding more files to the source path. Spark should move files already seen in previous micro-batches to the "sourceArchiveDir".
Note that both options (compactInterval, cleanupDelay) are internal Spark options and may change in the future without notice. Default values as of Spark 3.2.0-SNAPSHOT:
spark.sql.streaming.fileSource.log.compactInterval: 10
spark.sql.streaming.fileSource.log.cleanupDelay: 10 minutes
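Since these two settings are session-level SQL configurations, an alternative to passing them as reader options is to set them on the SparkSession before starting the stream. This is a sketch, assuming an existing SparkSession named `spark`; the values ("1" and "1") are aggressive debug settings, not production recommendations:

```scala
// Compact the file-source metadata log after every log file (default: 10),
// so archiving of already-seen files is triggered sooner.
spark.conf.set("spark.sql.streaming.fileSource.log.compactInterval", "1")

// Shorten the guaranteed visibility window for old log files
// from the default 10 minutes to 1 ms (debug only).
spark.conf.set("spark.sql.streaming.fileSource.log.cleanupDelay", "1")
```

With these set, start the streaming query as in the question and drop additional files into the source path; archiving of earlier files should then happen within a few micro-batches.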