如何从 pyspark rdd 或分区中确定原始 s3 输入文件名

Posted

技术标签:

【中文标题】如何从 pyspark rdd 或分区中确定原始 s3 输入文件名【英文标题】:How to determine original s3 input filenames from a pyspark rdd or partition 【发布时间】:2015-11-28 17:52:41 【问题描述】:

我正在使用 pyspark 流式传输到来自 S3 的 ETL 输入文件。

我需要能够构建所有原始输入文件的审计跟踪 在 s3:// 上,我的镶木地板输出在 hdfs:// 上结束。

给定一个dstream,rdd,甚至是一个特定的rdd分区,有没有可能 确定 s3 中输入数据的原始文件名?

目前我知道这样做的唯一方法是采取 rdd.toDebugString() 并尝试解析它。然而,这感觉真的很hacky并且没有 在某些情况下工作。例如,解析调试输出不适用于我的批处理模式导入 我也在做(使用sc.TextFile("s3://...foo/*") 样式球)。

有没有人有理智的方法来确定原始文件名?

似乎其他一些 spark 用户过去曾有过这个问题,因为 示例:

http://apache-spark-user-list.1001560.n3.nabble.com/Access-original-filename-in-a-map-function-tt2831.html

谢谢!

【问题讨论】:

【参考方案1】:

我们遇到了同样的问题,而且文件足够小,所以我们使用了 sc.wholeTextFiles("s3:...foo/*")

创建("<path/filename>","<content>") 的RDD,我们将文件名附加到文件内容以供使用。

How to convert RDD[(String, String)] into RDD[Array[String]]?

【讨论】:

太好了,这解决了我的批处理模式问题。但是,假设有类似 StreamingContext.wholeTextFileStream() 的东西,它允许我从来自 dstream 的 rdd 获取源文件名?

以上是关于如何从 pyspark rdd 或分区中确定原始 s3 输入文件名的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 重新分区 RDD 元素

计算每个 pyspark RDD 分区中的元素数

在 PySpark RDD 中,如何使用 foreachPartition() 打印出每个分区的第一条记录?

spark数据分区数量的原理

如何从 PySpark rdd.mapPartitions 运行内存密集型 shell 脚本

在 pyspark RDD 上显示分区