使用 Overwrite 模式时 Spark 不会删除 MemSql 中的旧数据
Posted
技术标签:
【中文标题】使用 Overwrite 模式时 Spark 不会删除 MemSql 中的旧数据【英文标题】:Spark not deleting old data in MemSql when Overwrite mode is used 【发布时间】:2018-05-29 09:59:40 【问题描述】:我正在使用覆盖模式运行 Spark 作业。我期待它会删除表中的数据并插入新数据。但是它只是将数据附加到它。
我期待与在文件系统中使用保存鼠标覆盖时相同的行为,
object HiveToMemSQL
def main(args: Array[String])
val log = Logger.getLogger(HiveToMemSQL.getClass)
//var options = getOptions()
//val cmdLineArgs = new CommandLineOptions().validateArguments(args, options)
//if (cmdLineArgs != null)
// Get command line options values
var query = "select * from default.students"
// Get destination DB details from command line
val destHostName ="localhost"
//val destUserName = cmdLineArgs.getOptionValue("destUserName")
//val destPassword = cmdLineArgs.getOptionValue("destPassword")
val destDBName ="tsg"
val destTable = "ORC_POS_TEST"
val destPort = 3308
val destConnInfo = MemSQLConnectionInfo(destHostName, destPort, "root", "", destDBName)
val spark = SparkSession.builder().appName("Hive To MemSQL")
.config("maxRecordsPerBatch" ,"100")
.config("spark.memsql.host", destConnInfo.dbHost)
.config("spark.memsql.port", destConnInfo.dbPort.toString)
.config("spark.memsql.user", destConnInfo.user)
.config("spark.memsql.password", destConnInfo.password)
.config("spark.memsql.defaultDatabase", destConnInfo.dbName)
// .config("org.apache.spark.sql.SaveMode" , SaveMode.Overwrite.toString())
.config("spark.memsql.defaultSaveMode" , "Overwrite")
.config("maxRecordsPerBatch" ,"100").master("local[*]").enableHiveSupport().getOrCreate()
import spark.implicits._
import spark.sql
// Queries are expressed in HiveQL
val sqlDF = spark.sql("select* from tsg.v_pos_krogus_wk_test")
log.info("Successfully read data from source")
sqlDF.printSchema()
sqlDF.printSchema()
// MemSQL destination DB Master Aggregator, Port, Username and Password
import spark.implicits._
// Disabling writing to leaf nodes directly
var saveConf = SaveToMemSQLConf(spark.memSQLConf,
params = Map("useKeylessShardingOptimization" -> "false",
"writeToMaster" -> "false" ,
"saveMode" -> SaveMode.Overwrite.toString()))
log.info("Save mode before :" + saveConf.saveMode )
saveConf= saveConf.copy(saveMode=SaveMode.Overwrite)
log.info("Save mode after :" + saveConf.saveMode )
val tableIdent = TableIdentifier(destDBName, destTable)
sqlDF.saveToMemSQL(tableIdent, saveConf)
log.info("Successfully completed writing to MemSQL DB")
【问题讨论】:
【参考方案1】:MemSQL Spark 连接器设置将写入 REPLACE 语句。 REPLACE 的工作方式与 INSERT 完全相同,只是如果表中的旧行与 PRIMARY KEY 的新行具有相同的值,则在插入新行之前删除旧行。见https://docs.memsql.com/sql-reference/v6.0/replace/
【讨论】:
有没有办法,我可以在插入之前截断表格,就像我如何在 spark 中使用 jdbc 写入格式一样?以上是关于使用 Overwrite 模式时 Spark 不会删除 MemSql 中的旧数据的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL写入Hive,同分区overwrite,不同分区insert
Apache Spark 动态分区 OverWrite 问题
Spark SQL SaveMode.Overwrite,获取 java.io.FileNotFoundException 并需要'REFRESH TABLE tableName'
spark特殊问题 在IDEA中spark(enableHiveSupport)中使用 insert overwrite时对空表可以正常写入但是如果表不为空就会报错处理方法