Conditionally adding a new column in pyspark
[Posted]: 2021-12-27 18:43:11
[Question]: I have some sample data from JSON that looks similar to the following:
hero: axe, attribute: strength, active_abilities: [q, w, r], inactive_abilities: e
hero: invoker, attribute: intelligence, active_abilities: [q, w, e, r, f, d], inactive_abilities: null
hero: phantom assassin, attribute: agility, active_abilities: [q, w, e], inactive_abilities: r
hero: life stealer, attribute: strength, active_abilities: [q, r], inactive_abilities: [w, e]
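The problem I'm having is that the "inactive_abilities" column is read as a string, because the data in that column can vary in type. It may be null, a single string (if there is only one ability), or an array (if there are several). What I ultimately want is to create several new columns based on the number of "inactive_abilities". First, I want a new column inactive_ability that is populated only when there is exactly one inactive ability, and is null when there are none or more than one. Second, I want multiple columns such as inactive_ability1, inactive_ability2, inactive_ability3, and so on, for the case where the array holds more than one value. So, from the example above, the end result should look like this: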
hero: axe, attribute: strength, active_abilities: [q, w, r], inactive_abilities: e, inactive_ability: e, inactive_ability1: null, inactive_ability2: null, inactive_ability3: null, inactive_ability4: null
hero: invoker, attribute: intelligence, active_abilities: [q, w, e, r, f, d], inactive_abilities: null, inactive_ability: null, inactive_ability1: null, inactive_ability2: null, inactive_ability3: null, inactive_ability4: null
hero: phantom assassin, attribute: agility, active_abilities: [q, w, e], inactive_abilities: r, inactive_ability: r, inactive_ability1: null, inactive_ability2: null, inactive_ability3: null, inactive_ability4: null
hero: life stealer, attribute: strength, active_abilities: [q, r], inactive_abilities: [w, e], inactive_ability: null, inactive_ability1: w, inactive_ability2: e, inactive_ability3: null, inactive_ability4: null
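I can't assume there will be a fixed number of "inactive_abilities", but if there are more than 4, the rest can be ignored. The part I'm having trouble with is converting the field to an array and reading it as one when appropriate, then creating and populating the new columns according to the conditions above.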
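The rule described in the question can be sketched in plain Python, independent of Spark, as a reference for what each row should produce. This is only an illustration of the stated requirements: the function name `expected_columns` and the constant `MAX_NUMBERED` are hypothetical and do not come from the original post.

```python
import re

# Assumption taken from the question: abilities beyond the fourth can be ignored.
MAX_NUMBERED = 4


def expected_columns(raw):
    """Map one raw inactive_abilities value (None, "e", or "[w, e]")
    to the new columns the question asks for."""
    if raw is None:
        items = []
    else:
        # Drop the square brackets, split on commas, trim whitespace.
        items = [s.strip() for s in re.sub(r"[\[\]]", "", raw).split(",") if s.strip()]

    # Populated only when there is exactly one inactive ability.
    cols = {"inactive_ability": items[0] if len(items) == 1 else None}

    # Populated only when there is more than one inactive ability.
    for i in range(1, MAX_NUMBERED + 1):
        has_value = len(items) > 1 and i <= len(items)
        cols[f"inactive_ability{i}"] = items[i - 1] if has_value else None
    return cols


print(expected_columns("[w, e]"))
```

Calling `expected_columns("e")` fills only `inactive_ability`, while `expected_columns(None)` leaves every new column as `None`.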
[Answer 1]: The string column inactive_abilities can be converted to an array by removing the "[" and "]" characters and splitting the string on ",".
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Reuse the active session (e.g. in a notebook or shell) or start one.
spark = SparkSession.builder.getOrCreate()

# inactive_abilities arrives as a string: null, a single value, or a bracketed list.
data = [
    {"hero": "axe", "attribute": "strength", "active_abilities": ["q", "w", "r"], "inactive_abilities": "e"},
    {"hero": "invoker", "attribute": "intelligence", "active_abilities": ["q", "w", "e", "r", "f", "d"],
     "inactive_abilities": None},
    {"hero": "phantom assassin", "attribute": "agility", "active_abilities": ["q", "w", "e"],
     "inactive_abilities": "r"},
    {"hero": "life stealer", "attribute": "strength", "active_abilities": ["q", "r"],
     "inactive_abilities": "[w, e]"},
]
df = spark.createDataFrame(data)

# i == 0 builds the single-ability column (array size 1);
# i in 1..4 builds inactive_ability{i} from array element i (array size > 1).
array_select_expr = [
    F.when(F.size("parsed_inactive_abilities") > 1, F.trim(F.col("parsed_inactive_abilities")[i])).alias(
        f"inactive_ability{i}") if i > 0 else F.when(
        F.size("parsed_inactive_abilities") == 1, F.trim(F.col("parsed_inactive_abilities")[0])).alias(
        "inactive_abilities")
    for i in range(0, 5)]

# Strip the square brackets, split on commas, then project the new columns.
(df.withColumn("parsed_inactive_abilities",
               F.split(F.regexp_replace(F.col("inactive_abilities"), r"[\[\]]", ""), ","))
 .select("*", *array_select_expr)
 .drop("parsed_inactive_abilities").show())
Output:
+------------------+------------+----------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+
| active_abilities| attribute| hero|inactive_abilities|inactive_abilities|inactive_ability1|inactive_ability2|inactive_ability3|inactive_ability4|
+------------------+------------+----------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+
| [q, w, r]| strength| axe| e| e| null| null| null| null|
|[q, w, e, r, f, d]|intelligence| invoker| null| null| null| null| null| null|
| [q, w, e]| agility|phantom assassin| r| r| null| null| null| null|
| [q, r]| strength| life stealer| [w, e]| null| e| null| null| null|
+------------------+------------+----------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+
# Value in array_select_expr
[Column<'CASE WHEN (size(parsed_inactive_abilities) = 1) THEN trim(parsed_inactive_abilities[0]) END AS inactive_abilities'>,
Column<'CASE WHEN (size(parsed_inactive_abilities) > 1) THEN trim(parsed_inactive_abilities[1]) END AS inactive_ability1'>,
Column<'CASE WHEN (size(parsed_inactive_abilities) > 1) THEN trim(parsed_inactive_abilities[2]) END AS inactive_ability2'>,
Column<'CASE WHEN (size(parsed_inactive_abilities) > 1) THEN trim(parsed_inactive_abilities[3]) END AS inactive_ability3'>,
Column<'CASE WHEN (size(parsed_inactive_abilities) > 1) THEN trim(parsed_inactive_abilities[4]) END AS inactive_ability4'>]