aws glue pyspark remove struct in an array but keep the data and save into dynamodb


Posted: 2021-11-10 04:55:48

Question:

A DynamoDB table was exported to S3, and an AWS Glue crawler crawled the exported data. An AWS Glue job reads its source from the crawled catalog table, and the lineItems schema is reshaped by this MergeLineItems transform:

def MergeLineItems(rec):
    # unwrap the DynamoDB "M" (map) wrapper from each list element
    rec["lineItems1"] = {}
    a = []
    for x in rec["lineItems"]:
        a.append(x["M"])
    rec["lineItems1"] = a
    return rec
  
mapped_dyF =  Map.apply(frame = Transform0, f = MergeLineItems)
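
For reference, each record handed to MergeLineItems is assumed to still carry the DynamoDB export type descriptors, so every list element is wrapped in an "M" (map) key while the leaf values keep their "N"/"S" wrappers. A minimal sketch of what the function sees and produces (the sample values are hypothetical):

# Hypothetical record shaped like the DynamoDB export, with type descriptors intact
rec = {
    "lineItems": [
        {"M": {"price": {"N": "2.15"}, "grade": {"S": "GradeAAA"}}}
    ]
}
MergeLineItems(rec)
# rec["lineItems1"] is now [{"price": {"N": "2.15"}, "grade": {"S": "GradeAAA"}}]
# the outer "M" wrapper is gone, but the inner "N"/"S" descriptors remain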

The resulting schema looks like this:

    -- lineItems1: array
    |    |-- element: struct
    |    |    |-- price: struct
    |    |    |    |-- N: string
    |    |    |-- grade: struct
    |    |    |    |-- S: string
    |    |    |-- expectedAmount: struct
    |    |    |    |-- N: string
    |    |    |-- notifiedAmount: struct
    |    |    |    |-- N: string

When I run the AWS Glue job, the data saved into DynamoDB comes out like this:

[
    {
        "M": {
            "expectedAmount": {
                "M": {
                    "N": {
                        "S": "10"
                    }
                }
            },
            "grade": {
                "M": {
                    "S": {
                        "S": "GradeAAA"
                    }
                }
            },
            "notifiedAmount": {
                "M": {
                    "N": {
                        "S": "0"
                    }
                }
            },
            "price": {
                "M": {
                    "N": {
                        "S": "2.15"
                    }
                }
            }
        }
    }
]

But the data in the original DynamoDB table does not look like that. How can I change the output to this instead:

[
    {
        "M": {
            "expectedAmount": {
                "N": "10"
            },
            "notifiedAmount": {
                "N": "0"
            },
            "grade": {
                "S": "GradeAAA"
            },
            "price": {
                "N": "2.15"
            }
        }
    }
]


Answer 1:

I got it working. Here is my answer:

from decimal import Decimal  # DynamoDB numbers must be written as Decimal

# Read the crawled export from the Glue Data Catalog and pull out the lineItems list
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb", table_name = "data", transformation_ctx = "DataSource0")

Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("item.lineItems.L", "array", "lineItems", "array")], transformation_ctx = "Transform0")


def MergeLineItems(rec):
    # Strip the export-format type descriptors (M/N/S) and keep plain values
    rec["lineItems1"] = {}
    a = []
    for x in rec["lineItems"]:
        val = x["M"]["expectedAmount"]["N"]
        x["M"]["expectedAmount"] = Decimal(val)

        val = x["M"]["notifiedAmount"]["N"]
        x["M"]["notifiedAmount"] = Decimal(val)

        val = x["M"]["grade"]["S"]
        x["M"]["grade"] = str(val)

        val = x["M"]["price"]["N"]
        x["M"]["price"] = Decimal(val)

        a.append(x["M"])
    rec["lineItems1"] = a
    return rec

mapped_dyF = Map.apply(frame = Transform0, f = MergeLineItems)
# Replace the original wrapped lineItems with the unwrapped version
mapped_dyF = DropFields.apply(mapped_dyF, paths=['lineItems'])
mapped_dyF = RenameField.apply(mapped_dyF, "lineItems1", "lineItems")


glueContext.write_dynamic_frame_from_options(
    frame=mapped_dyF,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-east-1",
        "dynamodb.output.tableName": "mydb",
        "dynamodb.throughput.write.percent": "1.0"
    }
)
job.commit()
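
A possible generalization, not part of the original answer: instead of hardcoding the four attribute names, a small recursive helper can unwrap any export-style type descriptor. A sketch, assuming only N, S, M and L descriptors occur in the data (anything else is passed through unchanged):

from decimal import Decimal

def unwrap(value):
    # value looks like {"N": "2.15"}, {"S": "GradeAAA"}, {"M": {...}} or {"L": [...]}
    (dtype, inner), = value.items()
    if dtype == "N":
        return Decimal(inner)
    if dtype == "S":
        return str(inner)
    if dtype == "M":
        return {k: unwrap(v) for k, v in inner.items()}
    if dtype == "L":
        return [unwrap(v) for v in inner]
    return inner  # unknown descriptor: keep the raw value

def MergeLineItems(rec):
    rec["lineItems1"] = [unwrap(x) for x in rec["lineItems"]]
    return rec

With that in place the rest of the job (Map.apply, DropFields, RenameField and the DynamoDB write) stays the same.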

