Pyspark/SQL:添加一个标志列,如左半连接
Posted
技术标签:
【中文标题】Pyspark/SQL:添加一个标志列,如左半连接【英文标题】:Pyspark/SQL: add a flag column like left semi join 【发布时间】:2018-05-24 13:10:55 【问题描述】:比如数据是
customer = spark.createDataFrame([
(0, "Bill Chambers"),
(1, "Matei Zaharia"),
(2, "Michael Armbrust")])\
.toDF("customerid", "name")
order = spark.createDataFrame([
(0, 0, "Product 0"),
(1, 1, "Product 1"),
(2, 1, "Product 2"),
(3, 3, "Product 3"),
(4, 1, "Product 4")])\
.toDF("orderid", "customerid", "product_name")
为了得到客户的订单,我可以用left semi
来做
customer.join(order, ['customerid'], "left_semi").show()
可以返回
现在出于比较的原因,我想添加一个标志列,而不是直接过滤掉一些行。所需的输出如下所示:
+----------+----------------+---------+
|customerid| name|has_order|
+----------+----------------+---------+
| 0| Bill Chambers | true|
| 1| Matei Zaharia | true|
| 2|Michael Armbrust| false|
+----------+----------------+---------+
我该怎么做?有什么优雅的方法吗?我试图搜索但没有找到相关的东西,也许我得到了错误的关键词?
是否可以使用 SQL 的存在/存在来做到这一点?:Spark replacement for EXISTS and IN
【问题讨论】:
你可以使用except
像join_result.except(customer).withColumn("has_order", lit(False))
然后union
结果使用join_result.withColumn("has_order", lit(True))
。或者您可以选择不同的 order_id,然后使用 customer
进行左连接,然后使用 when
- otherwise
和 nvl
填充 has_order
@philantrovert 是的......但与left_semi
的简单性相比,这要复杂得多。
@cqcn1991 我个人认为加入和过滤是更好的解决方案,但也可以选择leftsemi
和leftanti
的union
加入(见我的updated answer)。
【参考方案1】:
您可以进行左连接,并根据orderid
列是否不为空,使用pyspark.sql.Column.isNull()
创建has_order
列。然后使用distinct()
删除重复的行。
import pyspark.sql.functions as f
customer.alias("c").join(order.alias("o"), on=["customerid"], how="left")\
.select(
"c.*",
f.col("o.orderid").isNull().alias("has_order")
)\
.distinct()\
.show()
#+----------+----------------+---------+
#|customerid| name|has_order|
#+----------+----------------+---------+
#| 0| Bill Chambers| true|
#| 1| Matei Zaharia| true|
#| 2|Michael Armbrust| false|
#+----------+----------------+---------+
如果您想使用类似于您正在使用的左半连接的东西,您可以将左半连接和左反连接的结果合并:
cust_left_semi = customer.join(order, ['customerid'], "leftsemi")\
.withColumn('has_order', f.lit(True))
cust_left_semi.show()
#+----------+-------------+---------+
#|customerid| name|has_order|
#+----------+-------------+---------+
#| 0|Bill Chambers| true|
#| 1|Matei Zaharia| true|
#+----------+-------------+---------+
cust_left_anti = customer.join(order, ['customerid'], "leftanti")\
.withColumn('has_order', f.lit(False))
cust_left_anti.show()
#+----------+----------------+---------+
#|customerid| name|has_order|
#+----------+----------------+---------+
#| 2|Michael Armbrust| false|
#+----------+----------------+---------+
cust_left_semi.union(cust_left_anti).show()
#+----------+----------------+---------+
#|customerid| name|has_order|
#+----------+----------------+---------+
#| 0| Bill Chambers| true|
#| 1| Matei Zaharia| true|
#| 2|Michael Armbrust| false|
#+----------+----------------+---------+
【讨论】:
好的,我会尝试对这两种解决方案进行基准测试并给您一些反馈。 而且,SQL 支持 IN/Exist,是否可以通过简单的解决方案通过 SQL 实现结果? 如post you linked 所示,目前在 SparkSQL AFAIK 中不支持 EXISTS/IN。以上是关于Pyspark/SQL:添加一个标志列,如左半连接的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe
如何修复:pyspark.sql.utils.IllegalArgumentException:列功能的类型不正确?
在 pyspark sql 的连接中重复使用相同的数据框视图