如何将行转换为pyspark中的字典列表?

Posted

技术标签:

【中文标题】如何将行转换为pyspark中的字典列表?【英文标题】:How to convert rows into a list of dictionaries in pyspark? 【发布时间】:2018-03-22 15:10:22 【问题描述】:

我在 pyspark 中有一个 DataFrame(df),通过从配置单元表中读取:

df=spark.sql('select * from <table_name>')


+++++++++++++++++++++++++++++++++++++++++++
|  Name    |    URL visited               |
+++++++++++++++++++++++++++++++++++++++++++
|  person1 | [google,msn,yahoo]           |
|  person2 | [fb.com,airbnb,wired.com]    |
|  person3 | [fb.com,google.com]          |
+++++++++++++++++++++++++++++++++++++++++++

当我尝试以下时,出现错误

df_dict = dict(zip(df['name'],df['url']))
"TypeError: zip argument #1 must support iteration."

type(df.name) is of 'pyspark.sql.column.Column'

我如何创建一个像下面这样的字典,以后可以迭代

'person1':'google','msn','yahoo'
'person2':'fb.com','airbnb','wired.com'
'person3':'fb.com','google.com'

感谢您的想法和帮助。

【问题讨论】:

您的示例输出不是有效的字典。 【参考方案1】:

我觉得你可以试试row.asDict(),这段代码直接在executor上运行,你不用在driver上收集数据。

类似:

df.rdd.map(lambda row: row.asDict())

【讨论】:

请注意,这将生成以下形式的行:"Name": "person1", "URL Visited": ["google","msn","yahoo"] ,这不是 OP 要求的输出,但很容易更改 map 函数来解决此问题。【参考方案2】:

使用 pyspark Row.as_Dict() 方法怎么样?这是数据帧 API 的一部分(我理解为撰写本文时的“推荐”API),根本不需要您使用 RDD API。

df_list_of_dict = [row.asDict() for row in df.collect()]

type(df_list_of_dict), type(df_list_of_dict[0])
#(<class 'list'>, <class 'dict'>)

df_list_of_dict
#['person1': ['google','msn','yahoo'],
# 'person2': ['fb.com','airbnb','wired.com'],
# 'person3': ['fb.com','google.com']]

【讨论】:

【参考方案3】:

如果您希望将结果保存在 python 字典中,可以使用 collect()1 将数据导入本地内存,然后根据需要处理输出。

首先收集数据:

df_dict = df.collect()
#[Row(Name=u'person1', URL visited=[u'google', u'msn,yahoo']),
# Row(Name=u'person2', URL visited=[u'fb.com', u'airbnb', u'wired.com']),
# Row(Name=u'person3', URL visited=[u'fb.com', u'google.com'])]

这将返回pyspark.sql.Row 对象的列表。您可以轻松地将其转换为dicts 的列表:

df_dict = [r['Name']: r['URL visited'] for r in df_dict]
#[u'person1': [u'google', u'msn,yahoo'],
# u'person2': [u'fb.com', u'airbnb', u'wired.com'],
# u'person3': [u'fb.com', u'google.com']]

1 请注意,对于大型数据集,此操作可能会很慢,并且可能会因内存不足错误而失败。您应该首先考虑这是否是您真正想要做的事情,因为将数据带入本地内存会失去 spark 的并行化优势。

【讨论】:

【参考方案4】:

给定:

+++++++++++++++++++++++++++++++++++++++++++
|  Name    |    URL visited               |
+++++++++++++++++++++++++++++++++++++++++++
|  person1 | [google,msn,yahoo]           |
|  person2 | [fb.com,airbnb,wired.com]    |
|  person3 | [fb.com,google.com]          |
+++++++++++++++++++++++++++++++++++++++++++

这应该可行:

df_dict = df \
    .rdd \
    .map(lambda row: row[0]: row[1]) \
    .collect()

df_dict

#['person1': ['google','msn','yahoo'],
# 'person2': ['fb.com','airbnb','wired.com'],
# 'person3': ['fb.com','google.com']]

这种方式你只是在处理后收集。

请告诉我这是否适合你:)

【讨论】:

以上是关于如何将行转换为pyspark中的字典列表?的主要内容,如果未能解决你的问题,请参考以下文章

如何将字典列表转换为 Pyspark DataFrame

PySpark:如何将行转换为向量?

将标准 python 键值字典列表转换为 pyspark 数据框

将 pyspark 数据框转换为 python 字典列表

pyspark 将行转换为列

pyspark 行列表的 RDD 到 DataFrame