如何使用文件中的列和字段创建 DataFrame?

Posted

技术标签:

【中文标题】如何使用文件中的列和字段创建 DataFrame?【英文标题】:How to create a DataFrame with the columns and fields in a file? 【发布时间】:2017-05-16 09:09:03 【问题描述】:

我必须创建一个带有标题和字段的 DataFrame。 标题和字段在文件中。该文件指定如下。 架构在 field5 中,col1,col2... 是我的架构,值在 field6 之后。

field1 value1;
field2 value2;
field3 value3;
field4 value4;
field5 17 col1 col2 col3 col4 col5 col6 col7 col8;
field6
val1 val 2 val3 val4 val5 val6 val7 val8
val9 val10 val11 val12 val13 val14 val15 val16
val17 val18 val19 val20 val21 val22 val23 val24;
EndOfFile; 

上面是文件,我想提取值 col1,col2.......col8 并从中创建一个 Struct 并创建一个数据框,其值位于 field6 之后。

我应该用普通的 Java 代码提取 field5 吗?是否可以在 Spark Java 中进行?

【问题讨论】:

【参考方案1】:

我会执行以下操作(但我使用的是 Scala,因此将其转换为 Java 是您的家庭练习):

    使用spark.read.text 将文件加载为常规(几乎是非结构化)文本文件 过滤掉不相关的行 使用请求的架构和行创建另一个 DataFrame

让我们看看 Scala 代码:

val input = spark.read.text("input.txt")
scala> input.show(false)
+--------------------------------------------------+
|value                                             |
+--------------------------------------------------+
|field1 value1;                                    |
|field2 value2;                                    |
|field3 value3;                                    |
|field4 value4;                                    |
|field5 17 col1 col2 col3 col4 col5 col6 col7 col8;|
|field6                                            |
|val1 val 2 val3 val4 val5 val6 val7 val8          |
|val9 val10 val11 val12 val13 val14 val15 val16    |
|val17 val18 val19 val20 val21 val22 val23 val24;  |
|EndOfFile;                                        |
+--------------------------------------------------+

// trying to impress future readers ;-)
val unnecessaryLines = (2 to 4).
  map(n => 'value startsWith s"field$n").
  foldLeft('value startsWith "field1")  case (f, orfield) => f or orfield .
  or('value startsWith "field6").
  or('value startsWith "EndOfFile")
scala> unnecessaryLines.explain(true)
((((StartsWith('value, field1) || StartsWith('value, field2)) || StartsWith('value, field3)) || StartsWith('value, field4)) || StartsWith('value, EndOfFile))

// Filter out the irrelevant lines
val onlyRelevantLines = input.filter(!unnecessaryLines)
scala> onlyRelevantLines.show(false)
+--------------------------------------------------+
|value                                             |
+--------------------------------------------------+
|field5 17 col1 col2 col3 col4 col5 col6 col7 col8;|
|val1 val 2 val3 val4 val5 val6 val7 val8          |
|val9 val10 val11 val12 val13 val14 val15 val16    |
|val17 val18 val19 val20 val21 val22 val23 val24;  |
+--------------------------------------------------+

这样我们就从文件中得到了唯一相关的行。 是时候找点乐子了!

// Remove field5 from the first line only and `;` at the end
val field5 = onlyRelevantLines.head.getString(0) // we're leaving Spark space and enter Scala
// the following is pure Scala code (no Spark whatsoever)
val header = field5.substring("field5 17 ".size).dropRight(1).split("\\s+").toSeq

val rows = onlyRelevantLines.filter(!('value startsWith "field5"))
scala> :type rows
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
scala> rows.show(false)
+------------------------------------------------+
|value                                           |
+------------------------------------------------+
|val1 val 2 val3 val4 val5 val6 val7 val8        |
|val9 val10 val11 val12 val13 val14 val15 val16  |
|val17 val18 val19 val20 val21 val22 val23 val24;|
+------------------------------------------------+

这样,您就有了Dataset 应该拆分的行(每个空格)。在未发布的 Spark 2.2.0 中,将有一个方法 csv 来加载数据集,给定的分隔符会给我们想要的:

def csv(csvDataset: Dataset[String]): DataFrame

这还不可用,所以我们必须做类似的事情。

让我们尽量坚持使用 Spark SQL 的 Dataset API。

val words = rows.select(split($"value", "\\s+") as "words")
scala> words.show(false)
+---------------------------------------------------------+
|words                                                    |
+---------------------------------------------------------+
|[val1, val, 2, val3, val4, val5, val6, val7, val8]       |
|[val9, val10, val11, val12, val13, val14, val15, val16]  |
|[val17, val18, val19, val20, val21, val22, val23, val24;]|
+---------------------------------------------------------+

// The following is just a series of withColumn's for every column in header

val finalDF = header.zipWithIndex.foldLeft(words)  case (df, (hdr, idx)) =>
  df.withColumn(hdr, $"words".getItem(idx)) .
  drop("words")
scala> finalDF.show
+-----+-----+-----+-----+-----+-----+-----+------+
| col1| col2| col3| col4| col5| col6| col7|  col8|
+-----+-----+-----+-----+-----+-----+-----+------+
| val1|  val|    2| val3| val4| val5| val6|  val7|
| val9|val10|val11|val12|val13|val14|val15| val16|
|val17|val18|val19|val20|val21|val22|val23|val24;|
+-----+-----+-----+-----+-----+-----+-----+------+

完成!

【讨论】:

以上是关于如何使用文件中的列和字段创建 DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Pandas DataFrame 的列和行子集转换为 numpy 数组?

Pandas:DataFrame数据的更改插入新增的列和行

如何将数据从 python 列表中的列和行写入 csv 文件?

如何在 Notepad++ 中删除不需要的列和字段

如何使用 Pandas 从 DataFrame 或 np.array 中的列条目创建字典

在 Python 3.x 中将基于特定列的列和值的两个 DataFrame 与 Pandas 合并