AWS:通过使用 amazon-data-pipeline 将数据从 S3 传输到 Redshift 来实现除 COPY 之外的其他功能
Posted
技术标签:
【中文标题】AWS:通过使用 amazon-data-pipeline 将数据从 S3 传输到 Redshift 来实现除 COPY 之外的其他功能【英文标题】:AWS: Other function than COPY by transferring data from S3 to Redshift with amazon-data-pipeline 【发布时间】:2014-08-14 13:38:15 【问题描述】:我正在尝试使用 Amazon-Data-Pipeline 工具将数据从 Amazon S3-Cloud 传输到 Amazon-Redshift。
是否可以在传输数据时使用例如更改数据?一个 SQL 语句,以便仅将 SQL 语句的结果作为 Redshift 的输入?
我只找到了这样的复制命令:
"id": "S3Input",
"type": "S3DataNode",
"schedule":
"ref": "MySchedule"
,
"filePath": "s3://example-bucket/source/inputfile.csv"
,
来源:https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-get-started-copy-data-cli.html
【问题讨论】:
【参考方案1】:是的,这是可能的。有两种方法:
-
使用
transformSQL
的RedShiftCopyActivity
transformSQL
如果在及时加载的记录范围内执行转换,则很有用,例如每天或每小时。这样一来,更改只会应用于批处理而不是整个表。
这是文档的摘录:
transformSql: 用于转换输入数据的 SQL SELECT 表达式。当您从 DynamoDB 或 Amazon S3 复制数据时,AWS Data Pipeline 会创建一个名为 staging 的表并最初将其加载到其中。此表中的数据用于更新目标表。如果指定了 transformSql 选项,则从指定的 SQL 语句创建第二个临时表。然后在最终目标表中更新第二个临时表中的数据。所以transformSql必须在staging表上运行,transformSql的输出schema必须与最终目标表的schema匹配。
请在下面找到一个使用 transformSql 的示例。请注意,选择来自staging
表。它将有效地运行CREATE TEMPORARY TABLE staging2 AS SELECT <...> FROM staging;
。此外,所有字段都必须包含并匹配 RedShift DB 中的现有表。
"id": "LoadUsersRedshiftCopyActivity",
"name": "Load Users",
"insertMode": "OVERWRITE_EXISTING",
"transformSql": "SELECT u.id, u.email, u.first_name, u.last_name, u.admin, u.guest, CONVERT_TIMEZONE('US/Pacific', cs.created_at_pst) AS created_at_pst, CONVERT_TIMEZONE('US/Pacific', cs.updated_at_pst) AS updated_at_pst FROM staging u;",
"type": "RedshiftCopyActivity",
"runsOn":
"ref": "OregonEc2Resource"
,
"schedule":
"ref": "HourlySchedule"
,
"input":
"ref": "OregonUsersS3DataNode"
,
"output":
"ref": "OregonUsersDashboardRedshiftDatabase"
,
"onSuccess":
"ref": "LoadUsersSuccessSnsAlarm"
,
"onFail":
"ref": "LoadUsersFailureSnsAlarm"
,
"dependsOn":
"ref": "BewteenRegionsCopyActivity"
-
使用
script
的SqlActivity
SqlActivity 允许对整个数据集进行操作,并且可以通过dependsOn
机制安排在特定事件之后运行
"name": "Add location ID",
"id": "AddCardpoolLocationSqlActivity",
"type": "SqlActivity",
"script": "INSERT INTO locations (id) SELECT 100000 WHERE NOT EXISTS (SELECT * FROM locations WHERE id = 100000);",
"database":
"ref": "DashboardRedshiftDatabase"
,
"schedule":
"ref": "HourlySchedule"
,
"output":
"ref": "LocationsDashboardRedshiftDatabase"
,
"runsOn":
"ref": "OregonEc2Resource"
,
"dependsOn":
"ref": "LoadLocationsRedshiftCopyActivity"
【讨论】:
【参考方案2】:RedshiftCopyActivity 中有一个名为“transformSql”的可选字段。
http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-redshiftcopyactivity.html
我没有亲自使用过这个,但从它的外观来看,它似乎 - 你将把你的 s3 数据放在一个临时表中,这个 sql stmt 将返回转换后的数据以供 redshift 插入。
因此,无论您是否正在转换该字段,您都需要在 select 中列出所有字段。
【讨论】:
【参考方案3】:AWS Datapipeline SqlActivity
"id" : "mysqlActivity",
"type" : "SqlActivity",
"database" : "ref": "MyDatabase" ,
"script" : "insert into AnalyticsTable (select (cast(requestEndTime as bigint) - cast(requestBeginTime as bigint)) as requestTime, hostname from StructuredLogs where hostname LIKE '%.domain.sfx');",
"schedule" : "ref": "Hour" ,
"queue" : "priority"
所以基本上在 “脚本” 任何 sql 脚本/转换/命令 Amazon Redshift SQL Commands
transformSql 很好,但只支持用于转换输入数据的 SQL SELECT 表达式。参考:RedshiftCopyActivity
【讨论】:
以上是关于AWS:通过使用 amazon-data-pipeline 将数据从 S3 传输到 Redshift 来实现除 COPY 之外的其他功能的主要内容,如果未能解决你的问题,请参考以下文章
如何通过忽略未使用的资源从 aws cli 创建 aws java lambda 函数?
通过使用 AWS-SDK PHP 生成的预签名帖子拒绝 AWS S3 上传访问
通过使用 AWS-SDK PHP 生成的预签名帖子拒绝 AWS S3 上传访问
通过 jclouds 使用 AWS (S3) - 如何承担角色