如何使用 AWS JavaScript SDK (dynamoDB) 处理 UnprocessedItems?
Posted
技术标签:
【中文标题】如何使用 AWS JavaScript SDK (dynamoDB) 处理 UnprocessedItems?【英文标题】:How to handle UnprocessedItems using AWS JavaScript SDK (dynamoDB)? 【发布时间】:2015-10-31 03:36:58 【问题描述】:我正在尝试使用 AWS Lambda 函数来处理来自 SendGrid 的事件。据我了解,该事件将是一个包含可变数量 JSON 对象的数组,每个对象代表一个给定事件。我想使用 batchWriteItem 将这些事件写入 DynamoDB 并循环该过程,直到我没有返回任何 UnprocessedItems。但是,我陷入了无限循环。这是我现在的代码:
console.log('Loading function');
var aws = require('aws-sdk');
var dynamo = new aws.DynamoDB();
params = ;
exports.handler = function(sg_event, context)
var items = [];
for(var i = 0; i < sg_event.length; i++)
var obj = sg_event[i];
var request =
PutRequest:
Item:
email: S: obj.email ,
timestamp: S: obj.timestamp.toString() ,
sg_message_id: S: obj.sg_message_id ,
event: S: obj.event
;
items.push(request);
params =
RequestItems:
sendgrid_response: items
do
dynamo.batchWriteItem( params, function(err, data)
if(err)
context.fail(err);
else
params.RequestItems = data.UnprocessedItems;
);
while(!isEmpty(params.RequestItems));
;
function isEmpty(obj)
return (Object.keys(obj).length === 0);
我认为问题在于尝试在回调函数中设置参数,但我不知道我应该怎么做...我知道我可以在回调中使用 UnprocessedItems 调用另一个 batchWriteItem原始的,但我仍然需要能够根据需要多次运行该函数,以确保写入所有未处理项。如何正确循环 batchWriteItem?
【问题讨论】:
【参考方案1】:Nodejs 是单线程的,它首先执行所有主要功能,因此您的 while 循环永远不会完成,回调永远不会执行。
这是你的做法:
//db is AWS.DynamoDB Client
var processItemsCallback = function(err, data)
if (err)
//fail
else
var params = ;
params.RequestItems = data.UnprocessedItems;
db.batchWriteItem(params, processItemsCallback);
;
db.batchWriteItem(/*initial params*/, processItemsCallback);
【讨论】:
应该是data.UnprocessedKeys
@PiminKonstantinKefaloukos 不,UnprocessedItems
是正确的。见the docs
这是否有可能陷入无限循环?
如果有未处理的项目,最好在 db.batchWriteItem 上使用指数退避。【参考方案2】:
@Daniela Miao,感谢分享解决方案。
我们可以在您发布的代码中添加一个代码块,以避免 DynamoDB 异常。这将在再次请求 DynamoDB 进行批量写入之前检查 params.RequestItems 是否有未处理的数据。
//db is AWS.DynamoDB Client
var processItemsCallback = function(err, data)
if (err)
//fail
else
var params = ;
params.RequestItems = data.UnprocessedItems;
/*
* Added Code block
*/
if(Object.keys(params.RequestItems).length != 0)
db.batchWriteItem(params, processItemsCallback);
;
db.batchWriteItem(/*initial params*/, processItemsCallback);
【讨论】:
【参考方案3】:这是我使用“await”语法的代码示例。所以这段代码必须在异步函数中。它会在重试之前随机延迟。
do
batchWriteResp = await dynamo.batchWriteItem(RequestItems:batchWriteItems).promise()
if (Object.keys(batchWriteResp.UnprocessedItems).length>0)
batchWriteItems = batchWriteResp.UnprocessedItems
// delay a random time between 0.5~2.5 seconds
const delay = Math.floor(Math.random() * 2000 + 500)
await new Promise(resolve => setTimeout(resolve, delay));
else
break
while (true)
【讨论】:
我发现如果你不检查未处理的项目,条件可能会失败:(batchWriteResp && batchWriteResp.UnprocessedItems && Object.keys(batchWriteResp.UnprocessedItems) && Object.keys(batchWriteResp.UnprocessedItems) .length > 0)【参考方案4】:对 BatchGetItem 使用递归的非常简洁的解决方案(其工作方式与 BatchWrite 完全相同):
public async batchGet(params: BatchGetItemInput, output: BatchGetResponseMap): Promise<BatchGetResponseMap>
const batchGetItemOutput: BatchGetItemOutput = await this.documentClient.batchGet(params).promise();
Object.keys(batchGetItemOutput.Responses).forEach(tableName =>
if (output[tableName])
output[tableName] = output[tableName].concat(batchGetItemOutput.Responses[tableName]);
else
output[tableName] = batchGetItemOutput.Responses[tableName];
);
if (Object.keys(batchGetItemOutput.UnprocessedKeys).length !== 0)
output = await this.batchGet( RequestItems: batchGetItemOutput.UnprocessedKeys , output);
return output;
【讨论】:
Errors 也被放入UnprocessedItems
。如果你在没有重试次数限制的情况下这样做,并且你受到限制或某个项目的其他一致错误,你只会激怒问题并且循环将无限进行。【参考方案5】:
根据 AWS 文档,SDK automatically handles the retry logic:
注意 AWS 开发工具包实现自动重试逻辑和指数退避。
最大重试次数为configurable through:
var dynamodb = new AWS.DynamoDB(apiVersion: '2012-08-10', maxRetries:5);
【讨论】:
它不会自动重试未处理的项目。 AWS 文档还指定如果批处理调用部分失败,则不会重试失败的项目以上是关于如何使用 AWS JavaScript SDK (dynamoDB) 处理 UnprocessedItems?的主要内容,如果未能解决你的问题,请参考以下文章
AWS SDK JavaScript v3 / 如何在 dynamoDB 扫描命令中使用 ExpressionAttributeNames?
AWS SDK JavaScript:如何显示 AWS.S3.putObject 的上传进度?
使用 aws sdk javascript 禁用 dynamodb 流