如何根据 Pyspark 中数组列中的值创建新列

Posted

技术标签:

【中文标题】如何根据 Pyspark 中数组列中的值创建新列【英文标题】:How to create new column based on values in array column in Pyspark 【发布时间】:2018-07-17 13:57:28 【问题描述】:

我有以下数据框,其中包含代表产品的代码:

testdata = [(0, ['a','b','d']), (1, ['c']), (2, ['d','e'])]
df = spark.createDataFrame(testdata, ['id', 'codes'])
df.show()
+---+---------+
| id|    codes|
+---+---------+
|  0|[a, b, d]|
|  1|      [c]|
|  2|   [d, e]|
+---+---------+

假设代码ab 代表T 恤,代码c 代表毛衣。

tshirts = ['a','b']
sweaters = ['c']

如何创建一个列label 来检查这些代码是否在数组列中并返回产品名称。像这样:

+---+---------+--------+
| id|    codes|   label|
+---+---------+--------+
|  0|[a, b, d]| tshirts|
|  1|      [c]|sweaters|
|  2|   [d, e]|    none|
+---+---------+--------+

我已经尝试了很多东西,其中一些不起作用:

codes = 
    'tshirts': ['a','b'],
    'sweaters': ['c']


def any_isin(ref_values, array_to_search):
    for key, values in ref_values.items():
        if any(item in array_to_search for item in values):
            return key
        else:
            return 'none'

any_isin_udf = lambda ref_values: (F.udf(lambda array_to_search: any_isin_mod(ref_values, array_to_search), StringType()))

df_labeled = df.withColumn('label', any_isin_udf(codes)(F.col('codes')))

df_labeled.show()
+---+---------+-------+
| id|    codes|  label|
+---+---------+-------+
|  0|[a, b, d]|tshirts|
|  1|      [c]|   none|
|  2|   [d, e]|   none|
+---+---------+-------+

【问题讨论】:

如果codes 里面有a,b,c 怎么办? 【参考方案1】:

我会用array_contains 表达。让我们将输入定义为dict

from pyspark.sql.functions import expr, lit, when
from operator import and_
from functools import reduce

label_map = "tshirts": ["a", "b"], "sweaters": ["c"]

接下来生成表达式:

expression_map = 
   label: reduce(and_, [expr("array_contains(codes, '')".format(code))
   for code in codes]) for label, codes in label_map.items()

最后用CASE ... WHEN减少它:

label = reduce(
    lambda acc, kv: when(kv[1], lit(kv[0])).otherwise(acc),
    expression_map.items(), 
    lit(None).cast("string")
).alias("label")

结果:

df.withColumn("label", label).show()
# +---+---------+--------+                                                        
# | id|    codes|   label|
# +---+---------+--------+
# |  0|[a, b, d]| tshirts|
# |  1|      [c]|sweaters|
# |  2|   [d, e]|    null|
# +---+---------+--------+

【讨论】:

这太棒了!我做了一些测试,我可能不清楚的是,它可以是代表产品的任何代码。所以带代码的列不需要包含'a'和'b',而是'a'或'b'。我想我可以将 and_ 更改为 or_ :)【参考方案2】:

首选非udf 方法,例如@user10055507 的answer 使用pyspark.sql.functions.array_contains(),但这里解释了导致代码失败的原因:

错误是你在循环内调用return,所以你永远不会遍历第一个key。这是一种修改您的udf 以获得所需结果的方法:

import pyspark.sql.functions as f

codes = 
    'tshirts': ['a','b'],
    'sweaters': ['c']


def any_isin(ref_values, array_to_search):
    label = 'none'
    for key, values in ref_values.items():
        if any(item in array_to_search for item in values):
            label=key
            break
    return label

any_isin_udf = lambda ref_values: (
    f.udf(lambda array_to_search: any_isin(ref_values, array_to_search), StringType())
)

df_labeled = df.withColumn('label', any_isin_udf(codes)(f.col('codes')))

df_labeled.show()
#+---+---------+--------+
#| id|    codes|   label|
#+---+---------+--------+
#|  0|[a, b, d]| tshirts|
#|  1|      [c]|sweaters|
#|  2|   [d, e]|    none|
#+---+---------+--------+

更新

这是使用join 的另一种非udf 方法:

先把codes字典转成表格:

import pyspark.sql.functions as f
from itertools import chain

codes_df = spark.createDataFrame(
    list(chain.from_iterable(zip([a]*len(b), b) for a, b in codes.items())),
    ["label", "code"]
)
codes_df.show()
#+--------+----+
#|   label|code|
#+--------+----+
#| tshirts|   a|
#| tshirts|   b|
#|sweaters|   c|
#+--------+----+

现在在表示代码数组是否包含代码的布尔值上进行dfcodes_df 的左连接:

df.alias('l')\
    .join(
        codes_df.alias('r'),
        how='left',
        on=f.expr('array_contains(l.codes, r.code)')
    )\
    .select('id', 'codes', 'label')\
    .distinct()\
    .show()
#+---+---------+--------+
#| id|    codes|   label|
#+---+---------+--------+
#|  2|   [d, e]|    null|
#|  0|[a, b, d]| tshirts|
#|  1|      [c]|sweaters|
#+---+---------+--------+

【讨论】:

确实我现在明白了为什么我自己的代码失败了,非常有帮助。我认为这也可以。 @Cheryl 我用不同的方法添加了更新。

以上是关于如何根据 Pyspark 中数组列中的值创建新列的主要内容,如果未能解决你的问题,请参考以下文章

如何编写一个简单的 for 循环,使用键值对根据旧列中的值填充新列?

如何过滤 PySpark 中数组列中的值?

根据其他列中的值在 python 3 (pandas) 数据框中创建新列

从 pyspark 数据框字符串列中获取第一个数值到新列中

如何根据 PySpark 数据框的另一列中的值修改列? F.当边缘情况

如何将字典中的值映射到 Pyspark 中的新列