如何使用随机列加载多个 csv 文件?
Posted
技术标签:
【中文标题】如何使用随机列加载多个 csv 文件?【英文标题】:How to load multiple csv files with shuffled columns? 【发布时间】:2019-04-01 13:37:08 【问题描述】:我有多个要加载到 hive 表中的 csv 文件,我的问题是,我的 csv 文件头的序列不固定。
如果我有两个 csv 文件
sample1.csv
column1,column2
"A","B"
和
sample2.csv
column2,column1
"A","B"
我正在尝试使用下面的代码。
spark.sql("drop table if exists faizan.sample")
val df = spark.read.format("csv").option("wholeFile", true).option("multiline",true).option("inferSchema", "true").option("header", true).option("escape","\"").csv("faizan/sample/sample/sample1.csv", "faizan/sample/sample/sample3.csv")
val newNames = Seq("column1","column2")
val dfRenamed = df.toDF(newNames: _*)
dfRenamed.createOrReplaceTempView("tempTable")
val tempDf = spark.sql("select * from tempTable where")
tempDf.write.saveAsTable("faizan.sample")
我得到了输出:
+-------+-------+
|column1|column2|
+-------+-------+
| A B|
| A B|
+-------+-------+
预期输出:
+-------+-------+
|column1|column2|
+-------+-------+
| A B|
| B A|
+-------+-------+
【问题讨论】:
【参考方案1】:我希望 spark 足够聪明,可以解决不同文件中的无序列。
尝试一一加载,但您总是创建 2 个数据框并将它们合并。
val dfReader = spark.read.format("csv").option("wholeFile", true).option("multiline",true).option("inferSchema", "true").option("header", true).option("escape","\"")
val df1 = dfReader.csv("faizan/sample/sample/sample1.csv")
val df2 = dfReader.csv("faizan/sample/sample/sample3.csv")
val df = df1.union(df2)
你可以尝试的其他事情,使用 inferSchema 为 false,创建你自己的模式为
StructType(Array(StructField("column1", StringType),StructField("column2", StringType)))
【讨论】:
它没有解决问题,我仍然得到相同的结果。【参考方案2】:也许以下解决方案将有助于解决您的问题
val df1 = spark.read.format.....(filepath1)
val df2 = spark.read.format.....(filepath2)
从sql.fucntions导入col
方法
import org.apache.spark.sql.functions.col
将 Seq(col_name1,col_name2) 或 Seq(String) 转换为 Seq(Column)
基本上df1.columns
将返回 Array[String] 列名。
所以我们应该像下面这样将 Array[String] 转换为 Array[Column]
val cols = df1.columns.map(e=>col(e))
然后在dataframe2上进行union
转换with selecting converted ordered cols of df1
df1.union(df2.select(cols:_*))
【讨论】:
以上是关于如何使用随机列加载多个 csv 文件?的主要内容,如果未能解决你的问题,请参考以下文章