使用 pyspark 将作业粘合到联合数据帧

Posted

技术标签:

【中文标题】使用 pyspark 将作业粘合到联合数据帧【英文标题】:Glue Job to union dataframes using pyspark 【发布时间】:2019-10-31 12:19:48 【问题描述】:

我基本上是在尝试将行从一个 DF 更新/添加到另一个。这是我的代码:

# S3
import boto3

# SOURCE
source_table = "someDynamoDbtable"
source_s3 = "s://mybucket/folder/"

# DESTINATION
destination_bucket = "s3://destination-bucket"

#Select which attributes to update/add
params = ['attributeD', 'attributeF', 'AttributeG']

#spark wrapper
glueContext = GlueContext(SparkContext.getOrCreate())

newData = glueContext.create_dynamic_frame.from_options(connection_type = "dynamodb", connection_options = "tableName": source_table)
newValues = newData.select_fields(params)
newDF = newValues.toDF()

oldData = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options="paths": [source_s3], format="orc", format_options=, transformation_ctx="dynamic_frame")
oldDataValues = oldData.drop_fields(params)
oldDF = oldDataValues.toDF()

#makes a union of the dataframes
rebuildData = oldDF.union(newDF)
#error happens here
readyData = DynamicFrame.fromDF(rebuildData, glueContext, "readyData")

#writes new data to s3 destination, into orc files, while partitioning
glueContext.write_dynamic_frame.from_options(frame = readyData, connection_type = "s3", connection_options = "path": destination_bucket, format = "orc", partitionBy=['partition_year', 'partition_month', 'partition_day'])

我得到的错误是:

SyntaxError:readyData = ... 行上的语法无效

到目前为止,我不知道出了什么问题。

【问题讨论】:

你确定rebuildData = oldDF.union(newData) 有效吗? 【参考方案1】:

您正在执行数据帧和动态帧之间的联合操作。

这将创建一个名为 newData 的动态帧和一个名为 newDF 的数据帧:

newData = glueContext.create_dynamic_frame.from_options(connection_type = "dynamodb", connection_options = "tableName": source_table)
newValues = newData.select_fields(params)
newDF = newValues.toDF()

这将创建一个名为 oldData 的动态帧和一个名为 oldDF 的数据帧:

oldData = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options="paths": [source_s3], format="orc", format_options=, transformation_ctx="dynamic_frame")
oldDataValues = oldData.drop_fields(params)
oldDF = oldDataValues.toDF()

您正在对以上两个实体执行联合操作,如下所示:

rebuildData = oldDF.union(newData)

应该是:

rebuildData = oldDF.union(newDF)

【讨论】:

是的,我注意到了,但是在复制我的代码时,这似乎是一个拼写错误。不幸的是,这不是问题:( 能否添加rebuildData.show 的输出和详细的堆栈跟踪?【参考方案2】:

是的,所以我认为对于我需要做的事情,使用 OUTER JOIN 会更好。 让我解释一下:

我加载了两个数据帧,其中一个删除了我们想要更新的字段。 第二个只选择那些字段,因此两者都不会有重复的行/列。 我们使用外部(或完全)连接,而不是只会添加行的联合。这将添加我的数据框中的所有数据而不会重复。

现在我的逻辑可能有缺陷,但到目前为止它对我来说还可以。如果有人正在寻找类似的解决方案,欢迎您使用它。 我更改的代码:

rebuildData = oldDF.join(newData, 'id', 'outer')

【讨论】:

以上是关于使用 pyspark 将作业粘合到联合数据帧的主要内容,如果未能解决你的问题,请参考以下文章

在 s3 pyspark 作业中创建单个镶木地板文件

创建 AWS 粘合作业是不是需要爬网程序?

Pyspark 数据帧从一个存储桶中读取,并在同一作业中使用不同的 KMS 密钥写入另一个存储桶

PySpark:将 PythonRDD 附加/合并到 PySpark 数据帧

PySpark:执行联合中的列 dtype 更改 [重复]

无法使用 pyspark 将 Xml 数据读取到数据帧