Not able to put a join and query on two tables in AWS Glue job script


Posted: 2021-05-21 14:26:51

Question:

I created an AWS Glue job script in which I add two data sources and convert them from DynamicFrames to DataFrames. My goal is to query the two tables with an inner join, but I am not able to do so: the job fails at the query step. I have also included the error. Please help me out here, and see the code below.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext,SparkConf
from pyspark.sql.window import Window
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "matomo", table_name = "matomo_matomo_log_visit", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "matomo", table_name = "matomo_matomo_log_visit", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("visit_last_action_time", "timestamp", "visit_last_action_time", "timestamp"), ("custom_dimension_1", "string", "custom_dimension_1", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("idsite" , "int" , "idsite" , "int"),("visit_last_action_time", "timestamp", "visit_last_action_time", "timestamp"), ("custom_dimension_1", "string", "custom_dimension_1", "string")], transformation_ctx = "applymapping1")

## @type: DataSource
## @args: [database = "matomo", table_name = "matomo_matomo_site", transformation_ctx = "datasource1"]
## @return: datasource1
## @inputs: []
datasource1 =  glueContext.create_dynamic_frame.from_catalog(database = "matomo", table_name = "matomo_matomo_site", transformation_ctx = "datasource1")
## @type: ApplyMapping
## @args: [mapping = [("idsite", "int", "idsite", "int"), ("main_url", "string", "main_url", "string")], transformation_ctx = "applymapping2"]
## @return: applymapping2
## @inputs: [frame = datasource1]
applymapping2 = ApplyMapping.apply(frame = datasource1, mappings = [("idsite", "int", "idsite", "int"), ("main_url", "string", "main_url", "string")], transformation_ctx = "applymapping2")

df = datasource0.toDF()
df1= datasource1.toDF()
print("hi4")
df.createOrReplaceTempView("matomoLogVisit")
df1.createOrReplaceTempView("matomoSite")
print("hi5")
consolidated_df = spark.sql("""Select  ms.main_url, ml.custom_dimension_1 , max(ml.visit_last_action_time) from
                            matomoLogVisit ml inner join matomoSite ms
                            on ms.idsite = ml.idsite
                            where ms.main_url = "abcde"
                            group by ml.custom_dimension_1""")

output_df = consolidated_df.orderBy('custom_dimension_1', ascending=True)
consolidated_dynamicframe = DynamicFrame.fromDF(output_df.repartition(1), glueContext, "consolidated_dynamicframe")
print("hi8")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://gevme-datalake/"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = consolidated_dynamicframe]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = consolidated_dynamicframe, connection_type = "s3", connection_options = {"path": "s3://gevme-datalake/"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

Comments:

Answer 1:

The query fails because ms.main_url appears in the SELECT list but not in the GROUP BY clause; Spark SQL requires every non-aggregated column in the SELECT list to be a grouping key. You can simply group by ms.main_url as well, since the WHERE clause constrains it to always equal abcde:

consolidated_df = spark.sql("""Select ms.main_url, ml.custom_dimension_1 , max(ml.visit_last_action_time) from
                            matomoLogVisit ml inner join matomoSite ms
                            on ms.idsite = ml.idsite
                            where ms.main_url = "abcde"
                            group by ms.main_url, ml.custom_dimension_1""")
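To see why the extra grouping key works, the corrected query's logic can be mirrored in plain Python on hypothetical toy rows (sample data invented for illustration, not from the real Matomo tables): inner-join on idsite, filter to main_url = "abcde", then keep the maximum visit_last_action_time per (main_url, custom_dimension_1) pair.

```python
from collections import defaultdict

# Hypothetical rows standing in for the two catalog tables.
log_visit = [  # (idsite, custom_dimension_1, visit_last_action_time)
    (1, "dimA", "2021-05-01 10:00:00"),
    (1, "dimA", "2021-05-02 11:00:00"),
    (1, "dimB", "2021-05-03 12:00:00"),
    (2, "dimA", "2021-05-04 13:00:00"),  # different site, filtered out below
]
site = [  # (idsite, main_url)
    (1, "abcde"),
    (2, "other"),
]

# Inner join on idsite, then filter: WHERE ms.main_url = "abcde"
url_by_site = dict(site)
groups = defaultdict(list)
for idsite, dim, ts in log_visit:
    url = url_by_site.get(idsite)
    if url == "abcde":
        # GROUP BY ms.main_url, ml.custom_dimension_1
        groups[(url, dim)].append(ts)

# max(ml.visit_last_action_time) per group; ISO timestamps sort lexicographically
result = {key: max(times) for key, times in groups.items()}
print(result)
# → {('abcde', 'dimA'): '2021-05-02 11:00:00', ('abcde', 'dimB'): '2021-05-03 12:00:00'}
```

Because main_url is constant within the filtered rows, adding it to the grouping key never splits a group; it only satisfies the rule that every selected non-aggregated column must be grouped.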

Discussion:
