Pyspark 从具有不同列的行/数据创建 DataFrame
Posted
技术标签:
【中文标题】Pyspark 从具有不同列的行/数据创建 DataFrame【英文标题】:Pyspark create DataFrame from rows/data with varying columns 【发布时间】:2018-11-29 14:35:15 【问题描述】:我有多个键/值对的数据/行,键的数量未知——有些重叠,有些没有——我想从中创建一个 Spark DataFrame。我的最终目标是从此 DataFrame 编写 CSV。
我对输入数据/行具有灵活性:它们最容易是 JSON 字符串,但可以通过 可能重叠的键进行转换:
"color":"red", "animal":"fish"
"color":"green", "animal":"panda"
"color":"red", "animal":"panda", "fruit":"watermelon"
"animal":"aardvark"
"color":"blue", "fruit":"apple"
理想情况下,我想根据这些数据创建一个如下所示的 DataFrame:
-----------------------------
color | animal | fruit
-----------------------------
red | fish | null
green | panda | null
red | panda | watermelon
null | aardvark | null
blue | null | apple
-----------------------------
值得注意的是,没有特定键的数据/行是null
,数据/行中的所有键都表示为列。
我对 Spark 的许多基础知识感到相对自在,但在设想一个有效地使用键/值对获取我的 RDD/DataFrame 的过程时遇到了困难 -- 但列和键的数量未知 -- 并创建一个以这些键为列的 DataFrame。
高效,因为如果可能,我想避免创建一个所有输入行都保存在内存中的对象(例如单个字典)。
再一次,编写 CSV 的最终目标是,我假设创建一个 DataFrame 是实现这一目标的合乎逻辑的步骤。
另一个皱纹:
一些数据将是多值的,例如:
"color":"pink", "animal":["fish","mustang"]
"color":["orange","purple"], "animal":"panda"
使用提供的分隔符,例如/
为避免与 ,
冲突以分隔列,我想在列的输出中分隔这些,例如:
------------------------------------
color | animal | fruit
------------------------------------
pink | fish/mustang | null
orange/purple | panda | null
------------------------------------
一旦有了解决主要问题的方法,我相信我可以解决这部分问题,但无论如何都会把它扔掉,因为这将是问题的一个维度。
【问题讨论】:
你试过df = spark.read.json("myfile.json")
。在你的第一个例子中似乎对我有用。 更新:它也适用于您的第二个示例,但将所有记录视为字符串,因此您必须执行一些 regex to convert the string representation of the list 以按照您想要的方式对其进行格式化。
感谢@pault 的想法。我正要说它可能行不通,因为我的数据实际上来自 DataFrame,我正在将单列 XML 转换为 JSON 字符串。但这很有趣,我可以用 JSON 行创建一个 RDD,编写它,然后读取它?还是有另一种方法可以从 RDD 模拟 .json()
方法,而不是从读取外部位置?
意识到read.json()
也可能接受 RDD,spark.apache.org/docs/latest/api/python/…,试一试...
【参考方案1】:
从文件中读取
如果您的数据存储在一个文件中(假设它被命名为myfile.json
),如下所示:
"color":"red", "animal":"fish"
"color":"green", "animal":"panda"
"color":"red", "animal":"panda", "fruit":"watermelon"
"animal":"aardvark"
"color":"blue", "fruit":"apple"
"color":"pink", "animal":["fish","mustang"]
"color":["orange","purple"], "animal":"panda"
您可以使用pyspark.sql.DataFrameReader.json
将文件读取为换行符分隔的 JSON 记录。
df = spark.read.json("myfile.json")
df.show()
#+------------------+-------------------+----------+
#| animal| color| fruit|
#+------------------+-------------------+----------+
#| fish| red| null|
#| panda| green| null|
#| panda| red|watermelon|
#| aardvark| null| null|
#| null| blue| apple|
#|["fish","mustang"]| pink| null|
#| panda|["orange","purple"]| null|
#+------------------+-------------------+----------+
df.printSchema()
#root
# |-- animal: string (nullable = true)
# |-- color: string (nullable = true)
# |-- fruit: string (nullable = true)
从 RDD 读取
您也可以这样做来读取rdd
:
import json
rdd = sc.parallelize(
map(
json.dumps,
[
"color":"red", "animal":"fish",
"color":"green", "animal":"panda",
"color":"red", "animal":"panda", "fruit":"watermelon",
"animal":"aardvark",
"color":"blue", "fruit":"apple",
"color":"pink", "animal":["fish","mustang"],
"color":["orange","purple"], "animal":"panda"
]
)
)
df = spark.read.json(rdd)
对于第二部分,您可以根据需要使用pyspark.sql.functions.regexp_replace
来格式化您的多值记录。
from pyspark.sql.functions import regexp_replace
def format_column(column):
return regexp_replace(regexp_replace(column, '(^\[)|(\]$)|(")', ''), ",", "/")
df.select(*[format_column(c).alias(c) for c in df.columns]).show()
#+------------+-------------+----------+
#| animal| color| fruit|
#+------------+-------------+----------+
#| fish| red| null|
#| panda| green| null|
#| panda| red|watermelon|
#| aardvark| null| null|
#| null| blue| apple|
#|fish/mustang| pink| null|
#| panda|orange/purple| null|
#+------------+-------------+----------+
【讨论】:
直到现在我才意识到使用spark.read.json
读取 RDD 是可能的,这打破了其余部分。非常感谢!用于分解多值的正则表达式语法相同(这就是我在非 Spark 上下文中这样做的方式)。以上是关于Pyspark 从具有不同列的行/数据创建 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章