Pyspark 熔化空列

Posted

技术标签:

【中文标题】Pyspark 熔化空列【英文标题】:Pyspark Melting Null Columns 【发布时间】:2018-06-12 07:02:36 【问题描述】:

我有一个如下所示的数据框:

# +----+------------+----------+-----------+
# | id |  c_type_1  | c_type_2 |  c_type_3 |
# +----+------------+----------+-----------+
# |  1 |    null    |   null   |     r     |
# |  2 |     a      |   null   |    null   |
# |  3 |    null    |   null   |    null   |
# +---+-------------+----------+-----------+

我需要把它转换成这样的:

# +----+------------+------------+
# | id |   c_type   | c_type_val |
# +----+------------+------------+
# |  1 |  c_type_3  |     r      |
# |  2 |  c_type_1  |     a      |
# |  3 |    null    |    null    |
# +---+-------------+------------+

每一行只有一个 c_type 值,否则所有 c_type 值都将为空。

我目前正在像这样融化行:

def melt(df, id_cols, value_cols, c_type, c_value):
    v_arr = []
    for c in value_cols:
        v_arr.append(struct(lit(c).alias(c_type), col(c).alias(c_value)))

    vars_and_vals = array(*v_arr)

    tmp = df.withColumn("vars_and_vals", explode(vars_and_vals))

    cols = id_cols + [
        col("vars_and_vals")[x].alias(x) for x in [c_type, c_value]]
    return tmp.select(*cols)


melted = melt(df, df.columns[:1], df.columns[1:4], 'c_type', 'c_type_val')

melted.filter(melted.c_type_val.isNotNull()).show()

问题在于过滤 c_type_val 的空值会过滤掉 id == 3 的行(任何 c_type 为空的行)。我需要一种方法来融化和过滤以将第三行保留为空 c_type 和 value。

【问题讨论】:

【参考方案1】:

我尝试使用pandas,它可能会给你一个解决这个问题的想法,

temp=df.filter(like='c_type')
df= pd.merge(df,temp[temp.notnull()].stack().reset_index(), left_index=True, right_on=['level_0'],how='left')
df=df[['id','level_1',0]].reset_index(drop=True).rename(columns='level_1':'c_type',0:'c_type_val')
print df

输出:

   id    c_type c_type_val
0   1  c_type_3          r
1   2  c_type_1          a
2   3       NaN        NaN

【讨论】:

我忘了说我不能在这个环境中使用 pandas,因为它将在尚不支持其他库的 AWS Glue 中运行。

以上是关于Pyspark 熔化空列的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark DataFrame 中添加多个空列

从 pyspark 数据框中删除空列

Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回

PYSPARK:如何将带有多个 case 语句的 SQL 查询转换为 Pyspark/Pyspark-SQL?

pyspark.sql.functions.col 和 pyspark.sql.functions.lit 之间的 PySpark 区别

Pyspark 安装错误:没有名为“pyspark”的模块