将每个文件激发到数据集行

Posted

技术标签:

【中文标题】将每个文件激发到数据集行【英文标题】:Spark each file to a dataset row 【发布时间】:2017-01-27 16:04:28 【问题描述】:

我在一个目录中有很多文件,每个文件都包含多行的文本。 目前,我使用以下代码将所有这些文件读取到 spark 数据集 (>2.0)

   val ddf = spark.read.text("file:///input/*")

但是,这会创建一个数据集,其中每一行都是一行,而不是一个文件。我想在数据集中的每行都有每个文件(作为字符串)。

如何在不遍历每个文件并将其作为RDD 单独读取的情况下实现此目的?

【问题讨论】:

【参考方案1】:

SparkContext 上使用wholeTextFiles()

val rdd: RDD[(String, String)] = spark.sparkContext
                                      .wholeTextFiles("file/path/to/read/as/rdd")

SparkContext.wholeTextFiles 让您可以读取包含 多个小文本文件,并以 (filename, 内容)对。这与 textFile 形成对比,后者将返回 每个文件每行一条记录。

【讨论】:

漂亮的答案,我一直在寻找。【参考方案2】:

@mrsrinivas 的答案的替代方法是按input_file_name 分组。给定结构:

evan@vbox>~/junk/so> find .        
.
./d2
./d2/t.txt
./d1
./d1/t.txt
 evan@vbox>~/junk/so> cat  */*.txt
d1_1
d1_2
d2_1
d2_2

我们可以像这样根据输入文件收集列表:

scala> val ddf = spark.read.textFile("file:///home/evan/junk/so/*").
     | select($"value", input_file_name as "fName")
ddf: org.apache.spark.sql.DataFrame = [value: string, fName: string]

scala> ddf.show(false)
+-----+----------------------------------+
|value|fName                             |
+-----+----------------------------------+
|d2_1 |file:///home/evan/junk/so/d2/t.txt|
|d2_2 |file:///home/evan/junk/so/d2/t.txt|
|d1_1 |file:///home/evan/junk/so/d1/t.txt|
|d1_2 |file:///home/evan/junk/so/d1/t.txt|
+-----+----------------------------------+

scala> ddf.groupBy("fName").agg(collect_list($"value") as "value").
     | drop("fName").show
+------------+
|       value|
+------------+
|[d1_1, d1_2]|
|[d2_1, d2_2]|
+------------+

【讨论】:

groupBy 中使用input_file_name 的好方法,不知道那个功能:)。不过,@mrsrinivas 的答案还是有点干净。 当然,我的答案一直是DataFrame,但在这种情况下使用RDD会更好 没错,但总是有 toDF 函数。

以上是关于将每个文件激发到数据集行的主要内容,如果未能解决你的问题,请参考以下文章

如何将 JSON 拆分为数据集行?

如何将结果集行作为索引子数组添加到结果数组中?

s-s-rS 报告需要根据数据集行显示 n 次

可以通过查询结果获取结果集行数吗?

如何获取数据集行的特定版本?

跨数据集行的数组元素总和 - Spark Scala