如何在 AWS Glue PySpark 中运行并行线程?

Posted

技术标签:

【中文标题】如何在 AWS Glue PySpark 中运行并行线程?【英文标题】:How to run parallel threads in AWS Glue PySpark? 【发布时间】:2020-10-24 10:28:11 【问题描述】:

我有一个 spark 作业,它只会从具有相同转换的多个表中提取数据。基本上是一个 for 循环,它遍历表列表、查询目录表、添加时间戳,然后推入 Redshift(下面的示例)。

完成这项工作大约需要 30 分钟。有没有办法在相同的火花/胶水环境下并行运行这些?如果可以避免的话,我不想创建单独的胶水作业。

import datetime
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import *


# query the runtime arguments
args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "redshift_catalog_connection", "target_database", "target_schema"],
)

# build the job session and context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# set the job execution timestamp
job_execution_timestamp = datetime.datetime.utcnow()

tables = []

for table in tables:
    catalog_table = glueContext.create_dynamic_frame.from_catalog(
        database="test", table_name=table, transformation_ctx=table
    )
    data_set = catalog_table.toDF().withColumn(
        "batchLoadTimestamp", lit(job_execution_timestamp)
    )

    # covert back to glue dynamic frame
    export_frame = DynamicFrame.fromDF(data_set, glueContext, "export_frame")

    # remove null rows from dynamic frame
    non_null_records = DropNullFields.apply(
        frame=export_frame, transformation_ctx="non_null_records"
    )

    temp_dir = os.path.join(args["TempDir"], redshift_table_name)

    stores_redshiftSink = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=non_null_records,
        catalog_connection=args["redshift_catalog_connection"],
        connection_options=
            "dbtable": f"args['target_schema'].redshift_table_name",
            "database": args["target_database"],
            "preactions": f"truncate table args['target_schema'].redshift_table_name;",
        ,
        redshift_tmp_dir=temp_dir,
        transformation_ctx="stores_redshiftSink",
    ) ```

【问题讨论】:

【参考方案1】:

您可以执行以下操作来加快此过程

    启用作业的并发执行。 分配足够数量的 DPU。 将表列表作为参数传递 使用 Glue 工作流或步进函数并行执行作业。

现在假设您有 100 个表要摄取,您可以将列表分成 10 个表并同时运行该作业 10 次。

由于您的数据将被并行加载,因此 Glue 作业运行的时间将减少,因此将产生更少的成本。

另一种更快的方法是直接使用 redshift 实用程序。

    在 redshift 中创建表并将 batchLoadTimestamp 列默认为 current_timestamp。 现在创建复制命令并将数据直接从 s3 加载到表中。 利用 pg8000 使用 Glue python shell 作业运行复制命令。

为什么这种方法会更快? 因为 spark redshift jdbc 连接器首先将 spark 数据帧卸载到 s3,然后准备复制命令到 redshift 表。在直接运行复制命令时,您将消除运行卸载命令的开销并将数据读取到 spark df 中。

【讨论】:

两种方法都非常适合我的用例。谢谢!

以上是关于如何在 AWS Glue PySpark 中运行并行线程?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 AWS Glue pyspark 脚本中合并两个节点

AWS Glue ETL 作业中的 Boto3 Glue

在运行 AWS Glue ETL 作业并命名输出文件名时,有没有办法从 S3 存储桶中读取文件名。 pyspark 是不是提供了一种方法来做到这一点?

如何使用带有 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?

AWS Glue PySpark 替换 NULL

如何从aws glue pyspark作业中的嵌套数组中提取数据