如何根据 Pyspark 中数组列中的值创建新列
Posted
技术标签:
【中文标题】如何根据 Pyspark 中数组列中的值创建新列【英文标题】:How to create new column based on values in array column in Pyspark 【发布时间】:2018-07-17 13:57:28 【问题描述】:我有以下数据框,其中包含代表产品的代码:
testdata = [(0, ['a','b','d']), (1, ['c']), (2, ['d','e'])]
df = spark.createDataFrame(testdata, ['id', 'codes'])
df.show()
+---+---------+
| id| codes|
+---+---------+
| 0|[a, b, d]|
| 1| [c]|
| 2| [d, e]|
+---+---------+
假设代码a
和b
代表T 恤,代码c
代表毛衣。
tshirts = ['a','b']
sweaters = ['c']
如何创建一个列label
来检查这些代码是否在数组列中并返回产品名称。像这样:
+---+---------+--------+
| id| codes| label|
+---+---------+--------+
| 0|[a, b, d]| tshirts|
| 1| [c]|sweaters|
| 2| [d, e]| none|
+---+---------+--------+
我已经尝试了很多东西,其中一些不起作用:
codes =
'tshirts': ['a','b'],
'sweaters': ['c']
def any_isin(ref_values, array_to_search):
for key, values in ref_values.items():
if any(item in array_to_search for item in values):
return key
else:
return 'none'
any_isin_udf = lambda ref_values: (F.udf(lambda array_to_search: any_isin_mod(ref_values, array_to_search), StringType()))
df_labeled = df.withColumn('label', any_isin_udf(codes)(F.col('codes')))
df_labeled.show()
+---+---------+-------+
| id| codes| label|
+---+---------+-------+
| 0|[a, b, d]|tshirts|
| 1| [c]| none|
| 2| [d, e]| none|
+---+---------+-------+
【问题讨论】:
如果codes
里面有a,b,c
怎么办?
【参考方案1】:
我会用array_contains
表达。让我们将输入定义为dict
:
from pyspark.sql.functions import expr, lit, when
from operator import and_
from functools import reduce
label_map = "tshirts": ["a", "b"], "sweaters": ["c"]
接下来生成表达式:
expression_map =
label: reduce(and_, [expr("array_contains(codes, '')".format(code))
for code in codes]) for label, codes in label_map.items()
最后用CASE ... WHEN
减少它:
label = reduce(
lambda acc, kv: when(kv[1], lit(kv[0])).otherwise(acc),
expression_map.items(),
lit(None).cast("string")
).alias("label")
结果:
df.withColumn("label", label).show()
# +---+---------+--------+
# | id| codes| label|
# +---+---------+--------+
# | 0|[a, b, d]| tshirts|
# | 1| [c]|sweaters|
# | 2| [d, e]| null|
# +---+---------+--------+
【讨论】:
这太棒了!我做了一些测试,我可能不清楚的是,它可以是代表产品的任何代码。所以带代码的列不需要包含'a'和'b',而是'a'或'b'。我想我可以将 and_ 更改为 or_ :)【参考方案2】:首选非udf 方法,例如@user10055507 的answer 使用pyspark.sql.functions.array_contains()
,但这里解释了导致代码失败的原因:
错误是你在循环内调用return
,所以你永远不会遍历第一个key
。这是一种修改您的udf
以获得所需结果的方法:
import pyspark.sql.functions as f
codes =
'tshirts': ['a','b'],
'sweaters': ['c']
def any_isin(ref_values, array_to_search):
label = 'none'
for key, values in ref_values.items():
if any(item in array_to_search for item in values):
label=key
break
return label
any_isin_udf = lambda ref_values: (
f.udf(lambda array_to_search: any_isin(ref_values, array_to_search), StringType())
)
df_labeled = df.withColumn('label', any_isin_udf(codes)(f.col('codes')))
df_labeled.show()
#+---+---------+--------+
#| id| codes| label|
#+---+---------+--------+
#| 0|[a, b, d]| tshirts|
#| 1| [c]|sweaters|
#| 2| [d, e]| none|
#+---+---------+--------+
更新
这是使用join
的另一种非udf 方法:
先把codes
字典转成表格:
import pyspark.sql.functions as f
from itertools import chain
codes_df = spark.createDataFrame(
list(chain.from_iterable(zip([a]*len(b), b) for a, b in codes.items())),
["label", "code"]
)
codes_df.show()
#+--------+----+
#| label|code|
#+--------+----+
#| tshirts| a|
#| tshirts| b|
#|sweaters| c|
#+--------+----+
现在在表示代码数组是否包含代码的布尔值上进行df
和codes_df
的左连接:
df.alias('l')\
.join(
codes_df.alias('r'),
how='left',
on=f.expr('array_contains(l.codes, r.code)')
)\
.select('id', 'codes', 'label')\
.distinct()\
.show()
#+---+---------+--------+
#| id| codes| label|
#+---+---------+--------+
#| 2| [d, e]| null|
#| 0|[a, b, d]| tshirts|
#| 1| [c]|sweaters|
#+---+---------+--------+
【讨论】:
确实我现在明白了为什么我自己的代码失败了,非常有帮助。我认为这也可以。 @Cheryl 我用不同的方法添加了更新。以上是关于如何根据 Pyspark 中数组列中的值创建新列的主要内容,如果未能解决你的问题,请参考以下文章
如何编写一个简单的 for 循环,使用键值对根据旧列中的值填充新列?
根据其他列中的值在 python 3 (pandas) 数据框中创建新列