PySpark:在联接中处理 NULL

Posted

技术标签:

【中文标题】PySpark:在联接中处理 NULL【英文标题】:PySpark: Handing NULL in Joins 【发布时间】:2017-09-05 19:14:20 【问题描述】:

我正在尝试在 pyspark 中加入 2 个数据帧。我的问题是我希望我的“内部联接”通过它,而不管 NULL 是什么。我可以看到在 scala 中,我有 的替代品。但是, 在 pyspark 中不起作用。

userLeft = sc.parallelize([
Row(id=u'1', 
    first_name=u'Steve', 
    last_name=u'Kent', 
    email=u's.kent@email.com'),
Row(id=u'2', 
    first_name=u'Margaret', 
    last_name=u'Peace', 
    email=u'marge.peace@email.com'),
Row(id=u'3', 
    first_name=None, 
    last_name=u'hh', 
    email=u'marge.hh@email.com')]).toDF()

userRight = sc.parallelize([
Row(id=u'2', 
    first_name=u'Margaret', 
    last_name=u'Peace', 
    email=u'marge.peace@email.com'),
Row(id=u'3', 
    first_name=None, 
    last_name=u'hh', 
    email=u'marge.hh@email.com')]).toDF()

当前工作版本:

userLeft.join(userRight, (userLeft.last_name==userRight.last_name) & (userLeft.first_name==userRight.first_name)).show()

当前结果:

    +--------------------+----------+---+---------+--------------------+----------+---+---------+
|               email|first_name| id|last_name|               email|first_name| id|last_name|
    +--------------------+----------+---+---------+--------------------+----------+---+---------+ 
    |marge.peace@email...|  Margaret|  2|    Peace|marge.peace@email...|  Margaret|  2|    Peace|
    +--------------------+----------+---+---------+--------------------+----------+---+---------+

预期结果:

    +--------------------+----------+---+---------+--------------------+----------+---+---------+
|               email|first_name| id|last_name|               email|first_name| id|last_name|
+--------------------+----------+---+---------+--------------------+----------+---+---------+
|  marge.hh@email.com|      null|  3|       hh|  marge.hh@email.com|      null|  3|       hh|
|marge.peace@email...|  Margaret|  2|    Peace|marge.peace@email...|  Margaret|  2|    Peace|
+--------------------+----------+---+---------+--------------------+----------+---+---------+

【问题讨论】:

【参考方案1】:

对于 PYSPARK 运算符:

import pyspark.sql.functions as F
df1.alias("df1").join(df2.alias("df2"), on = F.expr('df1.column <=> df2.column'))

对于 PYSPARK >= 2.3.0,您可以使用 Column.eqNullSafeIS NOT DISTINCT FROM 回答 here。

【讨论】:

【参考方案2】:

使用另一个值代替null

userLeft = userLeft.na.fill("unknown")
userRight = userRight.na.fill("unknown")

userLeft.join(userRight, ["last_name", "first_name"])

    +---------+----------+--------------------+---+--------------------+---+
    |last_name|first_name|               email| id|               email| id|
    +---------+----------+--------------------+---+--------------------+---+
    |    Peace|  Margaret|marge.peace@email...|  2|marge.peace@email...|  2|
    |       hh|   unknown|  marge.hh@email.com|  3|  marge.hh@email.com|  3|
    +---------+----------+--------------------+---+--------------------+---+

【讨论】:

我试过这种方法。对于字符串和日期列,我能够将其转换为区分 Null 值。比如:字符串“NULLCUSTOM”和日期:“8888-01-01”。但我无法确定整数或浮点值的值。你有什么想法吗? float("inf") 如果列的类型为intlong,它将被转换为long 它实际上不是无穷大,而是9223372036854775807 -1 用于 id 列

以上是关于PySpark:在联接中处理 NULL的主要内容,如果未能解决你的问题,请参考以下文章

PHP 处理时间

在 pyspark 中转换或处理日期数据类型的最佳方法是啥

使用 pyspark 处理结构数据类型

在 pyspark 中处理大数字的数据类型

在 Pyspark/Hive 中处理不断变化的数据类型

PySpark:如何在数据框中的 if 链中处理“else”?