使用 Lambda 将 Json 添加到 DynamoDB

Posted

技术标签:

【中文标题】使用 Lambda 将 Json 添加到 DynamoDB【英文标题】:Using Lambda to add Json to DynamoDB 【发布时间】:2020-09-04 00:42:46 【问题描述】:

我正在尝试使用 Lambda 函数将这个具有以下结构的大型 Json 文件(超过 8k 个事务)加载到 DynamoDB 中。


    "transactions": [
        
            "customerId": "abc",
            "transactionId": "123",
            "transactionDate": "2020-09-01",
            "merchantId": "1234",
            "categoryId": "3",
            "amount": "5",
            "description": "McDonalds"
        ,
        
            "customerId": "def",
            "transactionId": "456",
            "transactionDate": "2020-09-01",
            "merchantId": "45678",
            "categoryId": "2",
            "amount": "-11.70",
            "description": "Tescos"
        ,
        
            "customerId": "jkl",
            "transactionId": "gah",
            "transactionDate": "2020-09-01",
            "merchantId": "9081",
            "categoryId": "3",
            "amount": "-139.00",
            "description": "Amazon"
        ,
    ...

我尝试使用的 lambda 函数将在将 Json 文件上传到 S3 存储桶时触发。然后应该会自动将数据加载到 DynamoDB 中。 lambda函数目前有如下代码:

import json
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    json_file_name = event['Records'][0]['s3']['object']['key']
    print(bucket)
    print(json_file_name)
    print(str(event))
    json_object = s3_client.get_object(Bucket=bucket,Key=json_file_name)
    jsonFileReader = json_object ['Body'].read()
    jsonDict = json.loads(jsonFileReader)
    table = dynamodb.Table('CustomerEvents')
    table.put_item(Item=jsonDict)
    return 'Hello from Lambda'

如果我尝试将一个独特的事务上传到 DynamoDB 中,这很好用,即,如果文件的结构如下所示:


            "customerId": "abc",
            "transactionId": "123",
            "transactionDate": "2020-09-01",
            "merchantId": "1234",
            "categoryId": "3",
            "amount": "5",
            "description": "McDonalds"
 

如何调整 lambda 函数以按照上述方式将所有事务 (> 8k) 加载到 DynamoDB 中?

【问题讨论】:

你想循环运行吗? @Marcin 是的,请问我该怎么做? 尝试 25 条记录的批次,这是每个 DynamoDB 请求的最大记录数。 嗨@TraychoIvanov 如何使用@Marcin 的以下代码设置最大记录数为25? 【参考方案1】:

您可以使用batch_writer 从您的文件中写入多个transactions

一个例子是:

import json
import boto3

s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

table = dynamodb.Table('CustomerEvents')

def lambda_handler(event, context):

    bucket = event['Records'][0]['s3']['bucket']['name']
    json_file_name = event['Records'][0]['s3']['object']['key']

    print(bucket)
    print(json_file_name)
    print(str(event))

    json_object = s3_client.get_object(Bucket=bucket,Key=json_file_name)
    jsonFileReader = json_object['Body'].read()
    jsonDict = json.loads(jsonFileReader)
    
    with table.batch_writer() as batch:
        for transaction in jsonDict['transactions']:
            print(transaction)
            batch.put_item(Item=transaction)

    return 'Hello from Lambda'

【讨论】:

谢谢,我会试试这个。我看到您注释掉了“#table = dynamodb.Table('CustomerEvents')”这一行,但是 Lambda 如何知道它应该从 DynamoDB 中选择哪个表来加载数据? @ERR 对不起。刚刚更正。我在我自己的 lambda 函数上对其进行了测试,所以我不得不将表更改为我的表。忘记取消评论它回到你的。我看到还有我的测试桶名称。也改变了这一点。 完全不用担心。还有这一行“bucket='my-bucket-for-custom-objects361'” - 那是你的测试桶吗?我可以删除它并简单地保留“bucket = event['Records'][0]['s3']['bucket']['name']”?或者您建议将存储桶的确切名称分配给变量存储桶? @ERR 是的,这是我的测试桶。也忘记删除了。已修改答案以纠正此问题。 我在 CloudWatch 上尝试运行上述代码时遇到了这个问题:调用 BatchWriteItem 操作时发生错误 (ValidationException):提供的项目键列表包含重复项。你知道我怎么能解决这个问题吗?谢谢

以上是关于使用 Lambda 将 Json 添加到 DynamoDB的主要内容,如果未能解决你的问题,请参考以下文章

使用 writerows,lambda 将数据添加到特定列

Lambda 异步调用

如何在 lambda 中使用 node-js 将我的文件添加到现有的 zip 文件夹?

如何使用 JSON 格式将 lambda 请求 ID 记录到 AWS CloudWatch Api 网关日志组中?

从AWS Lambda python函数将多个JSON文件合并到S3中的单个JSON文件

未处理的异常:从 jsonplaceholder.typecode.com/photos 获取 Json 时,类型“List<dynamic>”不是“Map<String, dyna