如何使用 Spark 删除数据库中的行?

Posted

技术标签:

【中文标题】如何使用 Spark 删除数据库中的行?【英文标题】:How to delete rows in database with Spark? 【发布时间】:2020-06-08 03:33:11 【问题描述】:

感谢您阅读此问题。

我知道如何插入行

    df.write \
        .format('jdbc') \
        .option("url", url) \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .mode('append') \
        .save()

但是如何删除行呢? 喜欢..

df = [Row(id=1), Row(id=2), ... ]

=> DELETE FROM TABLE WHERE id in df ...

有可能吗?

【问题讨论】:

使用原生 JDBC 连接使用 PreparedStatementexecuteUpdate() 进行删除 你不能。您将需要回到旧的 JDBC 方式来执行此操作。您必须遍历要删除的行,然后批量删除。 【参考方案1】:

Spark 不支持它。 但我已经用 foreachPartition 完成了(只需使用数据帧数据..)

点赞Does Apache Spark SQL support MERGE clause?

df.rdd.coalesce(2).foreachPartition(partition => 
  val connectionProperties = brConnect.value
  val jdbcUrl = connectionProperties.getProperty("jdbcurl")
  val user = connectionProperties.getProperty("user")
  val password = connectionProperties.getProperty("password")
  val driver = connectionProperties.getProperty("Driver")
  Class.forName(driver)
  val dbc: Connection = DriverManager.getConnection(jdbcUrl, user, password)
  val db_batchsize = 1000
  val sqlString = "INSERT employee USING values (?, ?, ?, ?)"

  var pstmt: PreparedStatement = dbc.prepareStatement(sqlString)
  partition.grouped(db_batchsize).foreach(batch => 
    batch.foreach row =>
      
        val id = row.id
        val fname = row.fname
        val lname = row.lname
        val userid = row.userid

        var pstmt: PreparedStatement = 
        pstmt.setLong(1, row.id)
        pstmt.setString(2, row.fname)
        pstmt.setString(3, row.lname)
        pstmt.setString(4, row.userid)
        pstmt.addBatch()
      
    
    pstmt.executeBatch()
    dbc.commit()
  )
  dbc.close()
)

【讨论】:

以上是关于如何使用 Spark 删除数据库中的行?的主要内容,如果未能解决你的问题,请参考以下文章

Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行

获取被筛选器从 spark 数据帧中删除的行的示例

删除Apache Spark DataFrame中的重复项,并保留尚未删除的值的行?

SPARK DataFrame:删除组中的最大值

如何使用 Java 在 Dataset Spark 中过滤列并删除行

如何删除 Spark 表列中的空格(Pyspark)