How to Write an AWS Glue Script to Insert New Data into a Redshift Table

Posted: 2020-06-26 17:13:08

[Question]:

I'm new to AWS Glue, and I want to create a job that takes a SQL script I've written (an INSERT INTO statement) and uses it to populate an empty table I have in Redshift. Is this possible, and if so, what is the syntax?

I started with a test case: copying data from one table in my Redshift cluster to another.

Here is the script AWS generated. I chose the "Change schema" option because I want to create a new target dataset.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "dev", table_name = "patients", redshift_tmp_dir = TempDir, transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "dev", table_name = "patients", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("birthdate", "string", "date of birth", "string"), ("_id", "string", "patient id", "string"), ("name_middle", "string", "patient middle name", "string"), ("gender", "string", "gender", "string"), ("name_family", "string", "patient last name", "string"), ("name_given", "string", "patient first name", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("birthdate", "string", "date of birth", "string"), ("_id", "string", "patient id", "string"), ("name_middle", "string", "patient middle name", "string"), ("gender", "string", "gender", "string"), ("name_family", "string", "patient last name", "string"), ("name_given", "string", "patient first name", "string")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["gender", "patient middle name", "patient last name", "patient first name", "patient id", "date of birth"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["gender", "patient middle name", "patient last name", "patient first name", "patient id", "date of birth"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "dev", table_name = "patients_info", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "dev", table_name = "patients_info", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
## @type: DataSink
## @args: [database = "dev", table_name = "patients_info", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "dev", table_name = "patients_info", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()

Then I tried a simpler use case, which still fails:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# args must be resolved before it is referenced below
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])

glueContext = GlueContext(SparkContext.getOrCreate())

persons = glueContext.create_dynamic_frame.from_catalog(
    database = "dev",
    table_name = "patients",
    redshift_tmp_dir = args["TempDir"],
    # additional_options must be a dict, not a bare key: value pair
    additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})

# Glue jobs run on Python 3, so print is a function
print("Count:", persons.count())
persons.printSchema()

[Comments]:

Could you post the query you want to run? Is your use case just inserting some values into a table that already exists in Redshift?

Could you provide more information about the table's columns, or an example of the INSERT statement?

I added the code @PrabhakarReddy

I provided more information @TomTom

[Answer 1]:

You shouldn't think of INSERT as the way to write data into Redshift; it is very slow.

The correct flow is:

1. Write the data to S3.
2. Use the Redshift COPY command to load the data from S3 into Redshift.

You should read this AWS documentation carefully: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-redshift.html
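For reference, Glue's Redshift sink already follows this pattern under the hood: it stages the DynamicFrame under redshift_tmp_dir on S3 and then issues a COPY into the target table. Below is a minimal sketch of that flow; the catalog connection name "redshift-connection" is a hypothetical placeholder, and the database/table names are taken from the question:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext

args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])
glueContext = GlueContext(SparkContext.getOrCreate())

# Read the source table through the Data Catalog; for a Redshift source,
# Glue unloads the data to the S3 temp dir behind the scenes.
source = glueContext.create_dynamic_frame.from_catalog(
    database = "dev",
    table_name = "patients",
    redshift_tmp_dir = args["TempDir"])

# Write through a catalog JDBC connection: Glue stages the frame in the
# S3 temp dir and then runs a Redshift COPY into the target table.
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = source,
    catalog_connection = "redshift-connection",   # hypothetical connection name
    connection_options = {"dbtable": "patients_info", "database": "dev"},
    redshift_tmp_dir = args["TempDir"])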

[Discussion]:

So with Glue, does this code write the data from S3 into Redshift? I'm a bit confused about how to customize the code snippet. database = "my database name"? Do I need to create a TempDir, and if so, how? Thanks.

You should follow the AWS doc carefully. That code writes your dynamic frame from Glue to Redshift. You do need to create an S3 bucket. And yes, you use your own database name. Ignore the paywalled article you mentioned in the question.

Thank you. I've added the two code snippets that fail for me, in case you can provide guidance.

I can't explain it any better or more clearly than the AWS instructions already do.

Writing a dynamic frame and the COPY command are completely different things. You've also left out everything about how to crawl the metadata in S3, update partitions, bookmarks, and so on. The above just copies the entire S3 path over and over again, which is poor design. Also, the Redshift documentation specifically states that INSERT is faster than COPY when using staging tables.
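For the staging-table pattern the last comment refers to, Glue's Redshift connection options accept preactions and postactions SQL strings, so a COPY into a staging table followed by a set-based INSERT INTO ... SELECT can be expressed in a single sink call. A hedged sketch continuing from the example above; the connection name and staging table are hypothetical:

# COPY into a staging table first, then move the rows into the target
# with one set-based INSERT ... SELECT, as the comment describes.
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = source,
    catalog_connection = "redshift-connection",   # hypothetical
    connection_options = {
        "dbtable": "patients_info_staging",       # hypothetical staging table
        "database": "dev",
        "preactions": "CREATE TABLE IF NOT EXISTS patients_info_staging (LIKE patients_info);",
        "postactions": ("INSERT INTO patients_info SELECT * FROM patients_info_staging; "
                        "DROP TABLE patients_info_staging;"),
    },
    redshift_tmp_dir = args["TempDir"])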
