如何在 psycopg2 标识符中使用 redshift 复制命令

Posted

技术标签:

【中文标题】如何在 psycopg2 标识符中使用 redshift 复制命令【英文标题】:How to use redshift copy command in psycopg2 identifier 【发布时间】:2021-08-16 10:46:41 【问题描述】:

我将执行COPY命令的以下python lambda代码设置为redshift。

bucket_name = get_bucket_name(event)
file_name = get_file_name(event)

table = bucket_name.replace('-','_')
url = f's3://bucket_name/file_name'
    query = sql.SQL(''' truncate test_Schema.;
            commit;
            copy test_Schema.
            from 
            iam_role 'arn:aws:iam::123344:role/test'
            timeformat as 'auto'
            ACCEPTINVCHARS
            CSV;''').format(*map(sql.Identifier,(table,table,url)))
    
    print(query)
    cur = con.cursor()
    cur.execute(query, (url,))
    con.commit()
    

通过使用标识符,我可以使用变量并设置 S3 存储桶目标。

但它返回了以下错误。看来双引号引起了错误..

truncate test_Schema."test";
commit;
copy jp_icqa_ddl."operation_defect_detail"
from "s3://test_Schema/test_20210816.csv"
iam_role 'arn:aws:iam::123344:role/test'
timeformat as 'auto'
ACCEPTINVCHARS
CSV; 

[ERROR] SyntaxError: syntax error at or near ""s3://test_Schema/test_20210816.csv""
LINE 4:             from "s3://test_Schema/test_20210816...
                         ^

Traceback (most recent call last):
  File "/var/task/s3-to-redshift.py", line 63, in lambda_handler
    cur.execute(query, (url,))

有什么办法可以避免吗?

如何处理标识符中的双引号?

谢谢

【问题讨论】:

cur = con.cursor() 之前执行print(query) - 您将在cloudwatch 中获得输出。分享输出。 在创建查询时尝试使用 f 字符串。它使代码更具可读性。见realpython.com/python-f-strings 【参考方案1】:

虽然这不是您问题的确切答案,但这里有一个更好的解决方案,不需要数据库连接字符串。

我建议使用 lambda 中的 Redshift Data API 将数据从 S3 加载到 Redshift。 你可以去掉 psycopgs2 包并在 lambda 中使用内置的 boto3 包。

这将异步运行复制查询,并且 lambda 函数不会花费超过几秒钟的时间来运行它。

我使用sentry_sdk 从 lambda 获取运行时错误通知。

import boto3
import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration

sentry_sdk.init(
    "https://aaaaaa@aaaa.ingest.sentry.io/aaaaaa",
    integrations=[AwsLambdaIntegration(timeout_warning=True)],
    traces_sample_rate=0
)


def execute_redshift_query(sql):
    data_client = boto3.client('redshift-data')
    data_client.execute_statement(
        ClusterIdentifier='redshift-cluster-test',
        Database='db',
        DbUser='db_user',
        Sql=sql,
        StatementName='Test query',
        WithEvent=True,
    )


def handler(event, context):
    query = """
    copy schema.test_table
    from 's3://test-bucket/test.csv'
    IAM_ROLE 'arn:aws:iam::1234567890:role/TestRole'
    region 'us-east-1'
    ignoreheader 1 csv delimiter ','
    """
    execute_redshift_query(query)
    return True

如果复制查询失败,另一个 lambda 函数会发送错误通知。 您可以使用下面屏幕截图中的规则添加 EventBridge lambda 触发器。

这里是发送错误通知的 lambda 代码。

import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration

sentry_sdk.init(
    "https://aaaa@aaa.ingest.sentry.io/aaaaa",
    integrations=[AwsLambdaIntegration(timeout_warning=True)],
    traces_sample_rate=0
)


def lambda_handler(event, context):
    try:
        if event["detail"]["state"] != "FINISHED":
            raise ValueError(str(event))
    except Exception as e:
        sentry_sdk.capture_exception(e)
    return True

您可以使用第一个 lambda 函数中定义的StatementName 来识别哪个复制查询失败。

希望对你有帮助。

【讨论】:

以上是关于如何在 psycopg2 标识符中使用 redshift 复制命令的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Psycopg2 在 Redshift Spectrum 中添加分区 -

如何在 psycopg2 中链接多个语句?

psycopg2 - 字段“id”缺少数据时出错?

如何在 Python 上使用“pip”安装 psycopg2?

psycopg2“选择更新”

如何使用 pandas sqlalchemy 和 psycopg2 处理 NaT