如何将行转换为pyspark中的字典列表?
Posted
技术标签:
【中文标题】如何将行转换为pyspark中的字典列表?【英文标题】:How to convert rows into a list of dictionaries in pyspark? 【发布时间】:2018-03-22 15:10:22 【问题描述】:我在 pyspark 中有一个 DataFrame(df),通过从配置单元表中读取:
df=spark.sql('select * from <table_name>')
+++++++++++++++++++++++++++++++++++++++++++
| Name | URL visited |
+++++++++++++++++++++++++++++++++++++++++++
| person1 | [google,msn,yahoo] |
| person2 | [fb.com,airbnb,wired.com] |
| person3 | [fb.com,google.com] |
+++++++++++++++++++++++++++++++++++++++++++
当我尝试以下时,出现错误
df_dict = dict(zip(df['name'],df['url']))
"TypeError: zip argument #1 must support iteration."
type(df.name) is of 'pyspark.sql.column.Column'
我如何创建一个像下面这样的字典,以后可以迭代
'person1':'google','msn','yahoo'
'person2':'fb.com','airbnb','wired.com'
'person3':'fb.com','google.com'
感谢您的想法和帮助。
【问题讨论】:
您的示例输出不是有效的字典。 【参考方案1】:我觉得你可以试试row.asDict()
,这段代码直接在executor上运行,你不用在driver上收集数据。
类似:
df.rdd.map(lambda row: row.asDict())
【讨论】:
请注意,这将生成以下形式的行:"Name": "person1", "URL Visited": ["google","msn","yahoo"]
,这不是 OP 要求的输出,但很容易更改 map
函数来解决此问题。【参考方案2】:
使用 pyspark Row.as_Dict()
方法怎么样?这是数据帧 API 的一部分(我理解为撰写本文时的“推荐”API),根本不需要您使用 RDD API。
df_list_of_dict = [row.asDict() for row in df.collect()]
type(df_list_of_dict), type(df_list_of_dict[0])
#(<class 'list'>, <class 'dict'>)
df_list_of_dict
#['person1': ['google','msn','yahoo'],
# 'person2': ['fb.com','airbnb','wired.com'],
# 'person3': ['fb.com','google.com']]
【讨论】:
【参考方案3】:如果您希望将结果保存在 python 字典中,可以使用 collect()
1 将数据导入本地内存,然后根据需要处理输出。
首先收集数据:
df_dict = df.collect()
#[Row(Name=u'person1', URL visited=[u'google', u'msn,yahoo']),
# Row(Name=u'person2', URL visited=[u'fb.com', u'airbnb', u'wired.com']),
# Row(Name=u'person3', URL visited=[u'fb.com', u'google.com'])]
这将返回pyspark.sql.Row
对象的列表。您可以轻松地将其转换为dict
s 的列表:
df_dict = [r['Name']: r['URL visited'] for r in df_dict]
#[u'person1': [u'google', u'msn,yahoo'],
# u'person2': [u'fb.com', u'airbnb', u'wired.com'],
# u'person3': [u'fb.com', u'google.com']]
1 请注意,对于大型数据集,此操作可能会很慢,并且可能会因内存不足错误而失败。您应该首先考虑这是否是您真正想要做的事情,因为将数据带入本地内存会失去 spark 的并行化优势。
【讨论】:
【参考方案4】:给定:
+++++++++++++++++++++++++++++++++++++++++++
| Name | URL visited |
+++++++++++++++++++++++++++++++++++++++++++
| person1 | [google,msn,yahoo] |
| person2 | [fb.com,airbnb,wired.com] |
| person3 | [fb.com,google.com] |
+++++++++++++++++++++++++++++++++++++++++++
这应该可行:
df_dict = df \
.rdd \
.map(lambda row: row[0]: row[1]) \
.collect()
df_dict
#['person1': ['google','msn','yahoo'],
# 'person2': ['fb.com','airbnb','wired.com'],
# 'person3': ['fb.com','google.com']]
这种方式你只是在处理后收集。
请告诉我这是否适合你:)
【讨论】:
以上是关于如何将行转换为pyspark中的字典列表?的主要内容,如果未能解决你的问题,请参考以下文章