如何在 AWS Glue pyspark 脚本中合并两个节点
Posted
技术标签:
【中文标题】如何在 AWS Glue pyspark 脚本中合并两个节点【英文标题】:How to merge two nodes in AWS Glue pyspark script 【发布时间】:2018-04-17 04:59:43 【问题描述】:我有一堆 JSON 文件需要处理。 JSON文件的结构(例如简化)是(当aws glue crawler在这些json文件上运行时的目录架构):
root
|-- Meta: struct
| |-- DataModel: string
| |-- EventType: string
| |-- EventDateTime: string
|-- User: struct
| |-- Demographics: struct
| | |-- FirstName: string
| | |-- MiddleName: string
| | |-- LastName: string
我希望将 User.Demographics.FirstName 与 User.Demographics.MiddleName 合并或加入 和 User.Demographics.LastName。 所以最终处理的 JSON 看起来像:
root
|-- Meta: struct
| |-- DataModel: string
| |-- EventType: string
| |-- EventDateTime: string
|-- User: struct
| |-- Demographics: struct
| | |-- Name: string
我浏览了AWS glue Developers Guide describing DynamicFrams 和允许的选项,但找不到任何似乎有用的功能。
到目前为止,我有以下由 AWS Glue 自动生成的代码,但似乎不起作用。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = <db_name>, table_name = <table_name>, transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = <db_name>, table_name = <table_name>, transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: <Currently using mapping function>
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [<Mapping Tuples>], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = "path": <S3 Destination Path>, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
# datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = "path": <S3 Destination Path>, format = "json", transformation_ctx = "datasink2")
job.commit()
AWS Glue 还支持 python 的 PySpark 库,如果有办法使用 PySpark 实现这一点,请分享一些细节/链接,我可以参考它。
附: 我有一些用 Python 编写脚本的经验,但对 python 的 PySpark 库或任何其他与 ETL 相关的代码/脚本一无所知。
谢谢。
【问题讨论】:
【参考方案1】:只需使用地图转换。在map操作中作为参数传递的函数接受一个dict,你可以执行任何逻辑并返回一个更新的或新的dict。
查看示例:https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-map.html
【讨论】:
【参考方案2】:您可以参考这些链接或其他链接来合并数据框中的列。 https://***.com/questions/40643550/how-to-merge-two-columns-with-a-condition-in-pyspark。您可能需要调整或修改代码以满足您的需要
【讨论】:
以上是关于如何在 AWS Glue pyspark 脚本中合并两个节点的主要内容,如果未能解决你的问题,请参考以下文章
如何使用带有 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?