如何在pyspark中压缩两列? [复制]
Posted
技术标签:
【中文标题】如何在pyspark中压缩两列? [复制]【英文标题】:How to zip two column in pyspark? [duplicate] 【发布时间】:2018-06-07 05:53:46 【问题描述】:我使用:Python 3.6
和 PySpark 2.3.0
。在下面的例子中,我在item
中只有两个项目,但我还可以拥有更多信息,例如first_name
、last_name
、city
。
我有一个具有以下架构的数据框:
|-- email: string (nullable = true)
| -- item: struct(nullable=true)
| | -- item: array(nullable=true)
| | | -- element: struct(containsNull=true)
| | | | -- data: string(nullable=true)
| | | | -- fieldid: string(nullable=true)
| | | | -- fieldname: string(nullable=true)
| | | | -- fieldtype: string(nullable=true)
这是我的输出:
+-----+-----------------------------------------------------------------------------------------+
|email|item |
+-----+-----------------------------------------------------------------------------------------+
|x |[[[Gmail, 32, Email Client, dropdown], [Device uses Proxy Server, 33, Device, dropdown]]]|
|y |[[[IE, 32, Email Client, dropdown], [Personal computer, 33, Device, dropdown]]] |
+-----+-----------------------------------------------------------------------------------------+
我想将此数据框转换为:
+-----+-------------------------------------+
|email|Email Client|Device |
+-----+-------------------------------------+
|x |Gmail |Device uses Proxy Server|
|y |IE |Personal computer |
+-----+-------------------------------------+
我做了一些转换:
df = df.withColumn('item', df.item.item)
df = df.withColumn('column_names', df.item.fieldname)
df = df.withColumn('column_values', df.item.data)
现在我的输出是:
+-----+----------------------+---------------------------------+
|email|column_names |column_values |
+-----+----------------------+---------------------------------+
|x |[Email Client, Device]|[Gmail, Device uses Proxy Server]|
|y |[Email Client, Device]|[IE, Personal computer] |
+-----+----------------------+---------------------------------+
从这里我想要一种方法来压缩这些列。
【问题讨论】:
【参考方案1】:您询问了如何zip
数组,但实际上您无需创建column_names
和column_values
列的中间步骤即可获得所需的输出。
使用getItem()
函数按索引获取所需值:
import pyspark.sql.functions as f
df = df.select(
'email',
f.col('item.data').getItem(0).alias('Email Client'),
f.col('item.data').getItem(1).alias('Device')
)
df.show(truncate=False)
#+-----+------------+------------------------+
#|email|Email Client|Device |
#+-----+------------+------------------------+
#|x |Gmail |Device uses Proxy Server|
#|y |IE |Personal computer |
#+-----+------------+------------------------+
这假设 Email Client
字段始终位于索引 0 处,而 Device
位于索引 1 处。
如果您不能假设字段在每一行中的顺序始终相同,另一种选择是使用pyspark.sql.functions.create_map()
从column_names
和column_values
中的值创建一个映射。
这个函数需要一个:
列名(字符串)列表或 [被] 分组为键值对的列表达式列表,例如(key1, value1, key2, value2, ...)。
我们遍历column_names
和column_values
中的项目以创建配对列表,然后使用list(chain.from_iterable(...))
将列表展平。
列表制作完成后,可以按名称选择字段。
from itertools import chain
# first create a map type column called 'map'
df.select(
'email',
f.create_map(
list(
chain.from_iterable(
[[f.col('column_names').getItem(i), f.col('column_values').getItem(i)]
for i in range(2)]
)
)
).alias('map')
)
df.show(truncte=False)
#+-----+--------------------------------------------------------------+
#|email|map |
#+-----+--------------------------------------------------------------+
#|x |Map(Email Client -> Gmail, Device -> Device uses Proxy Server)|
#|y |Map(Email Client -> IE, Device -> Personal computer) |
#+-----+--------------------------------------------------------------+
# now select the fields by key
df = df.select(
'email',
f.col('map').getField("Email Client").alias("Email Client"),
f.col('map').getField("Device").alias("Device")
)
这假设每个数组中总是至少有 2 个元素。
如果您想压缩任意长度的列表,您必须使用udf
。
# define the udf
zip_lists = f.udf(lambda x, y: [list(z) for z in zip(x, y)], ArrayType(StringType()))
# use the udf to zip the lists
df.select(
'email',
zip_lists(f.col('column_names'), f.col('column_values')).alias('zipped')
).show(truncate=False)
#+-----+-----------------------------------------------------------+
#|email|zipped |
#+-----+-----------------------------------------------------------+
#|x |[[Email Client, Gmail], [Device, Device uses Proxy Server]]|
#|y |[[Email Client, IE], [Device, Personal computer]] |
#+-----+-----------------------------------------------------------+
或者您可以使用udf
创建地图:
make_map = f.udf(lambda x, y: dict(zip(x, y)), MapType(StringType(), StringType()))
df.select(
'email',
make_map(f.col('column_names'), f.col('column_values')).alias('map')
).show(truncate=False)
#+-----+--------------------------------------------------------------+
#|email|map |
#+-----+--------------------------------------------------------------+
#|x |Map(Device -> Device uses Proxy Server, Email Client -> Gmail)|
#|y |Map(Device -> Personal computer, Email Client -> IE) |
#+-----+--------------------------------------------------------------+
【讨论】:
以上是关于如何在pyspark中压缩两列? [复制]的主要内容,如果未能解决你的问题,请参考以下文章
PYSPARK:如何在 pyspark 数据框中找到两列的余弦相似度?