如何使用 Psycopg2 在 Redshift Spectrum 中添加分区 -

Posted

技术标签:

【中文标题】如何使用 Psycopg2 在 Redshift Spectrum 中添加分区 -【英文标题】:How can I use Psycopg2 to add Partition in Redshift Spectrum - 【发布时间】:2017-10-19 16:12:46 【问题描述】:

我们有一个基于 S3 数据构建的 Redshift Spectrum 表 - 我们正在尝试在此表中自动添加分区 - 我可以在 redshift 客户端或 psql shell 中运行以下 ALTER 语句:

ALTER TABLE analytics_spectrum.page_view ADD PARTITION(date='2017-10-17') LOCATION 's3://data-hub/page_view/2017/10/17/';

但这无法通过 psycopg2 执行。

sql_query = "ALTER TABLE analytics_spectrum.page_view ADD PARTITION(date='2017-10-17') LOCATION 's3://data-hub/_page_view_v3/2017/10/17/';"
import config
import psycopg2
connection = psycopg2.connect(
            **config.DATABASES['redshift_db']["connection"])
cursor = connection.cursor()
cursor.execute(sql_query)

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
psycopg2.ProgrammingError: syntax error at or near "("
LINE 1: ...ABLE analytics_spectrum.page_view ADD PARTITION(date='201...

在 psycopg2 的情况下,它甚至不会将查询发送到 redshift,并且在查询解析中执行失败。

目前我已经实现了使用 subprocess.popen 来执行 alter 语句 - 但我想将其切换回使用 psycopg2。

p = subprocess.Popen(['psql',
                      '-h', self.spectrum_connection['host'],
                      '-p', self.spectrum_connection['port'],
                      '-d', self.spectrum_connection['dbname'],
                      '-U', self.spectrum_connection['user'],
                      '-c', sql_stmt],
                     env=
    'PGPASSWORD': self.spectrum_connection['password'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
out, err = p.communicate()

建议/想法?

谢谢, 侯赛因·博拉

【问题讨论】:

看一看。 ***.com/a/47217546/3957916 上面的帖子讨论了如何在 Redshift Spectrum 表中添加分区 - 但它没有说明使用 python 和 psycopg2。 【参考方案1】:

我有同样的问题。不使用 ISOLATION_LEVEL_AUTOCOMMIT 的查询执行会引发以下错误:

psycopg2.InternalError: ALTER EXTERNAL TABLE cannot run inside a transaction block

我稍微修改了我的代码,它工作了。

import argparse
import sys, psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

input_data = 
input_data["db_name"] = <<DB_NAME>>
input_data["db_host"] = <<HOST_NAME>>
input_data["db_port"] = 5439
input_data["db_user"] = <<USER>>
input_data["db_pass"] = <<PASSWORD>>
con = psycopg2.connect(dbname=input_data["db_name"], host=input_data["db_host"], port=input_data["db_port"], user=input_data["db_user"], password=input_data["db_pass"])
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()
query = <<ADD_YOUR_QUERY_HERE>>
cur.execute(query)
cur.close() 
con.close()

【讨论】:

【参考方案2】:

在您的查询中,您必须首先添加set autocommit=on; 才能通过交易块。

然后魔法就会发生,你可以对你的表进行分区。

【讨论】:

以上是关于如何使用 Psycopg2 在 Redshift Spectrum 中添加分区 -的主要内容,如果未能解决你的问题,请参考以下文章

尝试使用 psycopg2.sql 在 python 中创建 Redshift 表

使用带有 Lambda 的 psycopg2 插入 Redshift (Python)

Psycopg2 是不是允许使用 Python 在 redshift 上运行 udf create 查询?

无法使用 Psycopg2 在 Amazon Redshift 中创建表

psycopg2、Redshift 和 unittest 的并发问题

使用 Psycopg2 将 Spark DataFrame 写入 Redshift 时出错:无法腌制 psycopg2.extensions.cursor 对象