PySpark 2.2 爆炸删除空行(如何实现explode_outer)? [复制]

Posted

技术标签:

【中文标题】PySpark 2.2 爆炸删除空行(如何实现explode_outer)? [复制]【英文标题】:PySpark 2.2 explode dropping null rows (how to implement explode_outer)? [duplicate] 【发布时间】:2018-10-10 19:11:48 【问题描述】:

我正在处理 PySpark 数据框中的一些深层嵌套数据。当我试图将结构展平为行和列时,我注意到当我调用withColumn 时,如果该行在源列中包含null,那么该行将从我的结果数据框中删除。相反,我想找到一种方法来保留该行并在结果列中包含null

使用的示例数据框:

from pyspark.sql.functions import explode, first, col, monotonically_increasing_id
from pyspark.sql import Row

df = spark.createDataFrame([
  Row(dataCells=[Row(posx=0, posy=1, posz=.5, value=1.5, shape=[Row(_type='square', _len=1)]), 
                 Row(posx=1, posy=3, posz=.5, value=4.5, shape=[]), 
                 Row(posx=2, posy=5, posz=.5, value=7.5, shape=[Row(_type='circle', _len=.5)])
    ])
])

我还有一个用于展平结构的函数:

def flatten_struct_cols(df):
    flat_cols = [column[0] for column in df.dtypes if 'struct' not in column[1][:6]]
    struct_columns = [column[0] for column in df.dtypes if 'struct' in column[1][:6]]

    df = df.select(flat_cols +
                   [col(sc + '.' + c).alias(sc + '_' + c)
                   for sc in struct_columns
                   for c in df.select(sc + '.*').columns])

    return df

架构看起来像这样:

df.printSchema()

root
 |-- dataCells: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- posx: long (nullable = true)
 |    |    |-- posy: long (nullable = true)
 |    |    |-- posz: double (nullable = true)
 |    |    |-- shape: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _len: long (nullable = true)
 |    |    |    |    |-- _type: string (nullable = true)
 |    |    |-- value: double (nullable = true)

起始数据框:

df.show(3)

+--------------------+
|           dataCells|
+--------------------+
|[[0,1,0.5,Wrapped...|
+--------------------+

我首先分解数组,因为我想将这个结构数组与结构数组转换为行和列。然后我将结构字段展平为新列。

df = df.withColumn('dataCells', explode(col('dataCells')))
df = flatten_struct_cols(df)
df.show(3)

我的数据看起来像:

+--------------+--------------+--------------+---------------+---------------+
|dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
+--------------+--------------+--------------+---------------+---------------+
|             0|             1|           0.5|   [[1,square]]|            1.5|
|             1|             3|           0.5|             []|            4.5|
|             2|             5|           0.5|[[null,circle]]|            7.5|
+--------------+--------------+--------------+---------------+---------------+

在我尝试explode dataCells_shape 具有空值/null 值的列之前,一切都很好并且符合预期。

df = df.withColumn('dataCells_shape', explode(col('dataCells_shape')))
df.show(3)

从数据框中删除第二行:

+--------------+--------------+--------------+---------------+---------------+
|dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
+--------------+--------------+--------------+---------------+---------------+
|             0|             1|           0.5|     [1,square]|            1.5|
|             2|             5|           0.5|  [null,circle]|            7.5|
+--------------+--------------+--------------+---------------+---------------+

相反,我想保留该行并保留该列的空值以及其他列中的所有值。在执行 .withColumn explode 时,我尝试创建一个新列而不是覆盖旧列,并且无论哪种方式都得到相同的结果。

我还尝试创建一个UDF,如果该行不为空/null,则执行explode 函数,但我遇到了处理null 的JVM 错误。

from pyspark.sql.functions import udf
from pyspark.sql.types import NullType, StructType

def explode_if_not_null(trow):
    if trow:
        return explode(trow)
    else:
        return NullType

func_udf = udf(explode_if_not_null, StructType())
df = df.withColumn('dataCells_shape_test', func_udf(df['dataCells_shape']))
df.show(3)

AttributeError: 'NoneType' object has no attribute '_jvm'

当列是null 时,谁能建议我在不丢失行的情况下爆炸或展平ArrayType 列的方法?

我正在使用 PySpark 2.2.0

编辑:

按照作为可能的dupe 提供的链接,我尝试实施建议的.isNotNull().otherwise() 解决方案,将结构架构提供给.otherwise,但该行仍然从结果集中退出。

df.withColumn("dataCells_shape_test", explode(when(col("dataCells_shape").isNotNull(), col("dataCells_shape"))
                                              .otherwise(array(lit(None).cast(df.select(col("dataCells_shape").getItem(0))
                                                                                                              .dtypes[0][1])
                                                              )
                                                        )
                                             )
             ).show()

+--------------+--------------+--------------+---------------+---------------+--------------------+
|dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|dataCells_shape_test|
+--------------+--------------+--------------+---------------+---------------+--------------------+
|             0|             1|           0.5|   [[1,square]]|            1.5|          [1,square]|
|             2|             5|           0.5|[[null,circle]]|            7.5|       [null,circle]|
+--------------+--------------+--------------+---------------+---------------+--------------------+

【问题讨论】:

你可以尝试使用 spark 的内置 when 而不是使用 udf 吗?它会像df = df.withColumn('dataCells', when(col('dataCells').isNotNull),explode(col('dataCells'))) Spark sql how to explode without losing null values 的可能重复项。尽管该帖子不适用于 pyspark,但该技术是 not language specific。 @Alexander 您在isNotNull() 末尾缺少括号 @Alexander 我无法对此进行测试,但explode_outer 是 spark 版本 2.2 的一部分(但在 pyspark 2.3 之前不可用)-您可以尝试以下操作:1)explode_outer = sc._jvm.org.apache.spark.sql.functions.explode_outer 和然后 df.withColumn("dataCells", explode_outer("dataCells")).show() 或 2) df.createOrReplaceTempView("myTable") 然后 spark.sql("select *, explode_outer(dataCells) from myTable").show() @Alexander 有关如何引入 java/scala 函数的相关帖子:Spark: How to map Python with Scala or Java User Defined Functions? 【参考方案1】:

感谢 pault 向我指出有关将 Python 映射到 Java 的 this question 和 this question。我能够通过以下方式获得有效的解决方案:

from pyspark.sql.column import Column, _to_java_column

def explode_outer(col):
    _explode_outer = sc._jvm.org.apache.spark.sql.functions.explode_outer 
    return Column(_explode_outer(_to_java_column(col)))

new_df = df.withColumn("dataCells_shape", explode_outer(col("dataCells_shape")))

+--------------+--------------+--------------+---------------+---------------+
|dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
+--------------+--------------+--------------+---------------+---------------+
|             0|             1|           0.5|     [1,square]|            1.5|
|             1|             3|           0.5|           null|            4.5|
|             2|             5|           0.5|  [null,circle]|            7.5|
+--------------+--------------+--------------+---------------+---------------+

root
 |-- dataCells_posx: long (nullable = true)
 |-- dataCells_posy: long (nullable = true)
 |-- dataCells_posz: double (nullable = true)
 |-- dataCells_shape: struct (nullable = true)
 |    |-- _len: long (nullable = true)
 |    |-- _type: string (nullable = true)
 |-- dataCells_value: double (nullable = true)

请务必注意,这适用于 pyspark 2.2 版,因为 explode_outer 在 spark 2.2 中定义(但由于某种原因,API 包装器直到 2.3 版才在 pyspark 中实现)。该解决方案为已经实现的 java 函数创建了一个包装器。

【讨论】:

【参考方案2】:

对于这种复杂的结构,写一个map函数并在RDD接口的flatMap方法中使用它会更容易。结果,您将获得一个新的扁平化 RDD,然后您必须通过应用新模式再次创建数据框。

def flat_arr(row):
    rows = []
    # apply some logic to fill rows list with more "rows"
    return rows

rdd = df.rdd.flatMap(flat_arr)
schema = StructType(
    StructField('field1', StringType()),
    # define more fields
)
df = df.sql_ctx.createDataFrame(rdd, schema)
df.show()

此解决方案看起来比应用 withColumn 长一点,但它可能是您的解决方案的第一次迭代,因此您可以了解如何将其转换为 withColumn 语句。但在我看来,地图功能在这里是合适的,只是为了让事情变得清晰

【讨论】:

使用 RDD 会不会阻止催化剂优化器优化操作?

以上是关于PySpark 2.2 爆炸删除空行(如何实现explode_outer)? [复制]的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark Struct 列:爆炸后的奇怪行为

如何在 PySpark 中与爆炸相反?

如何在pyspark /中的结构内爆炸结构中的内部数组

如何在Scala中证明爆炸原理(ex falso sequitur quodlibet)?

pyspark中的条件爆炸

Pyspark 爆炸功能未按预期工作