无法从 CSV 文件为复制的持久表编写脚本

Posted

技术标签:

【中文标题】无法从 CSV 文件为复制的持久表编写脚本【英文标题】:Unable to script a replicated, persistent table from a CSV file 【发布时间】:2016-07-26 00:24:55 【问题描述】:

SnappyData v.0-5

目标:我想创建一个持久的、复制的 ROAD 表,并使用 Snappy Shell 从 CSV 文件加载它。 ROAD 表应该有 'road_id' 作为主键,以防止重复 ID。

我试过的命令是:

SET SCHEMA A;

DROP TABLE IF EXISTS ROAD;
DROP TABLE IF EXISTS STAGING_ROAD;

CREATE TABLE STAGING_ROAD 
(road_id string, name string)
USING com.databricks.spark.csv
OPTIONS(path 'roads.csv', header 'true');

CREATE TABLE ROAD
(
    road_id VARCHAR(64) NOT NULL,
    name VARCHAR(64) NOT NULL,
    CONSTRAINT road_PK PRIMARY KEY (road_id)
) USING row OPTIONS (BUCKETS '5', REPLICATE, PERSISTENT)
AS (select road_id, name from STAGING_ROAD);

这不起作用。而且,我必须将我的 ROAD 创建简化为使它甚至被创建。这个没有PK。它没有复制或持久性。

CREATE TABLE ROAD USING row OPTIONS ()
AS (select road_id, name from STAGING_ROAD);

我应该如何编写 SnappyData SQL 文件以实现上述目标?

【问题讨论】:

【参考方案1】:

这里有几点。由于要设置一些约束,因此需要先创建表。然后就可以向表中插入数据了。

a) CREATE TABLE ROAD (road_id VARCHAR(64) NOT NULL, name VARCHAR(64) NOT NULL, CONSTRAINT road_PK PRIMARY KEY (road_id)) 使用行选项 (PERSISTENT ''); 您不必指定 REPLICATE 关键字(不需要,默认为复制)或 BUCKET 参数(仅适用于分区表)。

b) 理想情况下,它应该是 INSERT INTO ROAD (SELECT * FROM STAGING_ROAD)。不幸的是,我们没有 SQL 支持从外部数据(如 0.5 版本中的 csv、parquet 进行批量插入),我假设您正在使用它。这已在最新的 master 中得到处理,并将在后续版本中提供。

c) 解决方法是编写一个 Spark 作业并插入到创建的行表中。您可以参考文档的http://snappydatainc.github.io/snappydata/jobs/ 部分。

如果您需要其他信息,请告诉我。

【讨论】:

【参考方案2】:

您可以使用以下脚本实现相同的目的:

CREATE TABLE STAGING_ROAD USING com.databricks.spark.csv OPTIONS(path 'roads.csv', header 'true',inferSchema 'true');

CREATE TABLE STAGING_ROAD2 USING row AS (SELECT road_id,name FROM STAGING_ROAD);

CREATE TABLE ROAD
(
    road_id VARCHAR(64) NOT NULL PRIMARY KEY,
    name VARCHAR(64) NOT NULL
) USING row OPTIONS (PARTITION_BY 'road_id', BUCKETS '5', PERSISTENT 'ASYNCHRONOUS');

INSERT INTO ROAD SELECT road_id, name from STAGING_ROAD2;

DROP TABLE STAGING_ROAD2;

创建 STAGING_ROAD2 是由于最新版本不需要的错误。

以编程方式,您可以以更简单的方式实现它,例如以下

String roadCsvFile = "road.csv";

snc.sql("CREATE TABLE ROAD( road_id VARCHAR(64) NOT NULL PRIMARY KEY,  name VARCHAR(64) NOT NULL) USING row OPTIONS (PARTITION_BY 'road_id', BUCKETS '5', PERSISTENT 'ASYNCHRONOUS')");

DataFrame roadDf = snc.read()
         .format("com.databricks.spark.csv") // CSV to DF package
         .option("header", "true") // Use first line of all files as header
         .option("inferSchema", "true") // Automatically infer data types
         .load(roadCsvFile);

// Populate the table in snappy store
roadDf.write().insertInto("ROAD");

【讨论】:

我不清楚 SRC_SYS 和 TRD_DATE 的作用。我也不明白 CSV 中的所有行实际上是如何进入决赛桌的。 CSV 中的行加载到 STAGING_ROAD,然后加载到 STAGING_ROAD2。最后,它被插入到带有所有约束等的最终表中。

以上是关于无法从 CSV 文件为复制的持久表编写脚本的主要内容,如果未能解决你的问题,请参考以下文章

在我的脚本结束时,如何将我下载的文件移动到新文件夹? [复制]

PostgreSQL pg_dump/复制

Python脚本从文本文件grep字段并将输出写入csv文件

如何编写批处理脚本来检查文件夹内的多个文件夹和文件夹并将 csv 文件设置为变量?

Mysql从脚本导入

管道输入到脚本中