如何将字典列表转换为 Pyspark DataFrame

Posted

技术标签:

【中文标题】如何将字典列表转换为 Pyspark DataFrame【英文标题】:How to convert list of dictionaries into Pyspark DataFrame 【发布时间】:2018-09-08 19:41:49 【问题描述】:

我想将我的字典列表转换为 DataFrame。这是列表:

mylist = 
[
  "type_activity_id":1,"type_activity_name":"xxx",
  "type_activity_id":2,"type_activity_name":"yyy",
  "type_activity_id":3,"type_activity_name":"zzz"
]

这是我的代码:

from pyspark.sql.types import StringType

df = spark.createDataFrame(mylist, StringType())

df.show(2,False)

+-----------------------------------------+
|                                    value|
+-----------------------------------------+
|type_activity_id=1,type_activity_id=xxx|
|type_activity_id=2,type_activity_id=yyy|
|type_activity_id=3,type_activity_id=zzz|
+-----------------------------------------+

我假设我应该为每一列提供一些映射和类型,但我不知道该怎么做。

更新:

我也试过这个:

schema = ArrayType(
    StructType([StructField("type_activity_id", IntegerType()),
                StructField("type_activity_name", StringType())
                ]))
df = spark.createDataFrame(mylist, StringType())
df = df.withColumn("value", from_json(df.value, schema))

然后我得到null 值:

+-----+
|value|
+-----+
| null|
| null|
+-----+

【问题讨论】:

【参考方案1】:

过去,您可以简单地将字典传递给 spark.createDataFrame(),但现在已弃用:

mylist = [
  "type_activity_id":1,"type_activity_name":"xxx",
  "type_activity_id":2,"type_activity_name":"yyy",
  "type_activity_id":3,"type_activity_name":"zzz"
]
df = spark.createDataFrame(mylist)
#UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
#  warnings.warn("inferring schema from dict is deprecated,"

正如此警告消息所说,您应该改用pyspark.sql.Row

from pyspark.sql import Row
spark.createDataFrame(Row(**x) for x in mylist).show(truncate=False)
#+----------------+------------------+
#|type_activity_id|type_activity_name|
#+----------------+------------------+
#|1               |xxx               |
#|2               |yyy               |
#|3               |zzz               |
#+----------------+------------------+

这里我使用** (keyword argument unpacking) 将字典传递给Row 构造函数。

【讨论】:

谢谢。你知道它为什么被弃用吗? 我不知道为什么。顺便说一句,这可能比转换为/从 json 更快。 但是当每个字典(数组元素)的结构不同时,这可能不起作用。 使用 PySpark 3.0.0 中的 spark.createDataFrame(Row(**x) for x in mylist) 方法,我遇到了下游问题,即值被放置在错误的列中。可能与issues.apache.org/jira/browse/SPARK-26200有关 如何确保 dict 中的值是正确的类型,或者在必要时进行类型转换?【参考方案2】:

你可以这样做。您将获得一个包含 2 列的数据框。

mylist = [
  "type_activity_id":1,"type_activity_name":"xxx",
  "type_activity_id":2,"type_activity_name":"yyy",
  "type_activity_id":3,"type_activity_name":"zzz"
]

myJson = sc.parallelize(mylist)
myDf = sqlContext.read.json(myJson)

输出:

+----------------+------------------+
|type_activity_id|type_activity_name|
+----------------+------------------+
|               1|               xxx|
|               2|               yyy|
|               3|               zzz|
+----------------+------------------+

【讨论】:

@Markus 如果mylist 是一个rdd。你可以做spark.read.json(sc.parallelize(mylist))【参考方案3】:

在 Spark 2.4 版中,可以直接使用 df=spark.createDataFrame(mylist)

>>> mylist = [
...   "type_activity_id":1,"type_activity_name":"xxx",
...   "type_activity_id":2,"type_activity_name":"yyy",
...   "type_activity_id":3,"type_activity_name":"zzz"
... ]
>>> df1=spark.createDataFrame(mylist)
>>> df1.show()
+----------------+------------------+
|type_activity_id|type_activity_name|
+----------------+------------------+
|               1|               xxx|
|               2|               yyy|
|               3|               zzz|
+----------------+------------------+

【讨论】:

尽管UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead,它仍然给了我这个警告【参考方案4】:

从字典列表创建dataframe 时,我也遇到了同样的问题。 我已经使用namedtuple 解决了这个问题。

以下是我使用提供的数据的代码。

from collections import namedtuple
final_list = []
mylist = ["type_activity_id":1,"type_activity_name":"xxx",
          "type_activity_id":2,"type_activity_name":"yyy", 
          "type_activity_id":3,"type_activity_name":"zzz"
         ]
ExampleTuple = namedtuple('ExampleTuple', ['type_activity_id', 'type_activity_name'])

for my_dict in mylist:
    namedtupleobj = ExampleTuple(**my_dict)
    final_list.append(namedtupleobj)

sqlContext.createDataFrame(final_list).show(truncate=False)

输出

+----------------+------------------+
|type_activity_id|type_activity_name|
+----------------+------------------+
|1               |xxx               |
|2               |yyy               |
|3               |zzz               |
+----------------+------------------+

我的版本信息如下

spark: 2.4.0
python: 3.6

不必有my_list 变量。因为它可用,所以我用它来创建 namedtuple 对象,否则可以直接创建 namedtuple 对象。

【讨论】:

以上是关于如何将字典列表转换为 Pyspark DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

将标准 python 键值字典列表转换为 pyspark 数据框

将 pyspark 数据框转换为 python 字典列表

Pyspark 将列表列转换为嵌套结构列

将嵌套字典键值转换为 pyspark 数据框

如何在 PySpark 中将字符串转换为字典 (JSON) 的 ArrayType

如何将 json 对象列表转换为单个 pyspark 数据框?