如何使用 AWS JavaScript SDK (dynamoDB) 处理 UnprocessedItems?

Posted

技术标签:

【中文标题】如何使用 AWS JavaScript SDK (dynamoDB) 处理 UnprocessedItems?【英文标题】:How to handle UnprocessedItems using AWS JavaScript SDK (dynamoDB)? 【发布时间】:2015-10-31 03:36:58 【问题描述】:

我正在尝试使用 AWS Lambda 函数来处理来自 SendGrid 的事件。据我了解,该事件将是一个包含可变数量 JSON 对象的数组,每个对象代表一个给定事件。我想使用 batchWriteItem 将这些事件写入 DynamoDB 并循环该过程,直到我没有返回任何 UnprocessedItems。但是,我陷入了无限循环。这是我现在的代码:

console.log('Loading function');

var aws = require('aws-sdk');
var dynamo = new aws.DynamoDB();
params = ;

exports.handler = function(sg_event, context) 

    var items = [];
    for(var i = 0; i < sg_event.length; i++) 
        var obj = sg_event[i];
        var request = 
            PutRequest: 
                Item: 
                    email:  S: obj.email ,
                    timestamp:  S: obj.timestamp.toString() ,
                    sg_message_id:  S: obj.sg_message_id ,
                    event:  S: obj.event 
                
            
        ;
        items.push(request);
    

    params = 
        RequestItems: 
            sendgrid_response: items
        
    

    do 
        dynamo.batchWriteItem( params, function(err, data) 
            if(err)
                context.fail(err);
            else
                params.RequestItems = data.UnprocessedItems;
        );
     while(!isEmpty(params.RequestItems));
;

function isEmpty(obj) 
    return (Object.keys(obj).length === 0);

我认为问题在于尝试在回调函数中设置参数,但我不知道我应该怎么做...我知道我可以在回调中使用 UnprocessedItems 调用另一个 batchWriteItem原始的,但我仍然需要能够根据需要多次运行该函数,以确保写入所有未处理项。如何正确循环 batchWriteItem?

【问题讨论】:

【参考方案1】:

Nodejs 是单线程的,它首先执行所有主要功能,因此您的 while 循环永远不会完成,回调永远不会执行。

这是你的做法:

//db is AWS.DynamoDB Client
var processItemsCallback = function(err, data) 
  if (err)  
     //fail
   else 
    var params = ;
    params.RequestItems = data.UnprocessedItems;
    db.batchWriteItem(params, processItemsCallback);
  
;

db.batchWriteItem(/*initial params*/, processItemsCallback);

【讨论】:

应该是data.UnprocessedKeys @PiminKonstantinKefaloukos 不,UnprocessedItems 是正确的。见the docs 这是否有可能陷入无限循环? 如果有未处理的项目,最好在 db.batchWriteItem 上使用指数退避。【参考方案2】:

@Daniela Miao,感谢分享解决方案。

我们可以在您发布的代码中添加一个代码块,以避免 DynamoDB 异常。这将在再次请求 DynamoDB 进行批量写入之前检查 params.RequestItems 是否有未处理的数据。

//db is AWS.DynamoDB Client
var processItemsCallback = function(err, data) 
  if (err)  
     //fail
   else 
    var params = ;
    params.RequestItems = data.UnprocessedItems;
    /*
    * Added Code block 
    */
    if(Object.keys(params.RequestItems).length != 0) 
      db.batchWriteItem(params, processItemsCallback);
    
  
;

db.batchWriteItem(/*initial params*/, processItemsCallback);

【讨论】:

【参考方案3】:

这是我使用“await”语法的代码示例。所以这段代码必须在异步函数中。它会在重试之前随机延迟。

do 
    batchWriteResp = await dynamo.batchWriteItem(RequestItems:batchWriteItems).promise()
    if (Object.keys(batchWriteResp.UnprocessedItems).length>0) 
        batchWriteItems = batchWriteResp.UnprocessedItems
        // delay a random time between 0.5~2.5 seconds
        const delay = Math.floor(Math.random() * 2000 + 500)
        await new Promise(resolve => setTimeout(resolve, delay));
     else 
        break
    
 while (true)

【讨论】:

我发现如果你不检查未处理的项目,条件可能会失败:(batchWriteResp && batchWriteResp.UnprocessedItems && Object.keys(batchWriteResp.UnprocessedItems) && Object.keys(batchWriteResp.UnprocessedItems) .length > 0)【参考方案4】:

对 BatchGetItem 使用递归的非常简洁的解决方案(其工作方式与 BatchWrite 完全相同):

    public async batchGet(params: BatchGetItemInput, output: BatchGetResponseMap): Promise<BatchGetResponseMap> 
    const batchGetItemOutput: BatchGetItemOutput = await this.documentClient.batchGet(params).promise();

    Object.keys(batchGetItemOutput.Responses).forEach(tableName => 
        if (output[tableName]) 
            output[tableName] = output[tableName].concat(batchGetItemOutput.Responses[tableName]);
         else 
            output[tableName] = batchGetItemOutput.Responses[tableName];
        
    );

    if (Object.keys(batchGetItemOutput.UnprocessedKeys).length !== 0) 
        output = await this.batchGet( RequestItems: batchGetItemOutput.UnprocessedKeys , output);
    

    return output;

【讨论】:

Errors 也被放入UnprocessedItems。如果你在没有重试次数限制的情况下这样做,并且你受到限制或某个项目的其他一致错误,你只会激怒问题并且循环将无限进行。【参考方案5】:

根据 AWS 文档,SDK automatically handles the retry logic:

注意 AWS 开发工具包实现自动重试逻辑和指数退避。

最大重试次数为configurable through:

var dynamodb = new AWS.DynamoDB(apiVersion: '2012-08-10', maxRetries:5);

【讨论】:

它不会自动重试未处理的项目。 AWS 文档还指定如果批处理调用部分失败,则不会重试失败的项目

以上是关于如何使用 AWS JavaScript SDK (dynamoDB) 处理 UnprocessedItems?的主要内容,如果未能解决你的问题,请参考以下文章

AWS SDK JavaScript v3 / 如何在 dynamoDB 扫描命令中使用 ExpressionAttributeNames?

AWS SDK JavaScript:如何显示 AWS.S3.putObject 的上传进度?

使用 aws sdk javascript 禁用 dynamodb 流

如何承诺 AWS JavaScript 开发工具包?

AWS API JavaScript SDK CORS 错误

Javascript AWS SDK S3上传方法与正文流生成空文件