将多个文件中的数据读入单个 RDD 或 Dataframe

Posted

技术标签:

【中文标题】将多个文件中的数据读入单个 RDD 或 Dataframe【英文标题】:Read data from multiple files into a single RDD or Dataframe 【发布时间】:2020-01-29 07:59:49 【问题描述】:

我有 HMP dataset。该数据集有 14 个不同的文件夹(类别),每个类别中都有多个 CSV 文件。

我想将所有 csv 文件中的数据读取到单个数据帧中。数据架构是

 val Tschema = StructType(Array(
  StructField("X", IntegerType, true),
  StructField("Y", IntegerType, nullable = true),
  StructField("Z", IntegerType, true)
 ))

我还想在数据框中再添加两列。第一列包含包含当前 CSV 的文件夹(类别)的名称,第二列包含 CSV 文件的名称。

我尝试了以下代码,但没有正常工作。

val path = System.getProperty("user.home") + "/Desktop/HMP/*"  // Path to all categories
val df =spark.sparkContext.wholeTextFiles(path)
df.toDF().show(5 , false)

我的代码输出是

+----------------------------------------------------------------------+--------------------+
|                                                                    _1|                  _2|
+----------------------------------------------------------------------+--------------------+
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt    |12 38 35            |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt    |23 56 34            |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt    |13 36 36            |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt    |39 57 42            |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt    |26 51 36            |
+----------------------------------------------------------------------+--------------------+

\ 之前的第一列 (_1) 是我想在单独的列 class 上的部分,其余部分将在来源列。 在 _2 部分我想应用我定义的模式。

我希望最终输出如下所示。

+---+---+---+--------------+---------------------+
|  X|  Y|  Z|         class|               source|
+---+---+---+--------------+---------------------+
| 37| 34| 43|  Climb_stairs|Accelerometer-2011...|
| 05| 39| 34|  Climb_stairs|Accelerometer-2011...|
| 30| 53| 49|  Climb_stairs|Accelerometer-2011...|
+---+---+---+-------------+----------------------+ 

【问题讨论】:

你可以添加dataframe .show(false) 选项来获得你想要的源 @MaheshGupta 是的,不会截断条目。但我想要上面提到的输出数据框(表) @MaheshGupta 我已经更新了问题和代码,请立即查看 【参考方案1】:

我认为您正在查看本地文件系统中的文件。你能在df 中详细说明你得到了什么吗?你是在本地模式下运行 spark 吗?

如果您想在 Cloudera VM 上试用,您可以按照以下步骤将其中两个 csv 文件放入 hdfs 位置

hdfs dfs -mkdir /files
hdfs dfs -put sample.csv sample2.csv /files/

运行 spark

spark2-shell
val df = spark.read.csv("/files/")
df.show

对于读取文件名和目录,您可能需要使用splitinput_file_name 函数,具体取决于文件在HDFS 上的确切位置。

您可以添加如下内容。

val df2 = df.withColumn("file_name", split(input_file_name(), "/").getItem(7).cast(StringType))

同样,您可以使用input_file_namesubstr 来获取输入目录,具体取决于您想要的部分。

【讨论】:

此代码将提供来自 CSV 文件的数据。我还想要两列具有此输出。一列包含该 CSV 文件所在目录的名称,第二列包含 CSV 文件的名称 我已经更新了问题和代码,请立即查看 我已经相应地更新了我的答案,基本上你需要使用input_file_name函数。【参考方案2】:

您可以通过使用带有拆分和反向读取的 input_file_name 来实现这一点

多个输入文件

scala>  var df_v = spark.read.format("csv").option("header",true).option("inferSchema",true).load("input_file/*.csv")
scala> df_v.show
+---------------+-------------------+
|             id|           DateTime|
+---------------+-------------------+
|340054675199675|15-01-2018 19:43:23|
|340054675199675|15-01-2018 10:56:43|
|340028465709212|10-01-2018 02:47:11|
|340054675199675|09-01-2018 10:59:10|
|340028465709212|02-01-2018 03:25:35|
|340054675199675|28-12-2017 05:48:04|
|340054675199675|21-12-2017 15:47:51|
|340028465709212|18-12-2017 10:33:04|
|340028465709212|16-12-2017 19:55:40|
|340028465709212|16-12-2017 19:55:40|
|340028465709212|12-12-2017 07:04:51|
|340054675199675|06-12-2017 08:52:38|
|       21000101|               null|
|       20991231|               null|
+---------------+-------------------+

应用 input_file_name 内置函数获取文件名

scala> var df_v1 =df_v.withColumn("file",input_file_name).withColumn("folder",reverse(split($"file","/"))(1)).withColumn("filename",reverse(split($"file","/"))(0))//.drop("file")
scala> df_v1.show(false)
+---------------+-------------------+------------------------------------------+----------+-----------+
|id             |DateTime           |file                                      |folder    |filename   |
+---------------+-------------------+------------------------------------------+----------+-----------+
|340054675199675|15-01-2018 19:43:23|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340054675199675|15-01-2018 10:56:43|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340028465709212|10-01-2018 02:47:11|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340054675199675|09-01-2018 10:59:10|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340028465709212|02-01-2018 03:25:35|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340054675199675|28-12-2017 05:48:04|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340054675199675|21-12-2017 15:47:51|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340028465709212|18-12-2017 10:33:04|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340028465709212|16-12-2017 19:55:40|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340028465709212|16-12-2017 19:55:40|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340028465709212|12-12-2017 07:04:51|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|340054675199675|06-12-2017 08:52:38|file:///home/mahesh/input_file/test.csv   |input_file|test.csv   |
|21000101       |null               |file:///home/mahesh/input_file/so_date.csv|input_file|so_date.csv|
|20991231       |null               |file:///home/mahesh/input_file/so_date.csv|input_file|so_date.csv|
+---------------+-------------------+------------------------------------------+----------+-----------+

如果您不希望我保留以进行澄清,请取消注释删除列。

【讨论】:

以上是关于将多个文件中的数据读入单个 RDD 或 Dataframe的主要内容,如果未能解决你的问题,请参考以下文章

将文件夹中的多个csv文件读入R中的单个数据框[重复]

使用 Jackson 将单个文件中的多个 JSON 对象读入 Java

如何从 Apache Spark 中的单个文件记录创建多个 RDD 行

如何将一个 RDD 拆分为两个或多个 RDD?

使用分块将 CSV 文件读入 Pandas 数据帧,生成单个目标数据帧

使用 pyspark 将 json 文件读入 RDD(不是 dataFrame)