使用 COPY 导入时 Redshift 添加列

Posted

技术标签:

【中文标题】使用 COPY 导入时 Redshift 添加列【英文标题】:Redshift add column when importing with COPY 【发布时间】:2014-12-06 20:21:57 【问题描述】:

在 Amazon Redshift 中,我有一个需要从多个 CSV 文件加载数据的表:

create table my_table (
  id integer,
  name varchar(50) NULL
  email varchar(50) NULL,
  processed_file varchar(256) NULL
);

前三列指的是文件中的数据。最后一列processed_filed 表示从哪个文件导入的记录。

我在 Amazon S3 中有文件,我想使用 COPY 命令导入它们。比如:

COPY table_name FROM 's3://file-key' 
WITH CREDENTIALS 'aws_access_key_id=xxxx;aws_secret_access_key=xxxxx' 
DATEFORMAT 'auto' TIMEFORMAT 'auto' MAXERROR 0 ACCEPTINVCHARS '*' DELIMITER '\t' GZIP;

有没有办法使用 COPY 命令自动填充第四列 processed_file 以插入文件名。

我可以在 COPY 之后执行 UPDATE 语句,但我正在处理大量数据,所以理想情况下我希望尽可能避免这种情况。

【问题讨论】:

【参考方案1】:

这是不可能的。

您需要对文件进行预处理(包括名称列)或在加载后更新数据(但是很难同时从多个文件进行批量加载,这是最有效的方式将数据加载到 Redshift 中)。

见:Redshift COPY command documentation

【讨论】:

【参考方案2】:

在这里你可以尝试这个自定义逻辑来添加新列,在这个例子中添加文件名作为 redshift COPY 中的新列

import boto3
import re
s3 = boto3.client('s3')

sql = "DROPSQL , CREATE SQL , COPY SQL" ## here need to pass your actual sqls

def Filter(datalist,keyword):
    # Search data based on regular expression in the list
    return [val for val in datalist
        if re.search(keyword, val)]

def add_new_col(table_name):
    drop_sql = ''.join(Filter(sql.split(';'),keyword=table_name+' '))
    create_sql = ''.join(Filter(sql.split(';'),keyword=table_name+'\('))
    copy_sql = ''.join(Filter(sql.split(';'),keyword=table_name.upper()+'/'))
    
    BUCKET = copy_sql.split(' ')[3].split('/')[2]
    folder = '/'.join(copy_sql.split(' ')[3].split('/')[3:-1])+'/'
    maintable = copy_sql.split(' ')[1]
    
    print ("BUCKET , key_folder , maintable ".format(BUCKET,folder,maintable))
    
    temp_table_drop_sql = drop_sql.replace(table_name,'temp_table')
    temp_table_create_sql = create_sql.replace(table_name,'temp_table')
    temp_table_copy_sql = copy_sql.replace(table_name.upper(),'temp_table')
    temp_table_name_withSchema = temp_table_copy_sql.split(' ')[1]
    
    print ("temp_table_name_withSchema ".format(temp_table_name_withSchema))
    
    ## replace with query execute logic
    print(temp_table_drop_sql)
    print(temp_table_create_sql)
    #####
    
    response = s3.list_objects_v2(
            Bucket=BUCKET,
            Prefix =folder)
    
    new_column_name = 'filename' 
    
    for i in response["Contents"]:
        ## replace with query execute logic
        temp_sql = copy_sql.replace(folder,i["Key"])
        temp_sql = temp_sql.replace(table_name.upper(),'temp_table')
        print(temp_sql)
        ## i["Key"] is filename
        print("alter table  ADD COLUMN  varchar(256) NOT NULL DEFAULT '';".format(temp_table_name_withSchema, new_column_name , i["Key"].split('/')[-1]))
        print("insert into  (select * from )".format(maintable, temp_table_name_withSchema))
        print("truncate ".format(temp_table_name_withSchema))
        #####
    
    ## replace with query execute logic
    print(drop_sql)
    ########
    
add_new_col(table_name)

【讨论】:

【参考方案3】:

其实是可以的。我正在创建和加载没有额外 processed_file_name 列的数据,然后添加具有默认值的列。这是完整的过程:

create table my_table (
  id integer,
  name varchar(50) NULL
  email varchar(50) NULL,
);

COPY table_name FROM 's3://file-key' 
WITH CREDENTIALS 'aws_access_key_id=xxxx;aws_secret_access_key=xxxxx' 
DATEFORMAT 'auto' TIMEFORMAT 'auto' MAXERROR 0 ACCEPTINVCHARS '*' DELIMITER '\t' GZIP;

ALTER TABLE my_table ADD COLUMN processed_file_name varchar(256) NOT NULL DEFAULT 'file-name';

这适用于我的用例,因为我在暂存表上执行此操作,它将仅保存一个文件中的数据,然后将它们截断并将数据传输到目标表。

【讨论】:

这并不能解决您提出的问题。您将静态值放入您默认的列中。您加载的每个文件最终都具有相同的值。试试这个***.com/questions/16525175/… 这对我的用例来说没问题,因为我使用的表是一个 staging_table,在数据传输到目标表后会被截断

以上是关于使用 COPY 导入时 Redshift 添加列的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 中如何捕获 Redshift 对 COPY 命令的响应?

尽管数据有效,但 Redshift 上的 COPY 总是失败并出现时间戳错误

在 Redshift 中复制 JSON 列

Redshift COPY 自动压缩

S3 -> Redshift 无法处理 UTF8

如果我使用 COPY 命令将数据从 S3 加载到 Redshift,它会遵循我的 dist 样式和键吗?