如何在 psycopg2 标识符中使用 redshift 复制命令
Posted
技术标签:
【中文标题】如何在 psycopg2 标识符中使用 redshift 复制命令【英文标题】:How to use redshift copy command in psycopg2 identifier 【发布时间】:2021-08-16 10:46:41 【问题描述】:我将执行COPY
命令的以下python lambda代码设置为redshift。
bucket_name = get_bucket_name(event)
file_name = get_file_name(event)
table = bucket_name.replace('-','_')
url = f's3://bucket_name/file_name'
query = sql.SQL(''' truncate test_Schema.;
commit;
copy test_Schema.
from
iam_role 'arn:aws:iam::123344:role/test'
timeformat as 'auto'
ACCEPTINVCHARS
CSV;''').format(*map(sql.Identifier,(table,table,url)))
print(query)
cur = con.cursor()
cur.execute(query, (url,))
con.commit()
通过使用标识符,我可以使用变量并设置 S3 存储桶目标。
但它返回了以下错误。看来双引号引起了错误..
truncate test_Schema."test";
commit;
copy jp_icqa_ddl."operation_defect_detail"
from "s3://test_Schema/test_20210816.csv"
iam_role 'arn:aws:iam::123344:role/test'
timeformat as 'auto'
ACCEPTINVCHARS
CSV;
[ERROR] SyntaxError: syntax error at or near ""s3://test_Schema/test_20210816.csv""
LINE 4: from "s3://test_Schema/test_20210816...
^
Traceback (most recent call last):
File "/var/task/s3-to-redshift.py", line 63, in lambda_handler
cur.execute(query, (url,))
有什么办法可以避免吗?
如何处理标识符中的双引号?
谢谢
【问题讨论】:
在cur = con.cursor()
之前执行print(query)
- 您将在cloudwatch 中获得输出。分享输出。
在创建查询时尝试使用 f 字符串。它使代码更具可读性。见realpython.com/python-f-strings
【参考方案1】:
虽然这不是您问题的确切答案,但这里有一个更好的解决方案,不需要数据库连接字符串。
我建议使用 lambda 中的 Redshift Data API 将数据从 S3 加载到 Redshift。
你可以去掉 psycopgs2
包并在 lambda 中使用内置的 boto3
包。
这将异步运行复制查询,并且 lambda 函数不会花费超过几秒钟的时间来运行它。
我使用sentry_sdk 从 lambda 获取运行时错误通知。
import boto3
import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration
sentry_sdk.init(
"https://aaaaaa@aaaa.ingest.sentry.io/aaaaaa",
integrations=[AwsLambdaIntegration(timeout_warning=True)],
traces_sample_rate=0
)
def execute_redshift_query(sql):
data_client = boto3.client('redshift-data')
data_client.execute_statement(
ClusterIdentifier='redshift-cluster-test',
Database='db',
DbUser='db_user',
Sql=sql,
StatementName='Test query',
WithEvent=True,
)
def handler(event, context):
query = """
copy schema.test_table
from 's3://test-bucket/test.csv'
IAM_ROLE 'arn:aws:iam::1234567890:role/TestRole'
region 'us-east-1'
ignoreheader 1 csv delimiter ','
"""
execute_redshift_query(query)
return True
如果复制查询失败,另一个 lambda 函数会发送错误通知。 您可以使用下面屏幕截图中的规则添加 EventBridge lambda 触发器。
这里是发送错误通知的 lambda 代码。
import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration
sentry_sdk.init(
"https://aaaa@aaa.ingest.sentry.io/aaaaa",
integrations=[AwsLambdaIntegration(timeout_warning=True)],
traces_sample_rate=0
)
def lambda_handler(event, context):
try:
if event["detail"]["state"] != "FINISHED":
raise ValueError(str(event))
except Exception as e:
sentry_sdk.capture_exception(e)
return True
您可以使用第一个 lambda 函数中定义的StatementName
来识别哪个复制查询失败。
希望对你有帮助。
【讨论】:
以上是关于如何在 psycopg2 标识符中使用 redshift 复制命令的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Psycopg2 在 Redshift Spectrum 中添加分区 -