psycopg2.errors.FeatureNotSupported: Specified types or functions (one per INFO message) not supported on Redshift tables

Posted: 2021-08-17 10:48:52

【Question】:

I am trying to run a test query against AWS Redshift from AWS Managed Airflow:

The query:

AWS_GET_DATA_FROM_REDSHIFT = """('SELECT * FROM information_schema.tables;')"""

stage_data_from_redshift_to_s3 = FromRedshiftToS3TransferOperator(
    task_id=f'Stage_unload_{SCHEMA}_{TABLE}_from_redshift_to_s3_{S3_BUCKET}',
    dag=dag,
    table=TABLE,
    s3_bucket=S3_BUCKET,
    s3_prefix=f'{SCHEMA}_{TABLE}',
    select_query=AWS_GET_DATA_FROM_REDSHIFT,
    unload_options=['CSV']
)

class FromRedshiftToS3TransferOperator(BaseOperator):
    """
        Executes an UNLOAD command to s3 as a CSV with headers

        :param schema: reference to a specific schema in redshift database
        :type schema: str
        :param table: reference to a specific table in redshift database
        :type table: str
        :param s3_bucket: reference to a specific S3 bucket
        :type s3_bucket: str
        :param s3_key: reference to a specific S3 key
        :type s3_key: str
        :param redshift_conn_id: reference to a specific redshift database
        :type redshift_conn_id: str
        :param aws_conn_id: reference to a specific S3 connection
        :type aws_conn_id: str
        :param verify: Whether or not to verify SSL certificates for S3 connection.
            By default SSL certificates are verified.
            You can provide the following values:

            - ``False``: do not validate SSL certificates. SSL will still be used
                     (unless use_ssl is False), but SSL certificates will not be
                     verified.
            - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
                     You can specify this argument if you want to use a different
                     CA cert bundle than the one used by botocore.
        :type verify: bool or str
        :param unload_options: reference to a list of UNLOAD options
        :type unload_options: list
        :param autocommit: If set to True it will automatically commit the UNLOAD statement.
            Otherwise it will be committed right before the redshift connection gets closed.
        :type autocommit: bool
        :param include_header: If set to True the s3 file contains the header columns.
        :type include_header: bool
    """

    ui_color = '#8EB6D4'

    @apply_defaults
    def __init__(self,
                 table,
                 s3_bucket,
                 s3_prefix,
                 select_query,
                 redshift_conn_id='redshift',
                 aws_conn_id='aws_credentials',
                 unload_options=tuple(),
                 autocommit=False,
                 include_header=False,
                 *args, **kwargs):
        super(FromRedshiftToS3TransferOperator, self).__init__(*args, **kwargs)
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_prefix = s3_prefix
        self.select_query = select_query
        self.redshift_conn_id = redshift_conn_id
        self.aws_conn_id = aws_conn_id
        self.unload_options = unload_options
        self.autocommit = autocommit
        self.include_header = include_header

        if self.include_header and 'HEADER' not in [uo.upper().strip() for uo in self.unload_options]:
            self.unload_options = list(self.unload_options) + ['HEADER', ]

    def execute(self, context):
        aws_hook = AwsHook("aws_credentials")
        credentials = aws_hook.get_credentials()
        redshift_hook = PostgresHook("redshift")

        self.log.info(f'Preparing to stage data from {self.select_query} to {self.s3_bucket}/{self.s3_prefix}...')

        unload_query = """
                    UNLOAD {select_query}
                    TO 's3://{s3_bucket}/{s3_prefix}/{table}_'
                    with credentials
                    'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
                    {unload_options};
                """.format(select_query=self.select_query,
                           s3_bucket=self.s3_bucket,
                           s3_prefix=self.s3_prefix,
                           table=self.table,
                           access_key=credentials.access_key,
                           secret_key=credentials.secret_key,
                           unload_options='\n\t\t\t'.join(self.unload_options))

        self.log.info(f'{credentials.access_key}')
        self.log.info(f'{credentials.secret_key}')
        self.log.info('Executing UNLOAD command...')
        redshift_hook.run(unload_query, self.autocommit)
        self.log.info("UNLOAD command complete.")

And I get this error:

[2021-08-17 10:40:50,186] taskinstance.py:1150 ERROR - Specified types or functions (one per INFO message) not supported on Redshift tables.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/airflow/dags/ownoperators/aws_from_redshift_to_s3_operator.py", line 95, in execute
    redshift_hook.run(unload_query, self.autocommit)
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi_hook.py", line 175, in run
    cur.execute(s)
psycopg2.errors.FeatureNotSupported: Specified types or functions (one per INFO message) not supported on Redshift tables.

【Comments】:

【Answer 1】:

This error is generated by Redshift, and in most cases it happens when your query uses leader-node-only functions (generate_series() is one example; there are many of them). Look at your select_query and check whether the functions it calls are valid on the compute nodes (run the query in a workbench). If you post the query, I'm happy to help. The problem is in the SQL, not in the code you posted.
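
A quick way to reproduce the diagnosis (a minimal sketch; the cluster endpoint, credentials, bucket and IAM role below are placeholders, not values from the question): run the same SELECT directly with psycopg2, then wrap it in UNLOAD. The plain SELECT against information_schema.tables succeeds because it is resolved on the leader node, while the UNLOAD version typically fails with this FeatureNotSupported error, because UNLOAD pushes the inner query to the compute nodes, which cannot reference leader-node-only catalogs.

import psycopg2

# Placeholder connection details - replace with your own cluster endpoint.
conn = psycopg2.connect(
    host='my-cluster.xxxxxxxx.eu-west-1.redshift.amazonaws.com',
    port=5439, dbname='dev', user='awsuser', password='placeholder')
cur = conn.cursor()

# Works: information_schema is a leader-node catalog, and a plain SELECT
# against it runs on the leader node.
cur.execute('SELECT * FROM information_schema.tables LIMIT 5;')
print(cur.fetchall())

# Typically raises FeatureNotSupported: the inner query of UNLOAD runs on
# the compute nodes, which cannot read leader-node-only catalogs.
cur.execute("""
    UNLOAD ('SELECT * FROM information_schema.tables')
    TO 's3://my-bucket/schema_table/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftUnloadRole'
    CSV;
""")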

The root of the problem is that the compute nodes need leader-node information during execution, and this routing is not supported. There can be several reasons for this, each with its own workaround.
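
For the query in the question, one workaround (a sketch, assuming the result set is small enough to pass through the Airflow worker; the hook imports follow the same Airflow 1.10-style paths used in the question, and the function name is made up for illustration) is to skip UNLOAD entirely: run the leader-node-only query through the Postgres hook, build the CSV in the task, and upload it with S3Hook:

import csv
import io

from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.S3_hook import S3Hook


def stage_leader_node_query_to_s3(sql, s3_bucket, s3_key,
                                  redshift_conn_id='redshift',
                                  aws_conn_id='aws_credentials'):
    """Run a leader-node-only query and write the rows to S3 as CSV."""
    redshift_hook = PostgresHook(redshift_conn_id)
    # get_records() executes the statement through a normal cursor, so the
    # query is planned on the leader node and information_schema is allowed.
    rows = redshift_hook.get_records(sql)

    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)

    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    s3_hook.load_string(buffer.getvalue(), key=s3_key,
                        bucket_name=s3_bucket, replace=True)


# Example call, reusing the bucket from the operator above:
# stage_leader_node_query_to_s3('SELECT * FROM information_schema.tables;',
#                               S3_BUCKET, 'schema_table/tables.csv')

This only makes sense for small result sets such as catalog queries; for large regular (compute-node) tables, UNLOAD remains the right tool.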

【Discussion】:
