AWS Glue PySpark: remove struct in an array but keep the data and save into DynamoDB
Posted: 2021-11-10 04:55:48
Question: A DynamoDB table is exported to S3, and an AWS Glue crawler crawls the S3 data. An AWS Glue job reads the crawled data as its source and transforms it with MergeLineItems:
def MergeLineItems(rec):
    rec["lineItems1"] = {}
    a = []
    for x in rec["lineItems"]:
        a.append(x["M"])
    rec["lineItems1"] = a
    return rec

mapped_dyF = Map.apply(frame = Transform0, f = MergeLineItems)
The resulting schema looks like this:
|-- lineItems1: array
|    |-- element: struct
|    |    |-- price: struct
|    |    |    |-- N: string
|    |    |-- grade: struct
|    |    |    |-- S: string
|    |    |-- expectedAmount: struct
|    |    |    |-- N: string
|    |    |-- notifiedAmount: struct
|    |    |    |-- N: string
When I run the AWS Glue job, the data saved to DynamoDB looks like this:
[
  {
    "M": {
      "expectedAmount": {
        "M": {
          "N": {
            "S": "10"
          }
        }
      },
      "grade": {
        "M": {
          "S": {
            "S": "GradeAAA"
          }
        }
      },
      "notifiedAmount": {
        "M": {
          "N": {
            "S": "0"
          }
        }
      },
      "price": {
        "M": {
          "N": {
            "S": "2.15"
          }
        }
      }
    }
  }
]
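One way to read the output above: each struct such as {"N": "10"} reaches the writer as an ordinary map with a single string field named N, so it gets wrapped in type tags a second time. The sketch below is a toy model of that behavior, not Glue's actual serializer; `to_attribute_value` and `line_item` are hypothetical names introduced for illustration.

```python
def to_attribute_value(value):
    """Toy serializer: wrap a plain Python value in a DynamoDB-style type tag."""
    if isinstance(value, dict):
        return {"M": {k: to_attribute_value(v) for k, v in value.items()}}
    if isinstance(value, list):
        return {"L": [to_attribute_value(v) for v in value]}
    if isinstance(value, str):
        return {"S": value}
    raise TypeError(f"unsupported type in this sketch: {type(value).__name__}")


# The element still carries the export's type tags ("N", "S") as ordinary keys.
line_item = {"price": {"N": "2.15"}, "grade": {"S": "GradeAAA"}}

# Serializing it tags every level again, which yields the doubled nesting.
wrapped = to_attribute_value(line_item)
print(wrapped)
```

Under this model, removing the inner tags before the write is what avoids the extra "M" level.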
The data in the original DynamoDB table looks different, though. How can I change the data to this:
[
  {
    "M": {
      "expectedAmount": {
        "N": "10"
      },
      "notifiedAmount": {
        "N": "0"
      },
      "grade": {
        "S": "GradeAAA"
      },
      "price": {
        "N": "2.15"
      }
    }
  }
]
Answer 1: I got it working. Here is my answer:
from decimal import Decimal

from awsglue.transforms import ApplyMapping, DropFields, Map, RenameField

# glueContext and job are assumed to come from the standard Glue job setup (not shown).
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb", table_name = "data", transformation_ctx = "DataSource0")
Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("item.lineItems.L", "array", "lineItems", "array")], transformation_ctx = "Transform0")

def MergeLineItems(rec):
    rec["lineItems1"] = {}
    a = []
    for x in rec["lineItems"]:
        # Replace each type-tagged struct ({"N": ...} or {"S": ...}) with its plain value.
        val = x["M"]["expectedAmount"]["N"]
        x["M"]["expectedAmount"] = Decimal(val)
        val = x["M"]["notifiedAmount"]["N"]
        x["M"]["notifiedAmount"] = Decimal(val)
        val = x["M"]["grade"]["S"]
        x["M"]["grade"] = str(val)
        val = x["M"]["price"]["N"]
        x["M"]["price"] = Decimal(val)
        a.append(x["M"])
    rec["lineItems1"] = a
    return rec

mapped_dyF = Map.apply(frame = Transform0, f = MergeLineItems)
# Swap the original column for the flattened one.
mapped_dyF = DropFields.apply(mapped_dyF, paths=['lineItems'])
mapped_dyF = RenameField.apply(mapped_dyF, "lineItems1", "lineItems")

glueContext.write_dynamic_frame_from_options(
    frame=mapped_dyF,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-east-1",
        "dynamodb.output.tableName": "mydb",
        "dynamodb.throughput.write.percent": "1.0"
    }
)
job.commit()
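The answer hard-codes the four field names. If the line items gain or lose fields, a more general variant might unwrap whatever type tag it finds. The following is a minimal sketch in plain Python, assuming every attribute is a mapping with exactly one type key and that only N, S, M and L occur; `unwrap` and `merge_line_items` are hypothetical names, and records inside a real Glue Map transform may need extra handling (for example, null fields the crawler adds for mixed types).

```python
from decimal import Decimal


def unwrap(attr):
    """Turn one type-tagged attribute ({"N": "10"}, {"S": "x"}, ...) into a plain value."""
    # Assumption: exactly one type key per attribute; anything else raises ValueError.
    (tag, value), = attr.items()
    if tag == "N":
        return Decimal(value)
    if tag == "S":
        return str(value)
    if tag == "M":
        return {key: unwrap(inner) for key, inner in value.items()}
    if tag == "L":
        return [unwrap(inner) for inner in value]
    raise ValueError(f"type tag not handled in this sketch: {tag}")


def merge_line_items(rec):
    """Replace the tagged line items with plain dicts, regardless of field names."""
    rec["lineItems"] = [unwrap(item) for item in rec["lineItems"]]
    return rec


# Local check with a record shaped like the one in the question.
sample = {
    "lineItems": [
        {"M": {"price": {"N": "2.15"}, "grade": {"S": "GradeAAA"}}},
    ]
}
result = merge_line_items(sample)
print(result)
```

Because this version overwrites lineItems in place, the separate DropFields and RenameField steps would not be needed if it were passed to Map.apply instead of MergeLineItems.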