如果列值取决于文件路径,有没有办法在一次读取多个文件时将文字作为列添加到 spark 数据框中?

Posted

技术标签:

【中文标题】如果列值取决于文件路径,有没有办法在一次读取多个文件时将文字作为列添加到 spark 数据框中?【英文标题】:Is there a way to add literals as columns to a spark dataframe when reading the multiple files at once if the column values depend on the filepath? 【发布时间】:2020-10-07 23:05:45 【问题描述】:

我正在尝试将大量 avro 文件读入 spark 数据帧。它们都共享相同的 s3 文件路径前缀,所以最初我运行的是:

path = "s3a://bucketname/data-files"
df = spark.read.format("avro").load(path)

已成功识别所有文件。

单个文件类似于:

"s3a://bucketname/data-files/timestamp=20201007123000/id=update_account/0324345431234.avro"

在尝试操作数据时,代码不断出错,并显示其中一个文件不是 Avro 数据文件的消息。实际收到的错误信息是:org.apache.spark.SparkException: Job aborted due to stage failure: Task 62476 in stage 44102.0 failed 4 times, most recent failure: Lost task 62476.3 in stage 44102.0 (TID 267428, 10.96.134.227, executor 9): java.io.IOException: Not an Avro data file

为了规避这个问题,我能够获得我感兴趣的 avro 文件的显式文件路径。将它们放入列表 (file_list) 后,我能够成功运行 spark.read.format("avro").load(file_list)

现在的问题是 - 我有兴趣向数据帧中添加一些字段,这些字段是文件路径的一部分(即上面示例中的时间戳和 id)。

虽然只使用存储桶和前缀文件路径来查找文件(方法 #1),但这些字段会自动附加到生成的数据帧中。使用显式文件路径,我没有获得这种优势。

我想知道是否有办法在使用 spark 读取文件时包含这些列。

按顺序处理文件如下所示:

for file in file_list:
    df = spark.read.format("avro").load(file)
    id, timestamp = parse_filename(file)
    df = df.withColumn("id", lit(id))\
         .withColumn("timestamp", lit(timestamp))

但是有超过 500k 的文件,这需要很长时间。

我是 Spark 的新手,非常感谢任何帮助,谢谢!

【问题讨论】:

你可以试试这样 path = "s3a://bucketname/data-files/*.avro" 【参考方案1】:

这里要解决两个不同的问题:

指定文件

Spark 内置了读取给定路径中特定类型的所有文件的处理功能。正如@Sri_Karthik 建议的那样,尝试提供像"s3a://bucketname/data-files/*.avro" 这样的路径(如果这不起作用,也许试试"s3a://bucketname/data-files/**/*.avro"...我不记得spark 使用的确切模式匹配语法),它应该只获取所有avro 文件并摆脱在这些路径中看到非 avro 文件的错误。在我看来,这比手动获取文件路径并明确指定它们更优雅。

顺便说一句,您看到此问题的原因可能是因为文件夹通常会标有元数据文件,例如 .SUCCESS.COMPLETED,以表明它们已准备好使用。

从文件路径中提取元数据

如果您查看this *** question,它会显示如何将文件名添加为新列(对于 scala 和 pyspark)。然后,您可以使用 regexp_extract 函数从该文件名字符串中解析出所需的元素。我从来没有在 spark 中使用过 scala,所以无法帮助你,但它应该类似于 pyspark version。

【讨论】:

唯一的附加评论是我发现我必须提供这样的路径:"s3a://bucketname/data-files/*/*/.avro" 但我成功地加载了所有文件并将文件名附加为列。 regexp_extract 函数看起来确实很相似,看起来从这里一帆风顺。谢谢。【参考方案2】:

您为什么不尝试先使用 Wholetextfiles 方法读取文件,然后在开始时将路径名添加到数据本身中。然后,您可以从数据中过滤掉文件名,并在创建数据框时将其添加为列。我同意这是一个两步过程。但它应该工作。要获得文件的时间戳,您将需要 js 不可序列化的文件系统对象,即它不能用于 sparks 并行化操作,因此您必须使用文件和时间戳创建一个本地集合,并以某种方式将其与您使用 Wholetextfiles 创建的 RDD 连接起来.

【讨论】:

以上是关于如果列值取决于文件路径,有没有办法在一次读取多个文件时将文字作为列添加到 spark 数据框中?的主要内容,如果未能解决你的问题,请参考以下文章

IIS.由于权限不足而无法读取配置文件的解决办法

Apache驼峰单个文件的多个动态路由

如何在一次操作中解压缩和 gzip 提取的文件?

我如何懒惰地从Rust中的文件/流中读取多个JSON值?

有没有支持并行读取的磁盘?

使用 glob 读取多个 CSV 导致顺序错误 [重复]