如何在字典中使用 pyspark.sql.functions.when() 的多个条件?

Posted

技术标签:

【中文标题】如何在字典中使用 pyspark.sql.functions.when() 的多个条件?【英文标题】:How do I use multiple conditions with pyspark.sql.funtions.when() from a dict? 【发布时间】:2019-08-15 23:34:36 【问题描述】:

我想根据字典中的值生成一个 when 子句。它与正在做的事情非常相似How do I use multiple conditions with pyspark.sql.funtions.when()?

只有我想传递一个列和值的字典

假设我有一个字典:


  'employed': 'Y',
  'athlete': 'N'

我想用那个 dict 来生成等价的:

df.withColumn("call_person",when((col("employed") == "Y") & (col("athlete") == "N"), "Y")

所以最终结果是:

+---+-----------+--------+-------+
| id|call_person|employed|athlete|
+---+-----------+--------+-------+
|  1|     Y     |    Y   |   N   |
|  2|     N     |    Y   |   Y   |
|  3|     N     |    N   |   N   |
+---+-----------+--------+-------+

请注意我想以编程方式执行此操作的部分原因是我有不同长度的字典(条件数)

【问题讨论】:

【参考方案1】:

使用reduce()函数:

from functools import reduce
from pyspark.sql.functions import when, col

# dictionary
d = 
  'employed': 'Y',
  'athlete': 'N'


# set up the conditions, multiple conditions merged with `&`
cond = reduce(lambda x,y: x&y, [ col(c) == v for c,v in d.items() if c in df.columns ])

# set up the new column
df.withColumn("call_person", when(cond, "Y").otherwise("N")).show()
+---+--------+-------+-----------+
| id|employed|athlete|call_person|
+---+--------+-------+-----------+
|  1|       Y|      N|          Y|
|  2|       Y|      Y|          N|
|  3|       N|      N|          N|
+---+--------+-------+-----------+

【讨论】:

是的,你说得对 - 使用 reduce 真的很有趣【参考方案2】:

您也可以直接访问字典项:

dict =
  'code': 'b',
  'amt': '4'
  
list = [(1, 'code'),(1,'amt')]
df=spark.createDataFrame(list, ['id', 'dict_key'])

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

user_func =  udf (lambda x: dict.get(x), StringType())
newdf = df.withColumn('new_column',user_func(df.dict_key))

>>> newdf.show();
+---+--------+----------+
| id|dict_key|new_column|
+---+--------+----------+
|  1|    code|         b|
|  1|     amt|         4|
+---+--------+----------+

或广播字典

broadcast_dict = sc.broadcast(dict)

def my_func(key):
    return broadcast_dict.value.get(key)

new_my_func = udf(my_func, StringType())

newdf = df.withColumn('new_column',new_my_func(df.dict_key))
>>> newdf.show();
+---+--------+----------+
| id|dict_key|new_column|
+---+--------+----------+
|  1|    code|         b|
|  1|     amt|         4|
+---+--------+----------+

【讨论】:

我继续添加了一些结果,b/c 这不是我想要完成的。感谢您的回复,我很抱歉在原始帖子中没有更清楚

以上是关于如何在字典中使用 pyspark.sql.functions.when() 的多个条件?的主要内容,如果未能解决你的问题,请参考以下文章

如何在“可编码”响应中使用字典?

如何在 [关闭] 中使用字典值填充 fstring

如何在 Hive 中使用字典进行映射?

如何在Excel VBA中使用字典Dictionary对象

如何在 SwiftUI 中使用字典创建一个数组,然后调用它们?

如何在 C# 中使用 LINQ 过滤嵌套字典?