如何使用文件中的列和字段创建 DataFrame?
Posted
技术标签:
【中文标题】如何使用文件中的列和字段创建 DataFrame?【英文标题】:How to create a DataFrame with the columns and fields in a file? 【发布时间】:2017-05-16 09:09:03 【问题描述】:我必须创建一个带有标题和字段的 DataFrame。 标题和字段在文件中。该文件指定如下。 架构在 field5 中,col1,col2... 是我的架构,值在 field6 之后。
field1 value1;
field2 value2;
field3 value3;
field4 value4;
field5 17 col1 col2 col3 col4 col5 col6 col7 col8;
field6
val1 val 2 val3 val4 val5 val6 val7 val8
val9 val10 val11 val12 val13 val14 val15 val16
val17 val18 val19 val20 val21 val22 val23 val24;
EndOfFile;
上面是文件,我想提取值 col1,col2.......col8 并从中创建一个 Struct 并创建一个数据框,其值位于 field6 之后。
我应该用普通的 Java 代码提取 field5 吗?是否可以在 Spark Java 中进行?
【问题讨论】:
【参考方案1】:我会执行以下操作(但我使用的是 Scala,因此将其转换为 Java 是您的家庭练习):
-
使用
spark.read.text
将文件加载为常规(几乎是非结构化)文本文件
过滤掉不相关的行
使用请求的架构和行创建另一个 DataFrame
让我们看看 Scala 代码:
val input = spark.read.text("input.txt")
scala> input.show(false)
+--------------------------------------------------+
|value |
+--------------------------------------------------+
|field1 value1; |
|field2 value2; |
|field3 value3; |
|field4 value4; |
|field5 17 col1 col2 col3 col4 col5 col6 col7 col8;|
|field6 |
|val1 val 2 val3 val4 val5 val6 val7 val8 |
|val9 val10 val11 val12 val13 val14 val15 val16 |
|val17 val18 val19 val20 val21 val22 val23 val24; |
|EndOfFile; |
+--------------------------------------------------+
// trying to impress future readers ;-)
val unnecessaryLines = (2 to 4).
map(n => 'value startsWith s"field$n").
foldLeft('value startsWith "field1") case (f, orfield) => f or orfield .
or('value startsWith "field6").
or('value startsWith "EndOfFile")
scala> unnecessaryLines.explain(true)
((((StartsWith('value, field1) || StartsWith('value, field2)) || StartsWith('value, field3)) || StartsWith('value, field4)) || StartsWith('value, EndOfFile))
// Filter out the irrelevant lines
val onlyRelevantLines = input.filter(!unnecessaryLines)
scala> onlyRelevantLines.show(false)
+--------------------------------------------------+
|value |
+--------------------------------------------------+
|field5 17 col1 col2 col3 col4 col5 col6 col7 col8;|
|val1 val 2 val3 val4 val5 val6 val7 val8 |
|val9 val10 val11 val12 val13 val14 val15 val16 |
|val17 val18 val19 val20 val21 val22 val23 val24; |
+--------------------------------------------------+
这样我们就从文件中得到了唯一相关的行。 是时候找点乐子了!
// Remove field5 from the first line only and `;` at the end
val field5 = onlyRelevantLines.head.getString(0) // we're leaving Spark space and enter Scala
// the following is pure Scala code (no Spark whatsoever)
val header = field5.substring("field5 17 ".size).dropRight(1).split("\\s+").toSeq
val rows = onlyRelevantLines.filter(!('value startsWith "field5"))
scala> :type rows
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
scala> rows.show(false)
+------------------------------------------------+
|value |
+------------------------------------------------+
|val1 val 2 val3 val4 val5 val6 val7 val8 |
|val9 val10 val11 val12 val13 val14 val15 val16 |
|val17 val18 val19 val20 val21 val22 val23 val24;|
+------------------------------------------------+
这样,您就有了Dataset
应该拆分的行(每个空格)。在未发布的 Spark 2.2.0 中,将有一个方法 csv 来加载数据集,给定的分隔符会给我们想要的:
def csv(csvDataset: Dataset[String]): DataFrame
这还不可用,所以我们必须做类似的事情。
让我们尽量坚持使用 Spark SQL 的 Dataset API。
val words = rows.select(split($"value", "\\s+") as "words")
scala> words.show(false)
+---------------------------------------------------------+
|words |
+---------------------------------------------------------+
|[val1, val, 2, val3, val4, val5, val6, val7, val8] |
|[val9, val10, val11, val12, val13, val14, val15, val16] |
|[val17, val18, val19, val20, val21, val22, val23, val24;]|
+---------------------------------------------------------+
// The following is just a series of withColumn's for every column in header
val finalDF = header.zipWithIndex.foldLeft(words) case (df, (hdr, idx)) =>
df.withColumn(hdr, $"words".getItem(idx)) .
drop("words")
scala> finalDF.show
+-----+-----+-----+-----+-----+-----+-----+------+
| col1| col2| col3| col4| col5| col6| col7| col8|
+-----+-----+-----+-----+-----+-----+-----+------+
| val1| val| 2| val3| val4| val5| val6| val7|
| val9|val10|val11|val12|val13|val14|val15| val16|
|val17|val18|val19|val20|val21|val22|val23|val24;|
+-----+-----+-----+-----+-----+-----+-----+------+
完成!
【讨论】:
以上是关于如何使用文件中的列和字段创建 DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章
如何将 Pandas DataFrame 的列和行子集转换为 numpy 数组?
如何将数据从 python 列表中的列和行写入 csv 文件?