如何使用随机列加载多个 csv 文件?

Posted

技术标签:

【中文标题】如何使用随机列加载多个 csv 文件?【英文标题】:How to load multiple csv files with shuffled columns? 【发布时间】:2019-04-01 13:37:08 【问题描述】:

我有多个要加载到 hive 表中的 csv 文件,我的问题是,我的 csv 文件头的序列不固定。

如果我有两个 csv 文件

sample1.csv 

  column1,column2
      "A","B"

sample2.csv

column2,column1
"A","B"

我正在尝试使用下面的代码。

spark.sql("drop table if exists faizan.sample")
val df = spark.read.format("csv").option("wholeFile", true).option("multiline",true).option("inferSchema", "true").option("header", true).option("escape","\"").csv("faizan/sample/sample/sample1.csv", "faizan/sample/sample/sample3.csv")
val newNames = Seq("column1","column2") 
val dfRenamed = df.toDF(newNames: _*)
    dfRenamed.createOrReplaceTempView("tempTable")
val tempDf = spark.sql("select * from tempTable where")
    tempDf.write.saveAsTable("faizan.sample")

我得到了输出:

+-------+-------+
|column1|column2|
+-------+-------+
|      A      B|
|      A      B|
+-------+-------+

预期输出:

+-------+-------+
|column1|column2|
+-------+-------+
|      A      B|
|      B      A|
+-------+-------+

【问题讨论】:

【参考方案1】:

我希望 spark 足够聪明,可以解决不同文件中的无序列。

尝试一一加载,但您总是创建 2 个数据框并将它们合并。

val dfReader = spark.read.format("csv").option("wholeFile", true).option("multiline",true).option("inferSchema", "true").option("header", true).option("escape","\"")

val df1 = dfReader.csv("faizan/sample/sample/sample1.csv")
val df2 = dfReader.csv("faizan/sample/sample/sample3.csv")

val df = df1.union(df2)

你可以尝试的其他事情,使用 inferSchema 为 false,创建你自己的模式为

StructType(Array(StructField("column1", StringType),StructField("column2", StringType)))

【讨论】:

它没有解决问题,我仍然得到相同的结果。【参考方案2】:

也许以下解决方案将有助于解决您的问题

val df1 = spark.read.format.....(filepath1)
val df2 = spark.read.format.....(filepath2)

从sql.fucntions导入col方法

import org.apache.spark.sql.functions.col

将 Seq(col_name1,col_name2) 或 Seq(String) 转换为 Seq(Column)

基本上df1.columns 将返回 Array[String] 列名。

所以我们应该像下面这样将 Array[String] 转换为 Array[Column]

 val cols = df1.columns.map(e=>col(e))

然后在dataframe2上进行union转换with selecting converted ordered cols of df1

  df1.union(df2.select(cols:_*))

【讨论】:

以上是关于如何使用随机列加载多个 csv 文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何在pentaho中使用csv输入固定列重新排序列

使用Dask加载多个CSV文件时混合列

如何使用 Pandas 将多个 csv 文件中的单个数据列合并为一个?

使用 pyspark 读取多个 csv 文件

如何在使用 pandas 读取 csv 文件时删除特定列?

如何将 CSV 数据导入多个数组并通过 VBA 中的函数或子函数返回多个数组?