如何使用自动化或命令行工具管理 AWS Redshift 中的存储过程?
Posted
技术标签:
【中文标题】如何使用自动化或命令行工具管理 AWS Redshift 中的存储过程?【英文标题】:How to manage stored procedure in AWS Redshift using an automation or command line tool? 【发布时间】:2021-10-18 14:14:45 【问题描述】:我们已经在 AWS 上的 Redshift 中构建了许多存储过程。 我们需要下载和上传这些存储过程,以便将它们保存在 GITHUB 中作为跟踪更改的一种手段。
这些程序最终需要成为 cloudformation 模板的一部分,这样基础架构也可以得到维护。
理想情况下,这可以使用 AWS CLI 完成,但似乎没有执行此操作的命令。
如何在自动化/CICD 环境中管理 AWS RedShift 存储过程?
【问题讨论】:
【参考方案1】:我有一部分可行的解决方案。
import json
import psycopg2
import os
def run_sql_commands(input_command, database="mydatabasename", host_port=1234 ,host_url="datawarehouse.redshift.amazonaws.com"):
"""
:param input_command: sql string to execute
:param database: which database to run the query
:return:
"""
results = None
# db_user and db_pass will need to be set as environment variables
db_user = os.environ["db_user"]
db_pass = os.environ["db_pass"]
db_host = host_url
db_port = host_port
db_name = database
try:
conn = psycopg2.connect(
"dbname= port= user= host= password=".format(db_name, db_port, db_user, db_host, db_pass))
cursor = conn.cursor()
cursor.execute(input_command)
results = cursor.fetchall()
except Exception as e:
return None
return results
def get_arg_type(oid, database):
sql = f"SELECT typname FROM pg_catalog.pg_type WHERE oid=oid"
r = run_sql_commands(sql, database)
return r[0][0]
def download_all_procedures(database, file_location="./local-code-store"):
get_all_stored_procedure_sql = """
SELECT
n.nspname,
b.usename,
p.proname,
p.proargnames,
p.proargtypes,
p.prosrc
FROM
pg_catalog.pg_namespace n
JOIN pg_catalog.pg_proc p ON
pronamespace = n.oid
JOIN pg_user b ON
b.usesysid = p.proowner
WHERE
nspname NOT IN ('information_schema', 'pg_catalog');
"""
r = run_sql_commands(get_all_stored_procedure_sql, database)
counted_items=[]
for item in r:
table = item[0]
author = item[1]
procedure_name = item[2]
procedure_arguments = item[3]
procedure_argtypes = item[4]
procedure = item[5]
t_list = []
for this_oid in procedure_argtypes.split():
t = get_arg_type(this_oid, database="mydatabasename")
t_list.append(t)
meta_data = 'table': table,
'author': author,
'arguments': procedure_arguments,
'argument_types': t_list
filename = f'file_location/database/Schemas/table/procedure_name.sql'
os.makedirs(os.path.dirname(filename), exist_ok=True)
f = open(filename, 'w')
count = f.write(procedure.replace('\r\n', '\n'))
f.close()
filename = f'file_location/database/Schemas/table/procedure_name.json'
os.makedirs(os.path.dirname(filename), exist_ok=True)
f = open(filename, 'w')
counted_items.append(f.write(json.dumps(meta_data)))
f.close()
return counted_items
if __name__ == "__main__":
print("Starting...")
c = download_all_procedures("mydatabase")
print("... finished")
【讨论】:
以上是关于如何使用自动化或命令行工具管理 AWS Redshift 中的存储过程?的主要内容,如果未能解决你的问题,请参考以下文章
无法在命令行或管理仪表板中创建 AWS Elastic Beanstalk 环境