为啥我的 aws 胶水作业只使用一个执行器和驱动程序?

Posted

技术标签:

【中文标题】为啥我的 aws 胶水作业只使用一个执行器和驱动程序?【英文标题】:why is my aws glue job uses only one executor and the driver?为什么我的 aws 胶水作业只使用一个执行器和驱动程序? 【发布时间】:2018-07-24 23:03:58 【问题描述】:

在我的脚本中,我将pyspark中的所有dynamicframe转换为dataframe,并进行groupbyjoin操作。然后在matrics视图中,我发现无论我设置多少DPU,只有一个executor处于活动状态。

大约 2 小时后作业失败了

诊断:容器 [pid=8417,containerID=container_1532458272694_0001_01_000001] 是 超出物理内存限制。当前使用情况:5.5 GB 的 5.5 GB 使用的物理内存;使用了 7.7 GB 的 27.5 GB 虚拟内存。杀戮 容器。

我有大约 20 亿行数据。我的DPU 设置为 80。

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "in_json", transformation_ctx = "datasource0")
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "out_json", transformation_ctx = "datasource0")


applymapping0 = ApplyMapping.apply(frame = datasource0, mappings = [("fieldA", "int", "fieldA", "int"), ("fieldB", "string", "fieldB", "string")], transformation_ctx = "applymapping1")
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = [("fieldA", "int", "fieldA", "int"), ("fieldB", "string", "fieldB", "string")], transformation_ctx = "applymapping1")

df1 = applymapping0.toDF().groupBy("fieldA").agg(count('*').alias("total_number_1"))
df2 = applymapping1.toDF().groupBy("fieldA").agg(count('*').alias("total_number_2"))

df1.join(df2, "fieldB")

result = DynamicFrame.fromDF(result_joined, glueContext, "result")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = result, connection_type = "s3", connection_options = "path": "s3://test-bucket", format = "json", transformation_ctx = "datasink2")
job.commit()

我错过了什么吗?

【问题讨论】:

在您按fieldA分组后,您的DataFrame是否还有列fieldB?? @botchniaque 是的。这会有什么不同? 我认为如果按一列分组,则只能访问不同列的聚合。您按fieldA 分组,并聚合为count(*),因此我认为fieldB 不是数据框的一部分。 【参考方案1】:

试试repartition你的DataFrame。您可以重新分区based on a column,或to an arbitrary number of partitions 或both。

类似这样的:

df1 = applymapping0.toDF().groupBy("fieldA").agg(count('*').alias("total_number_1"))
df2 = applymapping1.toDF().groupBy("fieldA").agg(count('*').alias("total_number_2"))

df1_r = df1.repartition(df1("fieldB"))
df2_r = df2.repartition(df2("fieldB"))

df1_r.join(df2_r, "fieldB")

【讨论】:

【参考方案2】:

原来是因为我输入的数据太大,所以一开始就卡住了,只有一个executor是活跃的。计算开始后,我看到多个执行程序处于活动状态。

df1.repartition(df1("fieldB")) 实际上让它变慢了,也许我没有正确使用它。

【讨论】:

以上是关于为啥我的 aws 胶水作业只使用一个执行器和驱动程序?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 AWS Glue 不生成火花事件日志

AWS Glue 作业生命周期 - 笔记本到作业

aws 胶水 HiveContext 访问胶水 DataCatalog

如何从 AWS 胶水访问 VPC 中的 aws 资源?

为啥只有一个 spark 作业只使用一个执行器运行?

将 AWS Glue 作业迁移到 EC2