如何使用 node.js 从`AWS dynamodb` 获取/扫描所有项目
Posted
技术标签:
【中文标题】如何使用 node.js 从`AWS dynamodb` 获取/扫描所有项目【英文标题】:How to fetch/scan all items from `AWS dynamodb` using node.js 【发布时间】:2017-11-19 06:34:00 【问题描述】:如何使用node.js
从AWS dynamodb
获取/扫描所有项目。我在这里发布我的代码。
var docClient = new aws.DynamoDB.DocumentClient();
var params =
TableName:"users",
KeyConditionExpression:"user_status=:status",
ExpressionAttributeValues:
":status": "Y"
;
var queryExecute = function(callback)
docClient.query(params,function(err,result)
if(err)
console.log(err)
callback(err);
else
console.log(result);
if(result.LastEvaluatedKey)
params.ExclusiveStartKey = result.LastEvaluatedKey;
queryExecute(callback);
else
callback(err,items);
);
queryExecute(callback);
这给了我以下错误。
ValidationException: Query condition missed key schema element: `user_id`.
这里的主键是user_id。我不想在我的查询条件中使用它,因为如果我在KeyConditionExpression
中提到主键,我需要设置一个值。可能是我错了。但是请建议我一个从dynamodb
获取所有项目的好方法,它有user_status = "Y"
【问题讨论】:
docs.aws.amazon.com/amazondynamodb/latest/developerguide/… ? 【参考方案1】:这对我有用:
export const scanTable = async (tableName) =>
const params =
TableName: tableName,
;
const scanResults = [];
const items;
do
items = await documentClient.scan(params).promise();
items.Items.forEach((item) => scanResults.push(item));
params.ExclusiveStartKey = items.LastEvaluatedKey;
while(typeof items.LastEvaluatedKey !== "undefined");
return scanResults;
;
【讨论】:
这可行,但有一些最佳实践改进:将scanResults
改成const
,将!=
更改为!==
,将items
改成const
并在循环中声明它用于存储 LastEvaluatedKey 的 do 循环之外的单独变量。
您不能在循环内分配给常量“项目”,而是改为“让”。您还可以使用更好的语法在数组中添加多个项目:scanResults.push(...items.Items),这里您还必须在推送之前检查 items.Items 是否未定义。
请问params.ExclusiveStartKey = items.LastEvaluateKey
是干什么用的?
@Jarrett 用于对结果进行分页,并允许您跟踪下一次扫描操作的起点。在 dDB 中扫描将返回最多 1 MB 的数据,或 Limit
参数指定的记录数。如果表包含更多记录,那么您将需要获取 ExclusiveStartKey 并将其作为 LastEvaluatedKey 传递,以便下一个查询从该位置开始扫描。 docs.aws.amazon.com/amazondynamodb/latest/APIReference/…【参考方案2】:
如果您想在不使用 Hash 键值的情况下从 DynamoDB 中获取数据,则需要使用Scan API
。
注意: Scan API 会读取表中的所有项目以获取结果。因此,在 DynamoDB 中这是一项代价高昂的操作。
替代方法:使用 GSI
以上场景扫码:-
var docClient = new AWS.DynamoDB.DocumentClient();
var params =
TableName: "users",
FilterExpression: "#user_status = :user_status_val",
ExpressionAttributeNames:
"#user_status": "user_status",
,
ExpressionAttributeValues: ":user_status_val": 'somestatus'
;
docClient.scan(params, onScan);
var count = 0;
function onScan(err, data)
if (err)
console.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
else
console.log("Scan succeeded.");
data.Items.forEach(function(itemdata)
console.log("Item :", ++count,JSON.stringify(itemdata));
);
// continue scanning if we have more items
if (typeof data.LastEvaluatedKey != "undefined")
console.log("Scanning for more...");
params.ExclusiveStartKey = data.LastEvaluatedKey;
docClient.scan(params, onScan);
【讨论】:
Scan 最多只能扫描 1MB 的数据,因此此代码不适用于大于 1MB 的数据库。 代码有一个递归调用,直到 LastEvaluatedKey 未定义。所以,它应该可以工作。 对不起,我错过了! 虽然这似乎可行,但我认为如果这个答案实际上会返回数据,而不是仅仅将其打印到控制台,那会好得多。对于刚接触 JS 或函数式编程的人来说,从这里想出一个实际将数据(通过 callbak)返回给调用函数的解决方案并不一定容易。 同意@JuhaKervinen,最好返回数据。汉克的回答做到了这一点,而且更加简洁。【参考方案3】:AWS 文档示例对我不起作用。 @Hank 方法成功了。
在 lambda 中使用处理程序:
const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient(
// optional tuning - 50% faster(cold) / 20% faster(hot)
apiVersion: '2012-08-10',
sslEnabled: false,
paramValidation: false,
convertResponseTypes: false
);
const tableName = 'series';
exports.handler = async (event, context, callback) =>
let params = TableName: tableName ;
let scanResults = [];
let items;
do
items = await docClient.scan(params).promise();
items.Items.forEach((item) => scanResults.push(item));
params.ExclusiveStartKey = items.LastEvaluatedKey;
while (typeof items.LastEvaluatedKey != "undefined");
callback(null, scanResults);
;
【讨论】:
【参考方案4】:使用 Promise 和异步
const aws = require('aws-sdk');
aws.config.update( region: 'us-east-1' );
const documentClient = new aws.DynamoDB.DocumentClient();
const scanAll = async (params) =>
let lastEvaluatedKey = 'dummy'; // string must not be empty
const itemsAll = [];
while (lastEvaluatedKey)
const data = await documentClient.scan(params).promise();
itemsAll.push(...data.Items);
lastEvaluatedKey = data.LastEvaluatedKey;
if (lastEvaluatedKey)
params.ExclusiveStartKey = lastEvaluatedKey;
return itemsAll;
这样使用
const itemsAll = scanAll(params);
查询的代码相同(只需将扫描替换为查询)
【讨论】:
【参考方案5】:我使用这样的承诺:
let AWS = require('aws-sdk');
let docClient = new AWS.DynamoDB.DocumentClient();
async function dbRead(params)
let promise = docClient.scan(params).promise();
let result = await promise;
let data = result.Items;
if (result.LastEvaluatedKey)
params.ExclusiveStartKey = result.LastEvaluatedKey;
data = data.concat(await dbRead(params));
return data;
并使用它:
let params =
TableName: 'Table'
;
let data = await dbRead(params);
【讨论】:
要了解递归,首先要了解递归。【参考方案6】:const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient(
// optional tuning - 50% faster(cold) / 20% faster(hot)
apiVersion: '2012-08-10',
sslEnabled: false,
paramValidation: false,
convertResponseTypes: false,
region: 'us-east-2' // put your region
);
const tableName = 'tableName'; // put your tablename
exports.handler = async (event, context, callback) =>
let params = TableName: tableName ;
let scanResults = [];
let items;
do
items = await docClient.scan(params).promise();
items.Items.forEach((item) => scanResults.push(item));
params.ExclusiveStartKey = items.LastEvaluatedKey;
while (typeof items.LastEvaluatedKey != "undefined");
callback(null, scanResults);
;
【讨论】:
我喜欢这种方法。谢谢!【参考方案7】:以 JSON 格式返回数据的 node express 解决方案:
let datapack=[];
item =
TableName: ddbTable,
FilterExpression: "aws = :e AND begins_with ( Id, :t )",
ExpressionAttributeValues:
":t" : "contact",
":e" : aws
,
ProjectionExpression: "Id,FirstName,LastName,cEmail",
;
docClient.scan(item, onScan);
function onScan(err, data)
if (err)
console.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
else
datapack = datapack.concat(data.Items);
);
if (typeof data.LastEvaluatedKey != "undefined")
item.ExclusiveStartKey = data.LastEvaluatedKey;
docClient.scan(item, onScan);
else
res.json(datapack);
【讨论】:
【参考方案8】:这是一个提供索引结果的答案,而不是使用昂贵的扫描,也是 JSON 格式的节点/快递。注意 docClient.query 的使用:
datapack=[];
item =
TableName: ddbTable,
IndexName: "cEmailIndex",
KeyConditionExpression : "aws = :e AND begins_with ( cEmail, :t )",
ExpressionAttributeValues:
":t" : search,
":e" : aws
,
ProjectionExpression: "Id,FirstName,LastName,cEmail",
;
docClient.query(item, onScan);
function onScan(err, data)
if (err)
console.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
else
datapack = datapack.concat(data.Items);
if (typeof data.LastEvaluatedKey != "undefined")
item.ExclusiveStartKey = data.LastEvaluatedKey;
docClient.query(item, onScan);
else
// console.log(JSON.stringify(datapack));
res.json(datapack);
【讨论】:
注意:如果您提供了更多数据,则此示例和下一个示例仍然有效,然后单次扫描可以处理。请注意 dynamodb 内存限制。 嗨,我关注了这个,我得到了结果。唯一的问题是查询中返回的 LastEvaluatedKey 对我来说总是相同的。【参考方案9】:对于不使用 AWS.DynamoDB.DocumentClient 的用户,此解决方案将有效。 我已将功能拆分为多个模块,以便于阅读并使用 async/await。
const AWS = require("aws-sdk");
AWS.config.update(
// update table region here
region: "us-west-2"
);
var dynamodb = new AWS.DynamoDB();
const performAsynScanOperation = (scanParams) =>
return new Promise((resolve, reject) =>
dynamodb.scan(scanParams, function (err, responseData)
if (err)
reject(err)
else
resolve(responseData)
)
)
const getAllRecords = async (tableName) =>
let allItems = [];
let LastEvaluatedKeyFlag = true;
let scanParams = TableName: tableName
while (LastEvaluatedKeyFlag)
let responseData = await performAsynScanOperation(scanParams)
let batchItems = responseData.Items;
allItems = allItems.concat(batchItems);
if (responseData.LastEvaluatedKey)
LastEvaluatedKeyFlag = true;
console.log('LastEvaluatedKey', responseData.LastEvaluatedKey)
scanParams.ExclusiveStartKey = responseData.LastEvaluatedKey
else
LastEvaluatedKeyFlag = false;
return allItems;
getAllRecords('<Name of table>').then((allItems)=>
console.log(allItems)
)
【讨论】:
【参考方案10】:您可以使用来自@aws/dynamodb-query-iterator 的 ScanPaginator:
import ScanPaginator from '@aws/dynamodb-query-iterator';
import DynamoDB = require('aws-sdk/clients/dynamodb');
const paginator = new ScanPaginator(
new DynamoDB.DocumentClient(),
TableName: "users",
FilterExpression: "#user_status = :user_status_val",
ExpressionAttributeNames:
"#user_status": "user_status",
,
ExpressionAttributeValues: ":user_status_val": 'somestatus'
);
for await (const page of paginator)
// do something with `page`, e.g. myFunction(page.Items)
【讨论】:
【参考方案11】:这是扫描所有记录的插入式替换:
const scanAll = async (params) =>
let all = [];
while (true)
let data = await new Promise((resolve, reject) =>
db.scan(params, function (err, data)
if (err)
reject(err);
else
resolve(data);
);
);
all = all.concat(data.Items);
if (data.LastEvaluatedKey)
params.ExclusiveStartKey = data.LastEvaluatedKey;
else
break;
return all;
;
用法:
scanAll(query)
.catch((err) =>
)
.then((records) =>
);
【讨论】:
【参考方案12】:scan 方法读取表中的每一项,并返回表中的所有数据。您可以提供一个可选的 filter_expression,以便只返回符合您的条件的项目。但是,仅在扫描整个表后才应用过滤器。 ref
我正在分享重构的 onScan
函数,希望对您有所帮助。
var AWS = require("aws-sdk");
AWS.config.update(
region: "us-west-2",
endpoint: "http://localhost:8000"
);
var docClient = new AWS.DynamoDB.DocumentClient();
async function read()
const params =
TableName: "tableName"
// options can be passed here e.g.
// FilterExpression: "#yr between :start_yr and :end_yr",
;
let items = [];
return new Promise((resolve, reject) =>
function onScan(err, data)
if (err)
console.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
reject();
else
items = items.concat(data.Items);
// continue scanning if we have more items, because
// scan can retrieve a maximum of 1MB of data
if (typeof data.LastEvaluatedKey !== "undefined")
params.ExclusiveStartKey = data.LastEvaluatedKey;
docClient.scan(params, onScan);
else
resolve(items);
docClient.scan(params, onScan);
);
【讨论】:
以上是关于如何使用 node.js 从`AWS dynamodb` 获取/扫描所有项目的主要内容,如果未能解决你的问题,请参考以下文章
如何将现有的 dynamo db 与 AWS Amplify 和 graphql 一起使用
如何从 AWS lambda 发布到 Node.js 中的云观察指标
如何使用适用于 Node.js 的 AWS 开发工具包将 Amazon S3 中的所有对象从一个前缀复制/移动到另一个前缀
如何在 AWS lambda 函数中设置 dynamo db 触发器
如何从部署在 AWS Elastic beanstalk 上的 node.js express 应用程序获取客户端 IP?