在 PySpark 中重新索引和填充缺失的日期
Posted
技术标签:
【中文标题】在 PySpark 中重新索引和填充缺失的日期【英文标题】:Reindexing and filling missing dates in PySpark 【发布时间】:2020-04-13 15:19:33 【问题描述】:有没有办法在 PySpark 中填写缺失的列日期和行值?目前,我将数据框转换为 Pandas 并在那里重新索引。
sdf.show()
+---+----------+----------+----------+
| id|2018-01-01|2018-01-03|2018-01-05|
+---+----------+----------+----------+
| 1 | 0.0| 1.0| 0.0|
| 2 | 4.0| 2.0| 0.0|
| 3 | 0.0| 1.0| 1.0|
| 7 | 0.0| 2.0| 9.0|
| 8 | 8.0| 0.0| 0.0|
| 9 | 0.0| 0.0| 3.0|
+---+----------+----------+----------+
idx = pd.date_range('01-01-2018', '01-07-2018').date
df = sdf.toPandas()
df = df.set_index('id')
df = df.reindex(idx, axis=1, fill_value=0)
我在 PySpark 中找不到类似的东西。
期望的输出:
+---+----------+----------+----------+----------+----------+
| id|2018-01-01|2018-01-02|2018-01-03|2018-01-04|2018-01-05|
+---+----------+----------+----------+----------+----------+
| 1 | 0.0| 0.0| 1.0| 0.0| 0.0|
| 2 | 4.0| 0.0| 2.0| 0.0| 0.0|
| 3 | 0.0| 0.0| 1.0| 0.0| 1.0|
| 7 | 0.0| 0.0| 2.0| 0.0| 9.0|
| 8 | 8.0| 0.0| 0.0| 0.0| 0.0|
| 9 | 0.0| 0.0| 0.0| 0.0| 3.0|
+---+----------+----------+----------+----------+----------+
【问题讨论】:
【参考方案1】:您可以将lit()
用于idx
中尚未出现在数据框中的值。
请注意,我已将该列转换为字符串,仅用于测试:
ids = [str(i) for i in idx] #may not be required
to_add = [col for col in ids if col not in df.columns]
out = df.select(df.columns+ [lit(0).alias(name) for name in to_add])
out.show()
+---+----------+----------+----------+----------+----------+----------+----------+
| id|2018-01-01|2018-01-03|2018-01-05|2018-01-02|2018-01-04|2018-01-06|2018-01-07|
+---+----------+----------+----------+----------+----------+----------+----------+
| 1| 0.0| 1.0| 0.0| 0| 0| 0| 0|
| 2| 4.0| 2.0| 0.0| 0| 0| 0| 0|
| 3| 0.0| 1.0| 1.0| 0| 0| 0| 0|
| 7| 0.0| 2.0| 9.0| 0| 0| 0| 0|
| 8| 8.0| 0.0| 0.0| 0| 0| 0| 0|
| 9| 0.0| 0.0| 3.0| 0| 0| 0| 0|
+---+----------+----------+----------+----------+----------+----------+----------+
【讨论】:
SideNote :生成排序输出,您可以测试:out.select(sorted(out.columns,key=lambda x: (x!='id',x))).show()
【参考方案2】:
试试这个
rdd_df = df.rdd.zipWithIndex()
df_final = rdd_df.toDF(sampleRatio=0.2)
df_final = df_final.withColumn('name_id', df_final['_1'].getItem("column name"))
【讨论】:
似乎不起作用,我添加了一个我想要的输出示例以进行更多说明以上是关于在 PySpark 中重新索引和填充缺失的日期的主要内容,如果未能解决你的问题,请参考以下文章