Not able to put a join and query on two tables in AWS Glue job script

Posted: 2021-05-21 14:26:51

So, I created an AWS Glue job script in which I added two data sources and converted them from DynamicFrames to DataFrames. My goal is to query the two tables with an inner join, but I am not able to do that: the job fails at the query step. I have added the error as well. Please help me here, and see the code below.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext,SparkConf
from pyspark.sql.window import Window
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "matomo", table_name = "matomo_matomo_log_visit", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "matomo", table_name = "matomo_matomo_log_visit", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("visit_last_action_time", "timestamp", "visit_last_action_time", "timestamp"), ("custom_dimension_1", "string", "custom_dimension_1", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("idsite", "int", "idsite", "int"), ("visit_last_action_time", "timestamp", "visit_last_action_time", "timestamp"), ("custom_dimension_1", "string", "custom_dimension_1", "string")], transformation_ctx = "applymapping1")
## @type: DataSource
## @args: [database = "matomo", table_name = "matomo_matomo_site", transformation_ctx = "datasource1"]
## @return: datasource1
## @inputs: []
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "matomo", table_name = "matomo_matomo_site", transformation_ctx = "datasource1")
## @type: ApplyMapping
## @args: [mapping = [("idsite", "int", "idsite", "int"), ("main_url", "string", "main_url", "string")], transformation_ctx = "applymapping2"]
## @return: applymapping2
## @inputs: [frame = datasource1]
applymapping2 = ApplyMapping.apply(frame = datasource1, mappings = [("idsite", "int", "idsite", "int"), ("main_url", "string", "main_url", "string")], transformation_ctx = "applymapping2")
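# Note: applymapping1 and applymapping2 are built above but never used; the raw datasource frames are the ones converted to DataFrames below.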
df = datasource0.toDF()
df1= datasource1.toDF()
print("hi4")
df.createOrReplaceTempView("matomoLogVisit")
df1.createOrReplaceTempView("matomoSite")
print("hi5")
consolidated_df = spark.sql("""Select ms.main_url, ml.custom_dimension_1 , max(ml.visit_last_action_time) from
matomoLogVisit ml inner join matomoSite ms
on ms.idsite = ml.idsite
where ms.main_url = "abcde"
group by ml.custom_dimension_1""")
output_df = consolidated_df.orderBy('custom_dimension_1', ascending=True)
consolidated_dynamicframe = DynamicFrame.fromDF(output_df.repartition(1), glueContext, "consolidated_dynamicframe")
print("hi8")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://gevme-datalake"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = consolidated_dynamicframe]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = consolidated_dynamicframe, connection_type = "s3", connection_options = {"path": "s3://gevme-datalake/"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
Answer 1:

You can also group by ms.main_url, since the where clause keeps it always equal to abcde:
consolidated_df = spark.sql("""Select ms.main_url, ml.custom_dimension_1 , max(ml.visit_last_action_time) from
matomoLogVisit ml inner join matomoSite ms
on ms.idsite = ml.idsite
where ms.main_url = "abcde"
group by ms.main_url, ml.custom_dimension_1""")