如何在 AWS Glue pyspark 脚本中合并两个节点

Posted

技术标签:

【中文标题】如何在 AWS Glue pyspark 脚本中合并两个节点【英文标题】:How to merge two nodes in AWS Glue pyspark script 【发布时间】:2018-04-17 04:59:43 【问题描述】:

我有一堆 JSON 文件需要处理。 JSON文件的结构(例如简化)是(当aws glue crawler在这些json文件上运行时的目录架构):

root
|-- Meta: struct
|    |-- DataModel: string
|    |-- EventType: string
|    |-- EventDateTime: string
|-- User: struct
|    |-- Demographics: struct
|    |    |-- FirstName: string
|    |    |-- MiddleName: string
|    |    |-- LastName: string

我希望将 User.Demographics.FirstNameUser.Demographics.MiddleName 合并或加入 和 User.Demographics.LastName。 所以最终处理的 JSON 看起来像:

root
|-- Meta: struct
|    |-- DataModel: string
|    |-- EventType: string
|    |-- EventDateTime: string
|-- User: struct
|    |-- Demographics: struct
|    |    |-- Name: string

我浏览了AWS glue Developers Guide describing DynamicFrams 和允许的选项,但找不到任何似乎有用的功能。

到目前为止,我有以下由 AWS Glue 自动生成的代码,但似乎不起作用。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


## @type: DataSource
## @args: [database = <db_name>, table_name = <table_name>, transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = <db_name>, table_name = <table_name>, transformation_ctx = "datasource0")


## @type: ApplyMapping
## @args: <Currently using mapping function>
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [<Mapping Tuples>], transformation_ctx = "applymapping1")


## @type: DataSink
## @args: [connection_type = "s3", connection_options = "path": <S3 Destination Path>, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
# datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = "path": <S3 Destination Path>, format = "json", transformation_ctx = "datasink2")
job.commit()

AWS Glue 还支持 python 的 PySpark 库,如果有办法使用 PySpark 实现这一点,请分享一些细节/链接,我可以参考它。

附: 我有一些用 Python 编写脚本的经验,但对 python 的 PySpark 库或任何其他与 ETL 相关的代码/脚本一无所知。

谢谢。

【问题讨论】:

【参考方案1】:

只需使用地图转换。在map操作中作为参数传递的函数接受一个dict,你可以执行任何逻辑并返回一个更新的或新的dict。

查看示例:https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-map.html

【讨论】:

【参考方案2】:

您可以参考这些链接或其他链接来合并数据框中的列。 https://***.com/questions/40643550/how-to-merge-two-columns-with-a-condition-in-pyspark。您可能需要调整或修改代码以满足您的需要

【讨论】:

以上是关于如何在 AWS Glue pyspark 脚本中合并两个节点的主要内容,如果未能解决你的问题,请参考以下文章

如何使用带有 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?

Pyspark 数据框删除 AWS Glue 脚本中的重复项

如何在 AWS-Glue 脚本中编写用户定义的函数?

如何仅为 AWS Glue 脚本启用粘合日志记录

AWS Glue PySpark 替换 NULL

如何在 AWS Glue PySpark 中运行并行线程?