Pyspark 熔化空列
Posted
技术标签:
【中文标题】Pyspark 熔化空列【英文标题】:Pyspark Melting Null Columns 【发布时间】:2018-06-12 07:02:36 【问题描述】:我有一个如下所示的数据框:
# +----+------------+----------+-----------+
# | id | c_type_1 | c_type_2 | c_type_3 |
# +----+------------+----------+-----------+
# | 1 | null | null | r |
# | 2 | a | null | null |
# | 3 | null | null | null |
# +---+-------------+----------+-----------+
我需要把它转换成这样的:
# +----+------------+------------+
# | id | c_type | c_type_val |
# +----+------------+------------+
# | 1 | c_type_3 | r |
# | 2 | c_type_1 | a |
# | 3 | null | null |
# +---+-------------+------------+
每一行只有一个 c_type 值,否则所有 c_type 值都将为空。
我目前正在像这样融化行:
def melt(df, id_cols, value_cols, c_type, c_value):
v_arr = []
for c in value_cols:
v_arr.append(struct(lit(c).alias(c_type), col(c).alias(c_value)))
vars_and_vals = array(*v_arr)
tmp = df.withColumn("vars_and_vals", explode(vars_and_vals))
cols = id_cols + [
col("vars_and_vals")[x].alias(x) for x in [c_type, c_value]]
return tmp.select(*cols)
melted = melt(df, df.columns[:1], df.columns[1:4], 'c_type', 'c_type_val')
melted.filter(melted.c_type_val.isNotNull()).show()
问题在于过滤 c_type_val 的空值会过滤掉 id == 3 的行(任何 c_type 为空的行)。我需要一种方法来融化和过滤以将第三行保留为空 c_type 和 value。
【问题讨论】:
【参考方案1】:我尝试使用pandas
,它可能会给你一个解决这个问题的想法,
temp=df.filter(like='c_type')
df= pd.merge(df,temp[temp.notnull()].stack().reset_index(), left_index=True, right_on=['level_0'],how='left')
df=df[['id','level_1',0]].reset_index(drop=True).rename(columns='level_1':'c_type',0:'c_type_val')
print df
输出:
id c_type c_type_val
0 1 c_type_3 r
1 2 c_type_1 a
2 3 NaN NaN
【讨论】:
我忘了说我不能在这个环境中使用 pandas,因为它将在尚不支持其他库的 AWS Glue 中运行。以上是关于Pyspark 熔化空列的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回
PYSPARK:如何将带有多个 case 语句的 SQL 查询转换为 Pyspark/Pyspark-SQL?
pyspark.sql.functions.col 和 pyspark.sql.functions.lit 之间的 PySpark 区别