Spark create or replace temp view not updating existing table multiple times
Posted: 2018-02-15 17:51:20
Question:
Step 1: I need to compare two CSV files. One is static (DB.csv). The other, Downloaded.csv, is downloaded from the web and is dynamic, so its records may be updated.
Step 2: After comparing the two CSVs, the differences are written to MongoDB.
Step 3: Downloaded.csv then replaces DB.csv, and the same logic from Step 1 repeats.
Example
Step 1
DB.csv [temp table `db` ]
sno APPLE BANANA
1 13 11
2 2 22
3 2 22
Downloaded.csv [temp table `downloaded` ]
sno APPLE BANANA
1 n 11
2 2 22
3 2 22
Step 2
Difference dataset
sno APPLE BANANA
1 n 11
Step 3
DB.csv [temp table `db` - updated ]
sno APPLE BANANA
1 n 11
2 2 22
3 2 22
Repeat Step 1
DB.csv [temp table `db` - updated ]
sno APPLE BANANA
1 n 11
2 2 22
3 2 22
Downloaded.csv [temp table `downloaded` - new downloaded record ]
sno APPLE BANANA
1 n 11
2 2 n
3 2 22
Repeat Step 2
Difference dataset
sno APPLE BANANA
2 2 n
Repeat Step 3
DB.csv [temp table `db` ]
sno APPLE BANANA
1 n 11
2 2 n
3 2 22
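The following plain-Java model makes the expected behaviour of the example concrete. It uses no Spark, and the class `ExceptDiffDemo` and the helper `row` are hypothetical names. It treats `downloaded EXCEPT db` as a set difference and runs it over both rounds above. It assumes that `EXCEPT` behaves as it does in Spark SQL, returning the distinct rows of the left side that do not occur on the right.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ExceptDiffDemo {
    // Models SQL "a EXCEPT b": distinct rows of a that do not appear in b (insertion order kept).
    static Set<List<String>> except(List<List<String>> a, List<List<String>> b) {
        Set<List<String>> result = new LinkedHashSet<>(a);
        result.removeAll(new LinkedHashSet<>(b));
        return result;
    }

    // Hypothetical helper: one CSV row (sno, APPLE, BANANA) as an immutable list.
    static List<String> row(String... cols) {
        return List.of(cols);
    }

    public static void main(String[] args) {
        // Round 1: DB.csv and the first Downloaded.csv from the example
        List<List<String>> db = List.of(row("1", "13", "11"), row("2", "2", "22"), row("3", "2", "22"));
        List<List<String>> downloaded = List.of(row("1", "n", "11"), row("2", "2", "22"), row("3", "2", "22"));
        System.out.println("round 1 diff: " + except(downloaded, db)); // [[1, n, 11]]

        // Step 3: the downloaded snapshot becomes the new db
        db = downloaded;

        // Round 2: a newly downloaded file
        downloaded = List.of(row("1", "n", "11"), row("2", "2", "n"), row("3", "2", "22"));
        System.out.println("round 2 diff: " + except(downloaded, db)); // [[2, 2, n]]
    }
}
```

Each round yields exactly the changed row. Spark should return the same result if `db` really holds the previous snapshot.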
Here is my logic:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.mongodb.spark.MongoSpark;

// Initial comparison: downloaded.csv against db.csv
Dataset<Row> downloaded = spark.read().option("header", "true").csv("/home/exa4/Desktop/downloaded.csv");
Dataset<Row> db = spark.read().option("header", "true").csv("/home/exa4/Desktop/db.csv");
downloaded.createOrReplaceTempView("downloaded");
db.createOrReplaceTempView("db");
Dataset<Row> diffs = spark.sql("select * from downloaded EXCEPT select * from db");
// Write the updates to the MongoDB collection
MongoSpark.save(diffs.write().option("collection", "UpdatedRecords").mode("overwrite"));
// Replace the old DB with the newly downloaded dataset
downloaded.createOrReplaceTempView("db");
// Every 10 seconds I may intentionally change downloaded.csv for testing, as it is a dynamic dataset.
// Thread.sleep throws InterruptedException, so the enclosing method must declare or handle it.
while (true) {
    long start = System.currentTimeMillis();
    Thread.sleep(10000);
    // This will be the newly downloaded file from the web (reassigned, not redeclared)
    downloaded = spark.read().option("header", "true").csv("/home/exa4/Desktop/downloaded.csv");
    downloaded.createOrReplaceTempView("downloaded");
    // Now compare the download with the previously updated dataset
    Dataset<Row> diffs_ = spark.sql("select * from downloaded EXCEPT select * from db");
    diffs_.show();
    // HERE I AM GETTING NULL RECORDS
    downloaded.createOrReplaceTempView("db");
}
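One likely reason for the empty diff is an assumption here, not something confirmed in the thread. Spark Datasets are evaluated lazily, so `downloaded.createOrReplaceTempView("db")` registers the query plan that reads downloaded.csv, not a copy of its rows. Once the file is overwritten, evaluating `db` may read the new contents, and `downloaded EXCEPT db` then compares the file with itself. The plain-Java model below illustrates the difference between a lazy view and a materialized snapshot. It uses no Spark, and `LazyViewDemo` and `simulate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class LazyViewDemo {
    // Models SQL "a EXCEPT b" on CSV lines.
    static Set<String> except(List<String> a, List<String> b) {
        Set<String> result = new LinkedHashSet<>(a);
        result.removeAll(b);
        return result;
    }

    // Returns [diff against a lazy view, diff against a snapshot] after the file changes.
    static List<Set<String>> simulate() {
        // Hypothetical in-memory stand-in for downloaded.csv on disk
        List<String> file = new ArrayList<>(List.of("1,n,11", "2,2,22", "3,2,22"));

        // Like a temp view over spark.read().csv(path): a recipe re-evaluated on each use
        Supplier<List<String>> lazyDb = () -> new ArrayList<>(file);
        // Like rows that were materialized before being registered as "db"
        List<String> snapshotDb = new ArrayList<>(file);

        // The next download overwrites the file in place
        file.set(1, "2,2,n");
        List<String> downloaded = new ArrayList<>(file);

        return List.of(except(downloaded, lazyDb.get()), except(downloaded, snapshotDb));
    }

    public static void main(String[] args) {
        List<Set<String>> r = simulate();
        System.out.println("diff vs lazy view: " + r.get(0)); // []
        System.out.println("diff vs snapshot:  " + r.get(1)); // [2,2,n]
    }
}
```

In Spark, the analogous fix would be to materialize the previous data before registering it as `db`. Two ways to do that are writing it out to db.csv and re-reading it, or calling `Dataset.localCheckpoint()` (Spark 2.3+). This is untested against the asker's setup.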
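Answer 1:
spark.catalog.refreshTable(s"$dbName.$destinationTableName")
Replace `$dbName` and `$destinationTableName` with your database name and table name. The snippet is Scala. In the Java API, the equivalent call is `spark.catalog().refreshTable(...)`.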