psycopg2.errors.FeatureNotSupported: Specified types or functions (one per INFO message) not supported on Redshift tables
Posted: 2021-08-17 10:48:52

[Question] I am trying to run a test query against AWS Redshift from AWS Managed Airflow:
The query:
AWS_GET_DATA_FROM_REDSHIFT = """('SELECT * FROM information_schema.tables;')"""
stage_data_from_redshift_to_s3 = FromRedshiftToS3TransferOperator(
    task_id=f'Stage_unload_{SCHEMA}_{TABLE}_from_redshift_to_s3_{S3_BUCKET}',
    dag=dag,
    table=TABLE,
    s3_bucket=S3_BUCKET,
    s3_prefix=f'{SCHEMA}_{TABLE}',
    select_query=AWS_GET_DATA_FROM_REDSHIFT,
    unload_options=['CSV']
)
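For context, the f-string placeholders above assume module-level constants defined elsewhere in the DAG file; a minimal sketch of what they might look like (all values hypothetical):

SCHEMA = 'public'        # hypothetical schema name
TABLE = 'my_table'       # hypothetical table name
S3_BUCKET = 'my-bucket'  # hypothetical S3 bucket name

The operator itself: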
class FromRedshiftToS3TransferOperator(BaseOperator):
    """
    Executes an UNLOAD command to S3 as a CSV with headers.

    :param table: reference to a specific table in the Redshift database
    :type table: str
    :param s3_bucket: reference to a specific S3 bucket
    :type s3_bucket: str
    :param s3_prefix: reference to a specific S3 key prefix under the bucket
    :type s3_prefix: str
    :param select_query: the query whose result set is unloaded to S3
    :type select_query: str
    :param redshift_conn_id: reference to a specific Redshift database connection
    :type redshift_conn_id: str
    :param aws_conn_id: reference to a specific S3 connection
    :type aws_conn_id: str
    :param verify: Whether or not to verify SSL certificates for the S3 connection.
        By default SSL certificates are verified.
        You can provide the following values:

        - ``False``: do not validate SSL certificates. SSL will still be used
          (unless use_ssl is False), but SSL certificates will not be verified.
        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
          You can specify this argument if you want to use a different
          CA cert bundle than the one used by botocore.
    :type verify: bool or str
    :param unload_options: reference to a list of UNLOAD options
    :type unload_options: list
    :param autocommit: If set to True, the UNLOAD statement is committed automatically.
        Otherwise it is committed right before the Redshift connection is closed.
    :type autocommit: bool
    :param include_header: If set to True, the S3 files contain the header columns.
    :type include_header: bool
    """
    ui_color = '#8EB6D4'

    @apply_defaults
    def __init__(self,
                 table,
                 s3_bucket,
                 s3_prefix,
                 select_query,
                 redshift_conn_id='redshift',
                 aws_conn_id='aws_credentials',
                 unload_options=tuple(),
                 autocommit=False,
                 include_header=False,
                 *args, **kwargs):
        super(FromRedshiftToS3TransferOperator, self).__init__(*args, **kwargs)
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_prefix = s3_prefix
        self.select_query = select_query
        self.redshift_conn_id = redshift_conn_id
        self.aws_conn_id = aws_conn_id
        self.unload_options = unload_options
        self.autocommit = autocommit
        self.include_header = include_header

        # Append HEADER to the UNLOAD options when include_header is requested
        # and the caller has not already passed it explicitly.
        if self.include_header and 'HEADER' not in [uo.upper().strip() for uo in self.unload_options]:
            self.unload_options = list(self.unload_options) + ['HEADER']
    def execute(self, context):
        # Use the configured connection ids rather than hard-coded literals.
        aws_hook = AwsHook(self.aws_conn_id)
        credentials = aws_hook.get_credentials()
        redshift_hook = PostgresHook(self.redshift_conn_id)
        self.log.info(f'Preparing to stage data from {self.select_query} to {self.s3_bucket}/{self.s3_prefix}...')

        unload_query = """
            UNLOAD {select_query}
            TO 's3://{s3_bucket}/{s3_prefix}/{table}_'
            with credentials
            'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
            {unload_options};
        """.format(select_query=self.select_query,
                   s3_bucket=self.s3_bucket,
                   s3_prefix=self.s3_prefix,
                   table=self.table,
                   access_key=credentials.access_key,
                   secret_key=credentials.secret_key,
                   unload_options='\n\t\t\t'.join(self.unload_options))

        # NOTE: logging credentials is unsafe outside of throwaway debugging.
        self.log.info(f'{credentials.access_key}')
        self.log.info(f'{credentials.secret_key}')

        self.log.info('Executing UNLOAD command...')
        redshift_hook.run(unload_query, self.autocommit)
        self.log.info("UNLOAD command complete.")
And I get the following error:
[2021-08-17 10:40:50,186] taskinstance.py:1150 ERROR - Specified types or functions (one per INFO message) not supported on Redshift tables.
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/airflow/dags/ownoperators/aws_from_redshift_to_s3_operator.py", line 95, in execute
redshift_hook.run(unload_query, self.autocommit)
File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi_hook.py", line 175, in run
cur.execute(s)
psycopg2.errors.FeatureNotSupported: Specified types or functions (one per INFO message) not supported on Redshift tables.
[Answer 1] This error is generated by Redshift, and in most cases it happens when your query uses leader-node-only functions (for example generate_series(); there are many of them). Look at your select_query and check whether the functions it calls are valid on the compute nodes (run the query in a workbench). Happy to help if you post the query. The problem is in the SQL, not in the code you posted.

The root of the issue is that the compute nodes need leader-node information during execution, and this routing is not supported. This can happen for several reasons, each with its own workaround.
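A minimal sketch of the usual workaround, assuming the goal here was just a smoke test: catalog views such as information_schema.tables live on the leader node only, so read them with an ordinary query, and reserve UNLOAD for regular user tables, whose data lives on the compute nodes (the table name below is hypothetical):

from airflow.hooks.postgres_hook import PostgresHook

# Leader-node-only catalog views can be read with a plain query...
redshift_hook = PostgresHook('redshift')
tables = redshift_hook.get_records('SELECT * FROM information_schema.tables;')

# ...while UNLOAD should point at a regular user table:
AWS_GET_DATA_FROM_REDSHIFT = """('SELECT * FROM public.my_table')"""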