如果不存在则插入 Spark SQL 中的其他更新

Posted

技术标签:

【中文标题】如果不存在则插入 Spark SQL 中的其他更新【英文标题】:INSERT IF NOT EXISTS ELSE UPDATE in Spark SQL 【发布时间】:2017-08-15 20:15:24 【问题描述】:

Spark SQL 中是否有提供“INSERT IF NOT EXISTS ELSE UPDATE”的规定。

我有包含一些记录的 Spark SQL 表“ABC”。 然后我有另一批记录,我想根据它们是否存在于这个表中来在这个表中插入/更新。

有没有我可以在 SQL 查询中使用的 SQL 命令来实现这一点?

【问题讨论】:

在常规 Spark 中,这将通过 join 后跟 map... 谢谢@GlennieHellesSindholt,你能分享一个例子吗 【参考方案1】:

在常规 Spark 中,这可以通过 join 后跟 map 来实现,如下所示:

import spark.implicits._
val df1 = spark.sparkContext.parallelize(List(("id1", "orginal"), ("id2", "original"))).toDF("df1_id", "df1_status")
val df2 = spark.sparkContext.parallelize(List(("id1", "new"), ("id3","new"))).toDF("df2_id", "df2_status")

val df3 = df1
  .join(df2, 'df1_id === 'df2_id, "outer")
  .map(row => 
    if (row.isNullAt(2))
      (row.getString(0), row.getString(1))
    else
      (row.getString(2), row.getString(3))
  )

这会产生:

scala> df3.show
+---+--------+
| _1|      _2|
+---+--------+
|id3|     new| 
|id1|     new|
|id2|original|
+---+--------+

您也可以将selectudfs 一起使用而不是map,但在这种使用空值的特殊情况下,我个人更喜欢map 变体。

【讨论】:

这太棒了,这种技术有什么名字吗?【参考方案2】:

我知道分享我的代码有点晚了,但是为了添加或更新我的数据库,我做了一个看起来像这样的函数:

import pandas as pd

#Returns a spark dataframe with added and updated datas
#key parameter is the primary key of the dataframes
#The two parameters dfToUpdate and dfToAddAndUpdate are spark dataframes
def AddOrUpdateDf(dfToUpdate,dfToAddAndUpdate,key):
    #Cast the spark dataframe dfToUpdate to pandas dataframe
    dfToUpdatePandas = dfToUpdate.toPandas()

    #Cast the spark dataframe dfToAddAndUpdate to pandas dataframe
    dfToAddAndUpdatePandas = dfToAddAndUpdate.toPandas()

    #Update the table records with the latest records, and adding new records if there are new records.
    AddOrUpdatePandasDf = pd.concat([dfToUpdatePandas,dfToAddAndUpdatePandas]).drop_duplicates([key], keep = 'last').sort_values(key)

    #Cast back to get a spark dataframe
    AddOrUpdateDf = spark.createDataFrame(AddOrUpdatePandasDf)

    return AddOrUpdateDf

如您所见,我们需要将 spark 数据帧转换为 pandas 数据帧,以便能够执行 pd.concat,尤其是带有“keep = 'last'”的 drop_duplicates,然后我们转换回 spark 数据帧并返回它。 我不认为这是处理 AddOrUpdate 的最佳方式,但至少它有效。

【讨论】:

不要使用 Pandas,它会减慢你的代码速度!它不会在多个节点上扩展! 我怎样才能实现作为 spark 数据帧?【参考方案3】:

你可以像这样使用 spark sql:

select * from (select c.*, row_number() over (partition by tac  order by tag desc) as 
    TAG_NUM from (
    select 
         a.tac
        ,a.name
        ,0 as tag
    from tableA a
    union all
    select 
        b.tac
        ,b.name
         ,1 as tag
    from tableB b) c ) d where TAG_NUM=1

tac 是您要插入/更新的列。

【讨论】:

以上是关于如果不存在则插入 Spark SQL 中的其他更新的主要内容,如果未能解决你的问题,请参考以下文章

SQL Select:如果存在则更新,如果不存在则插入 - 使用日期部分比较?

如果 Databricks 或 Spark SQL 中存在表,则插入

SQL 查询 - 如果存在则更新,否则插入

SQL:如果存在则更新,否则插入...但对于具有不同值的多行

mysql批量更新,数据存在则更新,不存在则插入

数据库:如果存在则“更新”如果不存在则插入 [重复]