Pyspark:加入 2 个数据帧以仅从第 2 个数据帧获取新记录(历史化)

Posted

技术标签:

【中文标题】Pyspark:加入 2 个数据帧以仅从第 2 个数据帧获取新记录(历史化)【英文标题】:Pyspark : Join 2 dataframe to get only new records from 2nd dataframe (Historisation) 【发布时间】:2019-12-30 10:51:55 【问题描述】:

我有 2 个数据框 df1 和 df2。我想要这个数据框的结果是这样的: 1.取df1的所有记录。 2. 仅从 df2 中获取新记录(df1 中不可用的记录) 3. 生成该逻辑的新数据框

注意:主键是“id”。我只想检查 id 而不是完整的行。如果 df1 中没有 ID,那么只有 df2 中的故事。

df1

    +------+-------------+-----+
    |  id  |time         |other|
    +------+-------------+-----+
    |   111|  29-12-2019 |   p1|
    |   222|  29-12-2019 |   p2|
    |   333|  29-12-2019 |   p3|
    +----+-----+-----+---------+

df2

    +------+-------------+-----+
    |  id  |time         |other|
    +------+-------------+-----+
    |   111|  30-12-2019 |   p7|
    |   222|  30-12-2019 |   p8|
    |   444|  30-12-2019 |   p0|
    +----+-----+-----+---------+

结果

+------+-------------+-----+
|  id  |time         |other|
+------+-------------+-----+
|   111|  29-12-2019 |   p1|
|   222|  29-12-2019 |   p2|
|   333|  29-12-2019 |   p3|
|   444|  30-12-2019 |   p0|
+----+-----+-----+---------+

能否请您帮我在 pyspark 中执行此操作。我打算使用join。

【问题讨论】:

【参考方案1】:
df1=spark.createDataFrame([(111,'29-12-2019','p1'),(222,'29-12-2019','p2'),(333,'29-12-2019','p3')],['id','time','other'])
df2=spark.createDataFrame([(111,'30-12-2019','p7'),(222,'30-12-2019','p8'),(444,'30-12-2019','p0')],['id','time','other'])

mvv1 = df1.select("id").rdd.flatMap(lambda x: x).collect()
print(mvv1)

[111, 222, 333]

yy=",".join([str(x) for x in mvv1])
df2.registerTempTable("temp_df2")
sqlDF2 = sqlContext.sql("select * from temp_df2 where id not in ("+yy+")")
sqlDF2.show()

+---+----------+-----+
| id|      time|other|
+---+----------+-----+
|444|30-12-2019|   p0|
+---+----------+-----+

df1.union(sqlDF2).show()

+---+----------+-----+
| id|      time|other|
+---+----------+-----+
|111|29-12-2019|   p1|
|222|29-12-2019|   p2|
|333|29-12-2019|   p3|
|444|30-12-2019|   p0|
+---+----------+-----+

【讨论】:

感谢您的回答,但我有数百万条记录,并且合并会导致内存问题。对这么多记录使用联合比较慢。加入和使用减法或除法怎么样? Join 是您在 Spark 中常用的最昂贵的操作之一,因此在执行 join 之前尽您所能压缩数据是值得的。 我想是的,将 2 个表与这么多记录合并可能会非常昂贵,所以我使用 join 只是为了过滤新记录并使用 UNION 作为最终输出。【参考方案2】:

最后,我编写了这段代码,它似乎可以很好地处理 12,000,000 行,它只需要 5 分钟即可构建。我希望它可以帮助其他人:

df1=spark.createDataFrame([(111,'29-12-2019','p1'),(222,'29-12-2019','p2'),(333,'29-12-2019','p3')],['id','time','other'])
df2=spark.createDataFrame([(111,'30-12-2019','p7'),(222,'30-12-2019','p8'),(444,'30-12-2019','p0')],['id','time','other'])

#So this is giving me all records which are not available in df1 dataset
new_input_df = df2.join(df1, on=['id'], how='left_anti')

#Now union df1(historic reocrds) and new_input_df  which contains only new 
final_df = df1.union(new_input_df)

final_df.show()

【讨论】:

以上是关于Pyspark:加入 2 个数据帧以仅从第 2 个数据帧获取新记录(历史化)的主要内容,如果未能解决你的问题,请参考以下文章

聚合 SQL 函数以仅从每个组中获取第一个

pyspark 处理和比较 2 个数据帧

使用 onBackPressed 仅从第一个片段退出 App

加入后替换pyspark数据框中的列

Pyspark在第二个数据框中加入多行数据框

pyspark 数据框上的自定义函数