使用 Overwrite 模式时 Spark 不会删除 MemSql 中的旧数据

Posted

技术标签:

【中文标题】使用 Overwrite 模式时 Spark 不会删除 MemSql 中的旧数据【英文标题】:Spark not deleting old data in MemSql when Overwrite mode is used 【发布时间】:2018-05-29 09:59:40 【问题描述】:

我正在使用覆盖模式运行 Spark 作业。我期待它会删除表中的数据并插入新数据。但是它只是将数据附加到它。

我期待与在文件系统中使用保存鼠标覆盖时相同的行为,

object HiveToMemSQL 
def main(args: Array[String]) 

    val log = Logger.getLogger(HiveToMemSQL.getClass)

    //var options = getOptions()
    //val cmdLineArgs = new CommandLineOptions().validateArguments(args, options)

    //if (cmdLineArgs != null) 

    // Get command line options values
    var query = "select * from default.students"
    // Get destination DB details from command line
    val destHostName ="localhost"
    //val destUserName = cmdLineArgs.getOptionValue("destUserName")
    //val destPassword = cmdLineArgs.getOptionValue("destPassword")
    val destDBName ="tsg"
    val destTable = "ORC_POS_TEST"
    val destPort = 3308
    val destConnInfo = MemSQLConnectionInfo(destHostName, destPort, "root", "", destDBName)

    val spark = SparkSession.builder().appName("Hive To MemSQL")
    .config("maxRecordsPerBatch" ,"100")
    .config("spark.memsql.host", destConnInfo.dbHost)
    .config("spark.memsql.port", destConnInfo.dbPort.toString)
    .config("spark.memsql.user", destConnInfo.user)
    .config("spark.memsql.password", destConnInfo.password)
    .config("spark.memsql.defaultDatabase", destConnInfo.dbName)
    //          .config("org.apache.spark.sql.SaveMode" , SaveMode.Overwrite.toString())
    .config("spark.memsql.defaultSaveMode"  , "Overwrite")
    .config("maxRecordsPerBatch" ,"100").master("local[*]").enableHiveSupport().getOrCreate()

    import spark.implicits._
    import spark.sql

    // Queries are expressed in HiveQL
    val sqlDF = spark.sql("select* from tsg.v_pos_krogus_wk_test")
    log.info("Successfully read data from source")
    sqlDF.printSchema()
    sqlDF.printSchema()

    // MemSQL destination DB Master Aggregator, Port, Username and Password
    import spark.implicits._

    // Disabling writing to leaf nodes directly
    var saveConf = SaveToMemSQLConf(spark.memSQLConf,
    params = Map("useKeylessShardingOptimization" -> "false", 
                 "writeToMaster" -> "false" , 
                 "saveMode" -> SaveMode.Overwrite.toString()))

    log.info("Save mode before  :" + saveConf.saveMode )
    saveConf= saveConf.copy(saveMode=SaveMode.Overwrite)
    log.info("Save mode after  :" + saveConf.saveMode )

    val tableIdent = TableIdentifier(destDBName, destTable)
    sqlDF.saveToMemSQL(tableIdent, saveConf)

    log.info("Successfully completed writing to MemSQL DB")

【问题讨论】:

【参考方案1】:

MemSQL Spark 连接器设置将写入 REPLACE 语句。 REPLACE 的工作方式与 INSERT 完全相同,只是如果表中的旧行与 PRIMARY KEY 的新行具有相同的值,则在插入新行之前删除旧行。见https://docs.memsql.com/sql-reference/v6.0/replace/

【讨论】:

有没有办法,我可以在插入之前截断表格,就像我如何在 spark 中使用 jdbc 写入格式一样?

以上是关于使用 Overwrite 模式时 Spark 不会删除 MemSql 中的旧数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL写入Hive,同分区overwrite,不同分区insert

Spark覆盖不会删除目标路径中的文件

如何在写入时强制数据集匹配其模式?

Apache Spark 动态分区 OverWrite 问题

Spark SQL SaveMode.Overwrite,获取 java.io.FileNotFoundException 并需要'REFRESH TABLE tableName'

spark特殊问题 在IDEA中spark(enableHiveSupport)中使用 insert overwrite时对空表可以正常写入但是如果表不为空就会报错处理方法