如何使用自动化或命令行工具管理 AWS Redshift 中的存储过程?

Posted

技术标签:

【中文标题】如何使用自动化或命令行工具管理 AWS Redshift 中的存储过程?【英文标题】:How to manage stored procedure in AWS Redshift using an automation or command line tool? 【发布时间】:2021-10-18 14:14:45 【问题描述】:

我们已经在 AWS 上的 Redshift 中构建了许多存储过程。 我们需要下载和上传这些存储过程,以便将它们保存在 GITHUB 中作为跟踪更改的一种手段。

这些程序最终需要成为 cloudformation 模板的一部分,这样基础架构也可以得到维护。

理想情况下,这可以使用 AWS CLI 完成,但似乎没有执行此操作的命令。

如何在自动化/CICD 环境中管理 AWS RedShift 存储过程?

【问题讨论】:

【参考方案1】:

我有一部分可行的解决方案。

import json
import psycopg2
import os


def run_sql_commands(input_command, database="mydatabasename", host_port=1234 ,host_url="datawarehouse.redshift.amazonaws.com"):
    """
    :param input_command: sql string to execute
    :param database: which database to run the query
    :return:
    """
    results = None
    # db_user and db_pass will need to be set as environment variables
    db_user = os.environ["db_user"]
    db_pass = os.environ["db_pass"]

    db_host = host_url
    db_port = host_port
    db_name = database

    try:
        conn = psycopg2.connect(
            "dbname= port= user= host= password=".format(db_name, db_port, db_user, db_host, db_pass))

        cursor = conn.cursor()
        cursor.execute(input_command)
        results = cursor.fetchall()

    except Exception as e:
        return None

    return results


def get_arg_type(oid, database):
    sql = f"SELECT typname FROM pg_catalog.pg_type WHERE oid=oid"
    r = run_sql_commands(sql, database)
    return r[0][0]


def download_all_procedures(database, file_location="./local-code-store"):
    get_all_stored_procedure_sql = """
    SELECT
        n.nspname,
        b.usename,
        p.proname,
        p.proargnames,
        p.proargtypes,
        p.prosrc
    FROM
        pg_catalog.pg_namespace n
    JOIN pg_catalog.pg_proc p ON
        pronamespace = n.oid
    JOIN pg_user b ON
        b.usesysid = p.proowner
    WHERE
        nspname NOT IN ('information_schema', 'pg_catalog');
    """

    r = run_sql_commands(get_all_stored_procedure_sql, database)
    counted_items=[]
    for item in r:
        table = item[0]
        author = item[1]
        procedure_name = item[2]
        procedure_arguments = item[3]
        procedure_argtypes = item[4]
        procedure = item[5]
        t_list = []
        for this_oid in procedure_argtypes.split():
            t = get_arg_type(this_oid, database="mydatabasename")
            t_list.append(t)
        meta_data = 'table': table,
                     'author': author,
                     'arguments': procedure_arguments,
                     'argument_types': t_list
        filename = f'file_location/database/Schemas/table/procedure_name.sql'
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        f = open(filename, 'w')
        count = f.write(procedure.replace('\r\n', '\n'))
        f.close()
        filename = f'file_location/database/Schemas/table/procedure_name.json'
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        f = open(filename, 'w')
        counted_items.append(f.write(json.dumps(meta_data)))
        f.close()

    return counted_items


if __name__ == "__main__":
    print("Starting...")
    c = download_all_procedures("mydatabase")
    print("... finished")

【讨论】:

以上是关于如何使用自动化或命令行工具管理 AWS Redshift 中的存储过程?的主要内容,如果未能解决你的问题,请参考以下文章

无法在命令行或管理仪表板中创建 AWS Elastic Beanstalk 环境

如何从本地安装的 spark 连接到 aws-redshift?

如何使用 sqlite3.exe 命令行工具自动化进程?

windows powershell是啥

通过 CLI 管理 Jenkins Server

如何在 Linux 中使用 apt 命令管理包