Conditionally adding a new column in pyspark

Posted: 2021-12-27 18:43:11


【Question】:

I have some sample data from json that looks similar to the following:

 hero: axe, attribute: strength, active_abilities: [q, w, r], inactive_abilities: e 
 hero: invoker, attribute: intelligence, active_abilities: [q, w, e, r, f, d], inactive_abilities: null 
 hero: phantom assassin, attribute: agility, active_abilities: [q, w, e], inactive_abilities: r 
 hero: life stealer, attribute: strength, active_abilities: [q, r], inactive_abilities: [w, e] 

The issue I'm running into is that the "inactive_abilities" column is read in as a string because of the variability in the data types that can exist in that column. The data can be null, a single string (if there is only 1 ability), or an array (if there are multiple abilities). What I ultimately want is to create several new columns based on the number of "inactive_abilities". I want a new column inactive_ability that is populated only when there is exactly one inactive ability, and is null when there are none or more than one. Then I want multiple columns like inactive_ability1, inactive_ability2, inactive_ability3, etc. for the case where the array holds > 1 value. So from the example above, the end result should look like this:

 hero: axe, attribute: strength, active_abilities: [q, w, r], inactive_abilities: e, inactive_ability: e, inactive_ability1: null, inactive_ability2: null, inactive_ability3: null, inactive_ability4: null 
 hero: invoker, attribute: intelligence, active_abilities: [q, w, e, r, f, d], inactive_abilities: null, inactive_ability: null, inactive_ability1: null, inactive_ability2: null, inactive_ability3: null, inactive_ability4: null 
 hero: phantom assassin, attribute: agility, active_abilities: [q, w, e], inactive_abilities: r, inactive_ability: r, inactive_ability1: null, inactive_ability2: null, inactive_ability3: null, inactive_ability4: null 
 hero: life stealer, attribute: strength, active_abilities: [q, r], inactive_abilities: [w, e], inactive_ability: null, inactive_ability1: w, inactive_ability2: e, inactive_ability3: null, inactive_ability4: null 

I can't assume there will be a fixed number of "inactive_abilities", but if there are more than 4, the rest can be ignored. The part I'm struggling with is converting the field to an array and reading it when appropriate, and then creating and populating the new columns based on the conditions above. A minimal sketch of how such data might be loaded is shown below.
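
For reference, a minimal loading sketch; the file name heroes.json is a hypothetical stand-in for the actual source. With mixed scalar/array values in one field, Spark's JSON schema inference typically falls back to string for that field, which reproduces the situation described above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read newline-delimited JSON; "heroes.json" is a hypothetical path.
df = spark.read.json("heroes.json")
df.printSchema()
# root
#  |-- active_abilities: array (nullable = true)
#  |    |-- element: string (containsNull = true)
#  |-- attribute: string (nullable = true)
#  |-- hero: string (nullable = true)
#  |-- inactive_abilities: string (nullable = true)   <- mixed types inferred as string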

【Comments】:

【Answer 1】:

The string inactive_abilities column can be converted to an array by removing the [ and ] characters and splitting the string on ,. For null values, regexp_replace and split both return null, so neither when condition below matches and every derived column stays null.

from pyspark.sql import functions as F

data = ["hero": "axe", "attribute": "strength", "active_abilities": ["q", "w", "r"], "inactive_abilities": "e",
        "hero": "invoker", "attribute": "intelligence", "active_abilities": ["q", "w", "e", "r", "f", "d"],
         "inactive_abilities": None,
        "hero": "phantom assassin", "attribute": "agility", "active_abilities": ["q", "w", "e"],
         "inactive_abilities": "r",
        "hero": "life stealer", "attribute": "strength", "active_abilities": ["q", "r"],
         "inactive_abilities": "[w, e]", ]

df = spark.createDataFrame(data)

# Build one conditional column per slot: inactive_ability is filled only when
# there is exactly one inactive ability; inactive_ability1..4 take the array
# elements only when there is more than one.
array_select_expr = [
    F.when(F.size("parsed_inactive_abilities") > 1, F.trim(F.col("parsed_inactive_abilities")[i - 1])).alias(
        f"inactive_ability{i}") if i > 0 else F.when(
        F.size("parsed_inactive_abilities") == 1, F.trim(F.col("parsed_inactive_abilities")[0])).alias(
        "inactive_ability")
    for i in range(0, 5)]

# Parse the string column into an array, add the conditional columns, then
# drop the helper column.
(df.withColumn("parsed_inactive_abilities", F.split(F.regexp_replace(F.col("inactive_abilities"), r"[\[\]]", ""), ","))
   .select("*", *array_select_expr)
   .drop("parsed_inactive_abilities").show())

Output

+------------------+------------+----------------+------------------+----------------+-----------------+-----------------+-----------------+-----------------+
|  active_abilities|   attribute|            hero|inactive_abilities|inactive_ability|inactive_ability1|inactive_ability2|inactive_ability3|inactive_ability4|
+------------------+------------+----------------+------------------+----------------+-----------------+-----------------+-----------------+-----------------+
|         [q, w, r]|    strength|             axe|                 e|               e|             null|             null|             null|             null|
|[q, w, e, r, f, d]|intelligence|         invoker|              null|            null|             null|             null|             null|             null|
|         [q, w, e]|     agility|phantom assassin|                 r|               r|             null|             null|             null|             null|
|            [q, r]|    strength|    life stealer|            [w, e]|            null|                w|                e|             null|             null|
+------------------+------------+----------------+------------------+----------------+-----------------+-----------------+-----------------+-----------------+


# Value in array_select_expr

[Column<'CASE WHEN (size(parsed_inactive_abilities) = 1) THEN trim(parsed_inactive_abilities[0]) END AS inactive_ability'>,
 Column<'CASE WHEN (size(parsed_inactive_abilities) > 1) THEN trim(parsed_inactive_abilities[0]) END AS inactive_ability1'>,
 Column<'CASE WHEN (size(parsed_inactive_abilities) > 1) THEN trim(parsed_inactive_abilities[1]) END AS inactive_ability2'>,
 Column<'CASE WHEN (size(parsed_inactive_abilities) > 1) THEN trim(parsed_inactive_abilities[2]) END AS inactive_ability3'>,
 Column<'CASE WHEN (size(parsed_inactive_abilities) > 1) THEN trim(parsed_inactive_abilities[3]) END AS inactive_ability4'>]
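
As a side note, a small variant sketch (not part of the original answer): splitting on a regex that also consumes the whitespace after each comma makes the F.trim calls unnecessary:

# Variant: split on "," plus any following whitespace, so elements arrive
# already trimmed and F.trim can be dropped from array_select_expr.
df.withColumn(
    "parsed_inactive_abilities",
    F.split(F.regexp_replace(F.col("inactive_abilities"), r"[\[\]]", ""), r",\s*"))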

【Comments】:
