使用 pyspark 和 when 条件从另一个数据框创建列

Posted

技术标签:

【中文标题】使用 pyspark 和 when 条件从另一个数据框创建列【英文标题】:create column from another dataframe using pyspark and when condition 【发布时间】:2021-12-19 22:26:18 【问题描述】:

假设我有一个这样的数据框。我想从另一个数据框创建一个新列。 第一个数据框:

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","34563","M",3000),
    ("Michael","Rose","","52452","M",4000),
    ("Robert","","Williams","72331","M",4000),
    ("Maria","Anne","Jones","52334","F",4000),
    ("Jen","Mary","Brown","82311","F",-1)
  ]
schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |34563|M     |3000  |
|Michael  |Rose      |        |52452|M     |4000  |
|Robert   |          |Williams|72331|M     |4000  |
|Maria    |Anne      |Jones   |52334|F     |4000  |
|Jen      |Mary      |Brown   |82311|F     |-1    |
+---------+----------+--------+-----+------+------+

第二个数据框:

df_2 = spark.createDataFrame([(34563, 435353424, 1, 2 ), (23524, 466344656, 2, 1), (52452, 263637236, 2, 5), (
   52334, 466633353, 2, 3), (66334, 563555578, 5, 4), (42552, 123445563, 5, 3), (72331, 413555213, 4, 3), (82311, 52355563, 2, 2)], ["id", "col_A", "val_1", "val_2"])
df_2.show()
+-----+---------+-----+-----+
|   id|    col_A|val_1|val_2|
+-----+---------+-----+-----+
|34563|435353424|    1|    2|
|23524|466344656|    2|    1|
|52452|263637236|    2|    5|
|52334|466633353|    2|    3|
|66334|563555578|    5|    4|
|42552|123445563|    5|    3|
|72331|413555213|    4|    3|
|82311| 52355563|    2|    2|
+-----+---------+-----+-----+

我想使用第二个数据框中的列在第一个数据框中创建一个新列(理论责任 3)。这是我的代码:

merge_imputation=df.join(df_2,\
                               df["id"]==df_2["id"]\
                               ,how="left").dropDuplicates(["id"])
df=df.withColumn("Theoretical Accountable 3",F.when((F.col("gender")=="M"),F.lit("1")).\
                                                       when((F.col("gender")=="F"),F.lit("2")).\
                                                       when(F.col("salary")>2000,merge_imputation.select("col_A"))
                                                       .otherwise(F.col("lastname")))

如何在没有错误消息的情况下使用连接列?我的问题我不知道在 when 条件下使用 column merge_imputation.select("col_A")。

【问题讨论】:

【参考方案1】:

PySpark 不允许在 withColumn 表达式中选择其他数据框中的列。要将Theoretical Accountable 3添加到df,您可以先将列添加到merge_imputation,然后将select所需的列添加回来构造df

df=merge_imputation.withColumn("Theoretical Accountable 3",F.when((F.col("gender")=="M"),F.lit("1")).\
                                                       when((F.col("gender")=="F"),F.lit("2")).\
                                                       when(F.col("salary")>2000, F.col("col_A"))
                                                       .otherwise(F.col("lastname")))\
                    .select(df["id"], "firstname", "middlename", "lastname", "gender", "salary", "Theoretical Accountable 3")

输出

+-----+---------+----------+--------+------+------+-------------------------+
|   id|firstname|middlename|lastname|gender|salary|Theoretical Accountable 3|
+-----+---------+----------+--------+------+------+-------------------------+
|34563|    James|          |   Smith|     M|  3000|                        1|
|52334|    Maria|      Anne|   Jones|     F|  4000|                        2|
|52452|  Michael|      Rose|        |     M|  4000|                        1|
|72331|   Robert|          |Williams|     M|  4000|                        1|
|82311|      Jen|      Mary|   Brown|     F|    -1|                        2|
+-----+---------+----------+--------+------+------+-------------------------+

【讨论】:

感谢您的回答,如果不使用 join 就无法做到这一点@Nithish。我想在不加入两个数据框的情况下分配列 Theoretical Accountable 3 ? @grinim 如果df_2 的大小很小,那么lookup 可以在没有连接的情况下完成,因为大量数据连接是唯一的方法。 好的,谢谢 :)

以上是关于使用 pyspark 和 when 条件从另一个数据框创建列的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中使用列条件替换空值

如何在字典中使用 pyspark.sql.functions.when() 的多个条件?

如何在 pyspark.sql.functions.when() 中使用多个条件?

Pyspark 中的多个 WHEN 条件实现

如何在pyspark数据框中添加多个带有when条件的新列?

如何在 PySpark 中编写条件正则表达式替换?