如何使用 Spark 删除数据库中的行?
Posted
技术标签:
【中文标题】如何使用 Spark 删除数据库中的行?【英文标题】:How to delete rows in database with Spark? 【发布时间】:2020-06-08 03:33:11 【问题描述】:感谢您阅读此问题。
我知道如何插入行
df.write \
.format('jdbc') \
.option("url", url) \
.option("dbtable", table) \
.option("user", user) \
.option("password", password) \
.option("driver", "org.postgresql.Driver") \
.mode('append') \
.save()
但是如何删除行呢? 喜欢..
df = [Row(id=1), Row(id=2), ... ]
=> DELETE FROM TABLE WHERE id in df ...
有可能吗?
【问题讨论】:
使用原生 JDBC 连接使用PreparedStatement
和 executeUpdate()
进行删除
你不能。您将需要回到旧的 JDBC 方式来执行此操作。您必须遍历要删除的行,然后批量删除。
【参考方案1】:
Spark 不支持它。 但我已经用 foreachPartition 完成了(只需使用数据帧数据..)
点赞Does Apache Spark SQL support MERGE clause?
df.rdd.coalesce(2).foreachPartition(partition =>
val connectionProperties = brConnect.value
val jdbcUrl = connectionProperties.getProperty("jdbcurl")
val user = connectionProperties.getProperty("user")
val password = connectionProperties.getProperty("password")
val driver = connectionProperties.getProperty("Driver")
Class.forName(driver)
val dbc: Connection = DriverManager.getConnection(jdbcUrl, user, password)
val db_batchsize = 1000
val sqlString = "INSERT employee USING values (?, ?, ?, ?)"
var pstmt: PreparedStatement = dbc.prepareStatement(sqlString)
partition.grouped(db_batchsize).foreach(batch =>
batch.foreach row =>
val id = row.id
val fname = row.fname
val lname = row.lname
val userid = row.userid
var pstmt: PreparedStatement =
pstmt.setLong(1, row.id)
pstmt.setString(2, row.fname)
pstmt.setString(3, row.lname)
pstmt.setString(4, row.userid)
pstmt.addBatch()
pstmt.executeBatch()
dbc.commit()
)
dbc.close()
)
【讨论】:
以上是关于如何使用 Spark 删除数据库中的行?的主要内容,如果未能解决你的问题,请参考以下文章
Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行
删除Apache Spark DataFrame中的重复项,并保留尚未删除的值的行?