Pyspark:加入 2 个数据帧以仅从第 2 个数据帧获取新记录(历史化)
Posted
技术标签:
【中文标题】Pyspark:加入 2 个数据帧以仅从第 2 个数据帧获取新记录(历史化)【英文标题】:Pyspark : Join 2 dataframe to get only new records from 2nd dataframe (Historisation) 【发布时间】:2019-12-30 10:51:55 【问题描述】:我有 2 个数据框 df1 和 df2。我想要这个数据框的结果是这样的: 1.取df1的所有记录。 2. 仅从 df2 中获取新记录(df1 中不可用的记录) 3. 生成该逻辑的新数据框
注意:主键是“id”。我只想检查 id 而不是完整的行。如果 df1 中没有 ID,那么只有 df2 中的故事。
df1
+------+-------------+-----+
| id |time |other|
+------+-------------+-----+
| 111| 29-12-2019 | p1|
| 222| 29-12-2019 | p2|
| 333| 29-12-2019 | p3|
+----+-----+-----+---------+
df2
+------+-------------+-----+
| id |time |other|
+------+-------------+-----+
| 111| 30-12-2019 | p7|
| 222| 30-12-2019 | p8|
| 444| 30-12-2019 | p0|
+----+-----+-----+---------+
结果
+------+-------------+-----+
| id |time |other|
+------+-------------+-----+
| 111| 29-12-2019 | p1|
| 222| 29-12-2019 | p2|
| 333| 29-12-2019 | p3|
| 444| 30-12-2019 | p0|
+----+-----+-----+---------+
能否请您帮我在 pyspark 中执行此操作。我打算使用join。
【问题讨论】:
【参考方案1】:df1=spark.createDataFrame([(111,'29-12-2019','p1'),(222,'29-12-2019','p2'),(333,'29-12-2019','p3')],['id','time','other'])
df2=spark.createDataFrame([(111,'30-12-2019','p7'),(222,'30-12-2019','p8'),(444,'30-12-2019','p0')],['id','time','other'])
mvv1 = df1.select("id").rdd.flatMap(lambda x: x).collect()
print(mvv1)
[111, 222, 333]
yy=",".join([str(x) for x in mvv1])
df2.registerTempTable("temp_df2")
sqlDF2 = sqlContext.sql("select * from temp_df2 where id not in ("+yy+")")
sqlDF2.show()
+---+----------+-----+
| id| time|other|
+---+----------+-----+
|444|30-12-2019| p0|
+---+----------+-----+
df1.union(sqlDF2).show()
+---+----------+-----+
| id| time|other|
+---+----------+-----+
|111|29-12-2019| p1|
|222|29-12-2019| p2|
|333|29-12-2019| p3|
|444|30-12-2019| p0|
+---+----------+-----+
【讨论】:
感谢您的回答,但我有数百万条记录,并且合并会导致内存问题。对这么多记录使用联合比较慢。加入和使用减法或除法怎么样? Join 是您在 Spark 中常用的最昂贵的操作之一,因此在执行 join 之前尽您所能压缩数据是值得的。 我想是的,将 2 个表与这么多记录合并可能会非常昂贵,所以我使用 join 只是为了过滤新记录并使用 UNION 作为最终输出。【参考方案2】:最后,我编写了这段代码,它似乎可以很好地处理 12,000,000 行,它只需要 5 分钟即可构建。我希望它可以帮助其他人:
df1=spark.createDataFrame([(111,'29-12-2019','p1'),(222,'29-12-2019','p2'),(333,'29-12-2019','p3')],['id','time','other'])
df2=spark.createDataFrame([(111,'30-12-2019','p7'),(222,'30-12-2019','p8'),(444,'30-12-2019','p0')],['id','time','other'])
#So this is giving me all records which are not available in df1 dataset
new_input_df = df2.join(df1, on=['id'], how='left_anti')
#Now union df1(historic reocrds) and new_input_df which contains only new
final_df = df1.union(new_input_df)
final_df.show()
【讨论】:
以上是关于Pyspark:加入 2 个数据帧以仅从第 2 个数据帧获取新记录(历史化)的主要内容,如果未能解决你的问题,请参考以下文章