将文件名信息附加到由 sc.textFile 初始化的 RDD

Posted

技术标签:

【中文标题】将文件名信息附加到由 sc.textFile 初始化的 RDD【英文标题】:Appending filename information to RDD initialized by sc.textFile 【发布时间】:2016-01-19 18:09:25 【问题描述】:

我有一组想要读入 RDD 的日志文件。 这些文件都是压缩的 .gz 并且文件名带有日期戳。 这些文件的来源是***的页面浏览统计数据

http://dumps.wikimedia.org/other/pagecounts-raw/

文件名如下所示:

pagecounts-20090501-000000.gz
pagecounts-20090501-010000.gz
pagecounts-20090501-020000.gz

我想要做的是读取目录中的所有此类文件,并将文件名中的日期(例如 20090501)添加到生成的 RDD 的每一行。 我首先想到的是使用 sc.wholeTextFiles(..) 而不是 sc.textFile(..),它创建了一个 PairRDD,其键是带有路径的文件名, 但 sc.wholeTextFiles() 不处理压缩的 .gz 文件。

欢迎提出任何建议。

【问题讨论】:

您使用的是什么版本的 Spark? wholeTextFiles 似乎在 Spark 1.6.0 中运行良好 检查我在答案中的代码,我没有 1.5.2 方便在这里测试它 好的,我会的,非常感谢。 【参考方案1】:

Spark 1.6.0 中的以下内容似乎对我很有效:

sc.wholeTextFiles("file:///tmp/*.gz").flatMapValues(y => y.split("\n")).take(10).foreach(println)

样本输出:

(文件:/C:/tmp/pagecounts-20160101-000000.gz,aa 271_a.C 1 4675) (file:/C:/tmp/pagecounts-20160101-000000.gz,aa Battaglia_di_Qade%C5%A1/it/Battaglia_dell%27Oronte 1 4765) (file:/C:/tmp/pagecounts-20160101-000000.gz,aa 类别:User_th 1 第4770章) (file:/C:/tmp/pagecounts-20160101-000000.gz,aa Chiron_Elias_Krase 1 4694)

【讨论】:

这样似乎成功了。它适用于 Spark 1.5.2。非常感谢。 是否可以在一组文件上使用 sc.textFiles 并获取每个文件的文件名,以便我可以将其附加到 RDD 中的行而不是使用 sc.wholeTextFiles ?由于我认为flatMapValues 的使用,我遇到了 Java 堆内存问题。 据我所知,没有办法做到这一点,你可以看一下代码,看看这两者是由谁实现的,也许可以衍生出你自己的自定义逻辑

以上是关于将文件名信息附加到由 sc.textFile 初始化的 RDD的主要内容,如果未能解决你的问题,请参考以下文章

Spark 使用 sc.textFile ("s3n://...) 从 S3 读取文件

sc.textFile()默认读取本地系统文件还是HDFS系统文件?

Spark:sc.textFiles() 与 sc.wholeTextFiles() 的区别

Spark RDD 操作实战之文件读取

spark textFile 困惑与解释

对spark内存迭代计算框架的理解误区