Pyspark 从具有不同列的行/数据创建 DataFrame

Posted

技术标签:

【中文标题】Pyspark 从具有不同列的行/数据创建 DataFrame【英文标题】:Pyspark create DataFrame from rows/data with varying columns 【发布时间】:2018-11-29 14:35:15 【问题描述】:

我有多个键/值对的数据/行,键的数量未知——有些重叠,有些没有——我想从中创建一个 Spark DataFrame。我的最终目标是从此 DataFrame 编写 CSV。

我对输入数据/行具有灵活性:它们最容易是 JSON 字符串,但可以通过 可能重叠的键进行转换:

"color":"red", "animal":"fish"
"color":"green", "animal":"panda"
"color":"red", "animal":"panda", "fruit":"watermelon"
"animal":"aardvark"
"color":"blue", "fruit":"apple"

理想情况下,我想根据这些数据创建一个如下所示的 DataFrame:

-----------------------------
color | animal   | fruit
-----------------------------
red   | fish     | null
green | panda    | null
red   | panda    | watermelon
null  | aardvark | null
blue  | null     | apple
-----------------------------

值得注意的是,没有特定键的数据/行是null,数据/行中的所有键都表示为列。

我对 Spark 的许多基础知识感到相对自在,但在设想一个有效地使用键/值对获取我的 RDD/DataFrame 的过程时遇到了困难 -- 但列和键的数量未知 -- 并创建一个以这些键为列的 DataFrame。

高效,因为如果可能,我想避免创建一个所有输入行都保存在内存中的对象(例如单个字典)。

再一次,编写 CSV 的最终目标是,我假设创建一个 DataFrame 是实现这一目标的合乎逻辑的步骤。

另一个皱纹:

一些数据将是多值的,例如:

"color":"pink", "animal":["fish","mustang"]
"color":["orange","purple"], "animal":"panda"

使用提供的分隔符,例如/ 为避免与 , 冲突以分隔列,我想在列的输出中分隔这些,例如:

------------------------------------
color         | animal       | fruit
------------------------------------
pink          | fish/mustang | null
orange/purple | panda        | null
------------------------------------

一旦有了解决主要问题的方法,我相信我可以解决这部分问题,但无论如何都会把它扔掉,因为这将是问题的一个维度。

【问题讨论】:

你试过df = spark.read.json("myfile.json")。在你的第一个例子中似乎对我有用。 更新:它也适用于您的第二个示例,但将所有记录视为字符串,因此您必须执行一些 regex to convert the string representation of the list 以按照您想要的方式对其进行格式化。 感谢@pault 的想法。我正要说它可能行不通,因为我的数据实际上来自 DataFrame,我正在将单列 XML 转换为 JSON 字符串。但这很有趣,我可以用 JSON 行创建一个 RDD,编写它,然后读取它?还是有另一种方法可以从 RDD 模拟 .json() 方法,而不是从读取外部位置? 意识到read.json() 也可能接受 RDD,spark.apache.org/docs/latest/api/python/…,试一试... 【参考方案1】:

从文件中读取

如果您的数据存储在一个文件中(假设它被命名为myfile.json),如下所示:

"color":"red", "animal":"fish"
"color":"green", "animal":"panda"
"color":"red", "animal":"panda", "fruit":"watermelon"
"animal":"aardvark"
"color":"blue", "fruit":"apple"
"color":"pink", "animal":["fish","mustang"]
"color":["orange","purple"], "animal":"panda"

您可以使用pyspark.sql.DataFrameReader.json 将文件读取为换行符分隔的 JSON 记录。

df = spark.read.json("myfile.json")
df.show()
#+------------------+-------------------+----------+
#|            animal|              color|     fruit|
#+------------------+-------------------+----------+
#|              fish|                red|      null|
#|             panda|              green|      null|
#|             panda|                red|watermelon|
#|          aardvark|               null|      null|
#|              null|               blue|     apple|
#|["fish","mustang"]|               pink|      null|
#|             panda|["orange","purple"]|      null|
#+------------------+-------------------+----------+

df.printSchema()
#root
# |-- animal: string (nullable = true)
# |-- color: string (nullable = true)
# |-- fruit: string (nullable = true)

从 RDD 读取

您也可以这样做来读取rdd

import json

rdd = sc.parallelize(
    map(
        json.dumps,
        [
            "color":"red", "animal":"fish",
            "color":"green", "animal":"panda",
            "color":"red", "animal":"panda", "fruit":"watermelon",
            "animal":"aardvark",
            "color":"blue", "fruit":"apple",
            "color":"pink", "animal":["fish","mustang"],
            "color":["orange","purple"], "animal":"panda"
        ]
    )
)

df = spark.read.json(rdd)

对于第二部分,您可以根据需要使用pyspark.sql.functions.regexp_replace 来格式化您的多值记录。

from pyspark.sql.functions import regexp_replace

def format_column(column):
    return regexp_replace(regexp_replace(column, '(^\[)|(\]$)|(")', ''), ",", "/") 

df.select(*[format_column(c).alias(c) for c in df.columns]).show()

#+------------+-------------+----------+
#|      animal|        color|     fruit|
#+------------+-------------+----------+
#|        fish|          red|      null|
#|       panda|        green|      null|
#|       panda|          red|watermelon|
#|    aardvark|         null|      null|
#|        null|         blue|     apple|
#|fish/mustang|         pink|      null|
#|       panda|orange/purple|      null|
#+------------+-------------+----------+

【讨论】:

直到现在我才意识到使用spark.read.json 读取 RDD 是可能的,这打破了其余部分。非常感谢!用于分解多值的正则表达式语法相同(这就是我在非 Spark 上下文中这样做的方式)。

以上是关于Pyspark 从具有不同列的行/数据创建 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

在 Pyspark/Hive 中有条件的运行总计

PySpark:具有不同列的 DataFrames 的动态联合

将列的内容拆分为pyspark中的行

Pyspark/Hive 中带条件的加权运行总计

Pyspark:用同名的另一列替换行值

SQL/PySpark:创建一个包含过去 n 天的行数的新列