Spark创建或替换临时视图不多次更新现有表

Posted

技术标签:

【中文标题】Spark创建或替换临时视图不多次更新现有表【英文标题】:Spark create or replace temp view not updating existing table multiple times 【发布时间】:2018-02-15 17:51:20 【问题描述】:

第 1 步 我需要重新比较两个 csv 文件,其中一个是静态的(DB.csv),另一个是从网络下载的 Downloaded.csv(这是动态的,可能会更新记录)

第 2 步 比较两个csv的差异后,将写入mongodb

第三步 现在Downloaded.csv文件需要替换DB.csv,然后步骤1的相同逻辑将继续。

示例说明

第一步

DB.csv   [temp table `db` ]

sno APPLE   BANANA
1   13  11
2   2   22
3   2   22

Downloaded.csv [temp table `downloaded` ]
sno APPLE   BANANA
1   n   11
2   2   22
3   2   22

第二步

Difference dataset
sno APPLE   BANANA
1     n       11

第三步

DB.csv [temp table `db` - updated ]
sno APPLE   BANANA
 1  n   11
 2  2   22
 3  2   22

重复 步骤1

DB.csv [temp table `db` - updated ]
sno APPLE   BANANA
 1  n   11
 2  2   22
 3  2   22

Downloaded.csv [temp table `downloaded` - new downloaded record ]
sno APPLE   BANANA
1   n   11
2   2   n
3   2   22

重复 第二步

Difference dataset
sno APPLE   BANANA
2     2       n  

重复 第三步

DB.csv [temp table `db` ]
sno APPLE   BANANA
 1  n   11
 2  2   n
 3  2   22

这是我的逻辑

 Dataset<Row> downloaded =spark.read().option("header","true").csv("/home/exa4/Desktop/downloaded.csv");
     Dataset<Row> db =spark.read().option("header","true").csv("/home/exa4/Desktop/db.csv");
     downloaded.createOrReplaceTempView("downloaded");
     db.createOrReplaceTempView("db");

     Dataset<Row> diffs= spark.sql("select * from downloaded EXCEPT select * from db");

    //write updates to collection
    MongoSpark.save(diffs.write().option("collection", "UpdatedRecords").mode("overwrite"));

    //replacing old DB with new dataset downloaded 
    downloaded.createOrReplaceTempView("db");

     ////For every 10 seconds I may intenstionaly change the downloaded.csv for testing , as it is dynamic dataset 
     while(true)
         long start = System.currentTimeMillis();
            Thread.sleep(10000);

             //this will be newly downloaded file from net 
             Dataset<Row> downloaded =spark.read().option("header","true").csv("/home/exa4/Desktop/downloaded.csv");
             downloaded.createOrReplaceTempView("downloaded");

            //now comparing downloaded with previously updated dataset 
            Dataset<Row> diffs_= spark.sql("select * from downloaded EXCEPT select * from db");
            diffs_.show();
             ////HERE I AM GETTING NULL RECORDS 

            downloaded.createOrReplaceTempView("db");

     

【问题讨论】:

【参考方案1】:
spark.catalog.refreshTable(s"$dbName.$destinationTableName")

用数据库名和表名替换

【讨论】:

以上是关于Spark创建或替换临时视图不多次更新现有表的主要内容,如果未能解决你的问题,请参考以下文章

如何从 spark sql databricks 中的临时视图或等效视图中删除?

Spark:通过对临时表执行 sql 查询来创建临时表

SQL 使用临时表创建视图

sql中怎样将查询出来的结果创建成临时表

Spark 数据集:表数据与创建它的视图不完全相同

SQL-视图与存储过程