Pyspark/SQL:添加一个标志列,如左半连接

Posted

技术标签:

【中文标题】Pyspark/SQL:添加一个标志列,如左半连接【英文标题】:Pyspark/SQL: add a flag column like left semi join 【发布时间】:2018-05-24 13:10:55 【问题描述】:

比如数据是

customer = spark.createDataFrame([
    (0, "Bill Chambers"),
    (1, "Matei Zaharia"),
    (2, "Michael Armbrust")])\
  .toDF("customerid", "name")

order = spark.createDataFrame([
    (0, 0, "Product 0"),
    (1, 1, "Product 1"),
    (2, 1, "Product 2"),
    (3, 3, "Product 3"),
    (4, 1, "Product 4")])\
  .toDF("orderid", "customerid", "product_name")

为了得到客户的订单,我可以用left semi来做

customer.join(order, ['customerid'], "left_semi").show()

可以返回

现在出于比较的原因,我想添加一个标志列,而不是直接过滤掉一些行。所需的输出如下所示:

+----------+----------------+---------+ 
|customerid|            name|has_order| 
+----------+----------------+---------+ 
|         0| Bill Chambers  |     true| 
|         1| Matei Zaharia  |     true| 
|         2|Michael Armbrust|    false| 
+----------+----------------+---------+

我该怎么做?有什么优雅的方法吗?我试图搜索但没有找到相关的东西,也许我得到了错误的关键词?


是否可以使用 SQL 的存在/存在来做到这一点?:Spark replacement for EXISTS and IN

【问题讨论】:

你可以使用exceptjoin_result.except(customer).withColumn("has_order", lit(False)) 然后union 结果使用join_result.withColumn("has_order", lit(True))。或者您可以选择不同的 order_id,然后使用 customer 进行左连接,然后使用 when - otherwisenvl 填充 has_order @philantrovert 是的......但与left_semi 的简单性相比,这要复杂得多。 @cqcn1991 我个人认为加入和过滤是更好的解决方案,但也可以选择leftsemileftantiunion 加入(见我的updated answer)。 【参考方案1】:

您可以进行左连接,并根据orderid 列是否不为空,使用pyspark.sql.Column.isNull() 创建has_order 列。然后使用distinct() 删除重复的行。

import pyspark.sql.functions as f
customer.alias("c").join(order.alias("o"), on=["customerid"], how="left")\
    .select(
        "c.*",
        f.col("o.orderid").isNull().alias("has_order")
    )\
    .distinct()\
    .show()
#+----------+----------------+---------+
#|customerid|            name|has_order|
#+----------+----------------+---------+
#|         0|   Bill Chambers|     true|
#|         1|   Matei Zaharia|     true|
#|         2|Michael Armbrust|    false|
#+----------+----------------+---------+

如果您想使用类似于您正在使用的左半连接的东西,您可以将左半连接和左反连接的结果合并:

cust_left_semi = customer.join(order, ['customerid'], "leftsemi")\
    .withColumn('has_order', f.lit(True))
cust_left_semi.show()
#+----------+-------------+---------+
#|customerid|         name|has_order|
#+----------+-------------+---------+
#|         0|Bill Chambers|     true|
#|         1|Matei Zaharia|     true|
#+----------+-------------+---------+

cust_left_anti = customer.join(order, ['customerid'], "leftanti")\
    .withColumn('has_order', f.lit(False))
cust_left_anti.show()
#+----------+----------------+---------+
#|customerid|            name|has_order|
#+----------+----------------+---------+
#|         2|Michael Armbrust|    false|
#+----------+----------------+---------+

cust_left_semi.union(cust_left_anti).show()
#+----------+----------------+---------+
#|customerid|            name|has_order|
#+----------+----------------+---------+
#|         0|   Bill Chambers|     true|
#|         1|   Matei Zaharia|     true|
#|         2|Michael Armbrust|    false|
#+----------+----------------+---------+

【讨论】:

好的,我会尝试对这两种解决方案进行基准测试并给您一些反馈。 而且,SQL 支持 IN/Exist,是否可以通过简单的解决方案通过 SQL 实现结果? 如post you linked 所示,目前在 SparkSQL AFAIK 中不支持 EXISTS/IN。

以上是关于Pyspark/SQL:添加一个标志列,如左半连接的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe

如何修复:pyspark.sql.utils.IllegalArgumentException:列功能的类型不正确?

左半连接left demi-join

在 pyspark sql 的连接中重复使用相同的数据框视图

如何使用 PySpark 将 JSON 列类型写入 Postgres?

在 pyspark SQL 中将字符串日期转换为日期格式