如何在pyspark中压缩两列? [复制]

Posted

技术标签:

【中文标题】如何在pyspark中压缩两列? [复制]【英文标题】:How to zip two column in pyspark? [duplicate] 【发布时间】:2018-06-07 05:53:46 【问题描述】:

我使用:Python 3.6PySpark 2.3.0。在下面的例子中,我在item 中只有两个项目,但我还可以拥有更多信息,例如first_namelast_namecity

我有一个具有以下架构的数据框:

|-- email: string (nullable = true)
| -- item: struct(nullable=true)
| | -- item: array(nullable=true)
| | | -- element: struct(containsNull=true)
| | | | -- data: string(nullable=true)
| | | | -- fieldid: string(nullable=true)
| | | | -- fieldname: string(nullable=true)
| | | | -- fieldtype: string(nullable=true)

这是我的输出:

+-----+-----------------------------------------------------------------------------------------+
|email|item                                                                                     |
+-----+-----------------------------------------------------------------------------------------+
|x    |[[[Gmail, 32, Email Client, dropdown], [Device uses Proxy Server, 33, Device, dropdown]]]|
|y    |[[[IE, 32, Email Client, dropdown], [Personal computer, 33, Device, dropdown]]]          |
+-----+-----------------------------------------------------------------------------------------+

我想将此数据框转换为:

+-----+-------------------------------------+
|email|Email Client|Device                  |
+-----+-------------------------------------+
|x    |Gmail       |Device uses Proxy Server|
|y    |IE          |Personal computer       |
+-----+-------------------------------------+

我做了一些转换:

df = df.withColumn('item', df.item.item)
df = df.withColumn('column_names', df.item.fieldname)
df = df.withColumn('column_values', df.item.data)

现在我的输出是:

+-----+----------------------+---------------------------------+
|email|column_names          |column_values                    |
+-----+----------------------+---------------------------------+
|x    |[Email Client, Device]|[Gmail, Device uses Proxy Server]|
|y    |[Email Client, Device]|[IE, Personal computer]          |
+-----+----------------------+---------------------------------+

从这里我想要一种方法来压缩这些列。

【问题讨论】:

【参考方案1】:

您询问了如何zip 数组,但实际上您无需创建column_namescolumn_values 列的中间步骤即可获得所需的输出。

使用getItem() 函数按索引获取所需值:

import pyspark.sql.functions as f
df = df.select(
    'email',
    f.col('item.data').getItem(0).alias('Email Client'),
    f.col('item.data').getItem(1).alias('Device')
)
df.show(truncate=False)
#+-----+------------+------------------------+
#|email|Email Client|Device                  |
#+-----+------------+------------------------+
#|x    |Gmail       |Device uses Proxy Server|
#|y    |IE          |Personal computer       |
#+-----+------------+------------------------+

这假设 Email Client 字段始终位于索引 0 处,而 Device 位于索引 1 处。


如果您不能假设字段在每一行中的顺序始终相同,另一种选择是使用pyspark.sql.functions.create_map()column_namescolumn_values 中的值创建一个映射。

这个函数需要一个:

列名(字符串)列表或 [被] 分组为键值对的列表达式列表,例如(key1, value1, key2, value2, ...)。

我们遍历column_namescolumn_values 中的项目以创建配对列表,然后使用list(chain.from_iterable(...)) 将列表展平。

列表制作完成后,可以按名称选择字段。

from itertools import chain

# first create a map type column called 'map'
df.select(
    'email',
    f.create_map(
        list(
            chain.from_iterable(
                [[f.col('column_names').getItem(i), f.col('column_values').getItem(i)] 
                 for i in range(2)]
            )
        )
    ).alias('map')
)
df.show(truncte=False)
#+-----+--------------------------------------------------------------+
#|email|map                                                           |
#+-----+--------------------------------------------------------------+
#|x    |Map(Email Client -> Gmail, Device -> Device uses Proxy Server)|
#|y    |Map(Email Client -> IE, Device -> Personal computer)          |
#+-----+--------------------------------------------------------------+

# now select the fields by key
df = df.select(
    'email',
    f.col('map').getField("Email Client").alias("Email Client"),
    f.col('map').getField("Device").alias("Device")
)

这假设每个数组中总是至少有 2 个元素。


如果您想压缩任意长度的列表,您必须使用udf

# define the udf
zip_lists = f.udf(lambda x, y: [list(z) for z in zip(x, y)], ArrayType(StringType()))

# use the udf to zip the lists
df.select(
    'email',
    zip_lists(f.col('column_names'), f.col('column_values')).alias('zipped')
).show(truncate=False)
#+-----+-----------------------------------------------------------+
#|email|zipped                                                     |
#+-----+-----------------------------------------------------------+
#|x    |[[Email Client, Gmail], [Device, Device uses Proxy Server]]|
#|y    |[[Email Client, IE], [Device, Personal computer]]          |
#+-----+-----------------------------------------------------------+

或者您可以使用udf 创建地图:

make_map = f.udf(lambda x, y: dict(zip(x, y)), MapType(StringType(), StringType()))
df.select(
    'email',
    make_map(f.col('column_names'), f.col('column_values')).alias('map')
).show(truncate=False)
#+-----+--------------------------------------------------------------+
#|email|map                                                           |
#+-----+--------------------------------------------------------------+
#|x    |Map(Device -> Device uses Proxy Server, Email Client -> Gmail)|
#|y    |Map(Device -> Personal computer, Email Client -> IE)          |
#+-----+--------------------------------------------------------------+

【讨论】:

以上是关于如何在pyspark中压缩两列? [复制]的主要内容,如果未能解决你的问题,请参考以下文章

PYSPARK:如何在 pyspark 数据框中找到两列的余弦相似度?

在 Pyspark 中,我如何比较两列并在它们不相同时使用 x

pyspark如何使用两列编写UDF

如何使用pyspark将两列值组合到另一列?

如何比较pyspark中两个不同数据帧中的两列

如果 pyspark 数据帧的行基于两列的值位于另一个数据帧中,如何删除它们?