Python/Pyspark 迭代代码(用于 AWS Glue ETL 作业)
Posted
技术标签:
【中文标题】Python/Pyspark 迭代代码(用于 AWS Glue ETL 作业)【英文标题】:Python/Pyspark iteration code (for AWS Glue ETL job) 【发布时间】:2020-05-28 17:40:22 【问题描述】:我正在使用 AWS Glue,如果不使用迭代,您将无法读取/写入多个动态帧。我在下面编写了这段代码,但在两件事上苦苦挣扎:
-
“tableName”即过滤后的表列表是否正确(我要迭代的所有表都以 client_historical_* 开头)。
我不知道如何使用下面的映射动态填充 Redshift 表名称。
红移映射:
client_historical_ks --> table_01_a
client_historical_kg --> table_01_b
client_historical_kt --> table_01_c
client_historical_kf --> table_01_d
代码:
client = boto3.client('glue',region_name='us-east-1')
databaseName = 'incomingdata'
tables = client.get_tables(DatabaseName = databaseName)
tableList = tables['TableList']
for table in tableList:
start_prefix = client_historical_
tableName = list(filter(lambda x: x.startswith(start_prefix), table['Name']))
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "incomingdata", table_name = tableName, transformation_ctx = "datasource0")
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "Redshift", connection_options = "dbtable": "nameoftablehere", "database": "metadata", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
【问题讨论】:
start_prefix = client_historical_ ...你能把它放在引号里(start_prefix = 'client_historical_')然后试试。顺便说一句,这段代码的结果是它工作还是不工作?如果不起作用,您遇到的错误是什么,请添加更多信息 【参考方案1】:您可以创建一个映射字典,然后执行您的代码 您还可以在循环之外过滤表,然后仅在需要的表上循环。
mapping = 'client_historical_ks': 'table_01_a',
'client_historical_kg': 'table_01_b',
'client_historical_kt': 'table_01_c',
'client_historical_kf': 'table_01_d'
client = boto3.client('glue',region_name='us-east-1')
databaseName = 'incomingdata'
tables = client.get_tables(DatabaseName = databaseName)
tableList = tables['TableList']
start_prefix = 'client_historical_'
tableNames = list(filter(lambda x: x.startswith(start_prefix), table['Name']))
for table in tableNames:
target_table = mapping.get(table)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "incomingdata", table_name = table, transformation_ctx = "datasource0")
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "Redshift", connection_options = "dbtable": target_table, "database": "metadata", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
【讨论】:
当我尝试这个时,我在 Glue 中遇到错误:raise ConnectTimeoutError(endpoint_url=request.url, error=e) botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "glue.us-east-1.amazonaws.com" .好像 boto3 没有按预期工作? Boto3 在胶水作业的火花外壳中不起作用。或者,您可以创建一个 lambda 来获取表格列表,然后从该 lambda 传递表格列表中调用 Glue 作业作为参数 不幸的是,我需要使用 Glue,因为需要设置数据目录。 :( 是制作单个脚本的替代方案吗?以上是关于Python/Pyspark 迭代代码(用于 AWS Glue ETL 作业)的主要内容,如果未能解决你的问题,请参考以下文章
[dfs] aw180. 排书(IDA*+dfs深入理解+思维+好题)
如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame