如果列值取决于文件路径,有没有办法在一次读取多个文件时将文字作为列添加到 spark 数据框中?
Posted
技术标签:
【中文标题】如果列值取决于文件路径,有没有办法在一次读取多个文件时将文字作为列添加到 spark 数据框中?【英文标题】:Is there a way to add literals as columns to a spark dataframe when reading the multiple files at once if the column values depend on the filepath? 【发布时间】:2020-10-07 23:05:45 【问题描述】:我正在尝试将大量 avro 文件读入 spark 数据帧。它们都共享相同的 s3 文件路径前缀,所以最初我运行的是:
path = "s3a://bucketname/data-files"
df = spark.read.format("avro").load(path)
已成功识别所有文件。
单个文件类似于:
"s3a://bucketname/data-files/timestamp=20201007123000/id=update_account/0324345431234.avro"
在尝试操作数据时,代码不断出错,并显示其中一个文件不是 Avro 数据文件的消息。实际收到的错误信息是:org.apache.spark.SparkException: Job aborted due to stage failure: Task 62476 in stage 44102.0 failed 4 times, most recent failure: Lost task 62476.3 in stage 44102.0 (TID 267428, 10.96.134.227, executor 9): java.io.IOException: Not an Avro data file
。
为了规避这个问题,我能够获得我感兴趣的 avro 文件的显式文件路径。将它们放入列表 (file_list)
后,我能够成功运行 spark.read.format("avro").load(file_list)
。
现在的问题是 - 我有兴趣向数据帧中添加一些字段,这些字段是文件路径的一部分(即上面示例中的时间戳和 id)。
虽然只使用存储桶和前缀文件路径来查找文件(方法 #1),但这些字段会自动附加到生成的数据帧中。使用显式文件路径,我没有获得这种优势。
我想知道是否有办法在使用 spark 读取文件时包含这些列。
按顺序处理文件如下所示:
for file in file_list:
df = spark.read.format("avro").load(file)
id, timestamp = parse_filename(file)
df = df.withColumn("id", lit(id))\
.withColumn("timestamp", lit(timestamp))
但是有超过 500k 的文件,这需要很长时间。
我是 Spark 的新手,非常感谢任何帮助,谢谢!
【问题讨论】:
你可以试试这样 path = "s3a://bucketname/data-files/*.avro" 【参考方案1】:这里要解决两个不同的问题:
指定文件
Spark 内置了读取给定路径中特定类型的所有文件的处理功能。正如@Sri_Karthik 建议的那样,尝试提供像"s3a://bucketname/data-files/*.avro"
这样的路径(如果这不起作用,也许试试"s3a://bucketname/data-files/**/*.avro"
...我不记得spark 使用的确切模式匹配语法),它应该只获取所有avro 文件并摆脱在这些路径中看到非 avro 文件的错误。在我看来,这比手动获取文件路径并明确指定它们更优雅。
顺便说一句,您看到此问题的原因可能是因为文件夹通常会标有元数据文件,例如 .SUCCESS
或 .COMPLETED
,以表明它们已准备好使用。
从文件路径中提取元数据
如果您查看this *** question,它会显示如何将文件名添加为新列(对于 scala 和 pyspark)。然后,您可以使用 regexp_extract
函数从该文件名字符串中解析出所需的元素。我从来没有在 spark 中使用过 scala,所以无法帮助你,但它应该类似于 pyspark version。
【讨论】:
唯一的附加评论是我发现我必须提供这样的路径:"s3a://bucketname/data-files/*/*/.avro"
但我成功地加载了所有文件并将文件名附加为列。 regexp_extract 函数看起来确实很相似,看起来从这里一帆风顺。谢谢。【参考方案2】:
您为什么不尝试先使用 Wholetextfiles 方法读取文件,然后在开始时将路径名添加到数据本身中。然后,您可以从数据中过滤掉文件名,并在创建数据框时将其添加为列。我同意这是一个两步过程。但它应该工作。要获得文件的时间戳,您将需要 js 不可序列化的文件系统对象,即它不能用于 sparks 并行化操作,因此您必须使用文件和时间戳创建一个本地集合,并以某种方式将其与您使用 Wholetextfiles 创建的 RDD 连接起来.
【讨论】:
以上是关于如果列值取决于文件路径,有没有办法在一次读取多个文件时将文字作为列添加到 spark 数据框中?的主要内容,如果未能解决你的问题,请参考以下文章