使用 mongodb / mongoose 有条件地将 5-20k 文档的输入批次处理成一个包含多达一百万个文档的集合的有效方法是啥?

Posted

技术标签:

【中文标题】使用 mongodb / mongoose 有条件地将 5-20k 文档的输入批次处理成一个包含多达一百万个文档的集合的有效方法是啥?【英文标题】:What is an efficient way to conditionally process input batches of 5-20k docs into a collection with up to a million docs with mongodb / mongoose?使用 mongodb / mongoose 有条件地将 5-20k 文档的输入批次处理成一个包含多达一百万个文档的集合的有效方法是什么? 【发布时间】:2019-06-30 12:13:39 【问题描述】:

在我的 mmo 人口普查/角色统计跟踪应用程序中,我从用户那里获得了最多 5-20k 个文档的输入批次,我需要将这些文档汇总到数据库中。我有特定的标准要寻找来确定输入中的文档是否已经存在于集合中并且需要更新,或者它是否是全新的并且需要插入到集合中。

为了让我的应用正常工作,我可以准确确定在处理输入后直接更新和/或添加了多少文档。

为了更好地解释我想要做什么,我将其分解为一个简化的示例,我可以在其中向您展示输入的样子以及所需的结果。


作为以下输入案例的起点,集合如下所示:

collection = [
   name: 'Jean', server: 'Alpha', level: 9 ,
   name: 'Anna', server: 'Beta', level: 17 ,
   name: 'Jean', server: 'Beta', level: 10 
];

我基本上需要涵盖 3 个输入案例。


案例#1

当我收到带有全新 name+server 组合的输入时,应将新文档添加到集合中

input =  name: 'Victor', server: 'Alpha', level: 22 ;

应该变成:

collection = [
   name: 'Jean', server: 'Alpha', level: 9 ,
   name: 'Anna', server: 'Beta', level: 17 ,
   name: 'Jean', server: 'Beta', level: 10 ,
   name: 'Victor', server: 'Alpha', level: 22 
];

案例#2

当我收到包含现有name+server 组合但更高 level 的输入时,应该更新现有文档

input =  name: 'Jean', server: 'Alpha', level: 10 ;

应该变成

collection = [
   name: 'Jean', server: 'Alpha', level: 10 ,
   name: 'Anna', server: 'Beta', level: 17 ,
   name: 'Jean', server: 'Beta', level: 10 
];

案例#3

当我使用现有的name+server 组合获得输入,但使用 equallower level 时,什么都不会发生,并且集合应该保持原样

input =  name: 'Jean', server: 'Alpha', level: 9 ;

input =  name: 'Jean', server: 'Alpha', level: 8 ;

应该留下:

collection = [
   name: 'Jean', server: 'Alpha', level: 9 ,
   name: 'Anna', server: 'Beta', level: 17 ,
   name: 'Jean', server: 'Beta', level: 10 
];

到目前为止,我所做的基本上是将整个集合提取到一个数组中,然后使用Array.filter 找出集合中已经存在的输入并使用findOneAndUpdate 更新它们,以及哪些输入是新的和使用insertMany 将它们插入到集合中:

Test.find(, async (err, documents) => 
  if (err) return console.log(err);
  if (documents.length > 0) 
    const changedInputs = inputs.filter(byChanged(documents));
    const newInputs = inputs.filter(byNew(documents));

    const insertResult = await Test.insertMany(newInputs);
    const inserted = insertResult.length;

    const updateResults = await Promise.all(compileUpdatePromises(changedInputs));
    let updated = 0;
    updateResults.forEach(updateResult => 
      updated = updateResult === 'updated' ? updated + 1 : updated;
    );

    console.log('updated:', updated);
    console.log('inserted:', inserted);
  
);

Link to a gist with the whole example

当集合中的文档不多时,这工作得很好,但是现在它已经增长到 50k+ 个文档,它变得异常缓慢并且在该过程中阻塞了 mongo 连接,这也阻塞了所有其他调用的整个 api。

一旦这个应用程序获得更多流量,它就有可能迅速增长到一百万个文档的集合,然后不断更新。

有没有什么简单有效的方法让mongodb替我做所有辛苦的工作,而不是自己一个人做?


更新 1:

simagixblackening 的建议下,我非常接近解决方案。这就是我更改后的代码现在的样子:

const bulkInput = inputs.map(input => (
  updateOne: 
    filter:  name: input.name, server: input.server, level:  $lte: input.level  ,
    upsert: true,
    update:  $set:  name: input.name, server: input.server, level: input.level  
  
));

Test.bulkWrite(bulkInput).then(result => 
  console.log('inserted:', result.nUpserted, 'updated:', result.nModified);
);

现在的问题是案例#3的第二个例子:

input =  name: 'Jean', server: 'Alpha', level: 8 ;

结果:

collection = [
   name: 'Jean', server: 'Alpha', level: 9 ,
   name: 'Anna', server: 'Beta', level: 17 ,
   name: 'Jean', server: 'Beta', level: 10 ,
   name: 'Jean', server: 'Alpha', level: 8 
];

Link to updated gist


更新 2:

只需要制作复合索引

testSchema.index( name: 1, server: 1 );

到一个唯一的复合索引

testSchema.index( name: 1, server: 1 ,  unique: true );

现在我必须找到一个合适的解决方案来处理 Case #3 示例 2 引发的 E11000 duplicate key error 异常。

Link to updated gist

【问题讨论】:

【参考方案1】:

首先,设置复合索引。 https://docs.mongodb.com/manual/core/index-compound/

在 mongodb 和 mongoose 上都可用。

其次,请编写正确的检索查询。 $or (https://docs.mongodb.com/manual/reference/operator/query/or/) 当索引支持时为 O(k log n),其中 k 是匹配项的数量。

或者,尝试批量操作。 https://docs.mongodb.com/manual/reference/method/Bulk/。

它可以返回成功查找/更新的次数。 https://docs.mongodb.com/manual/reference/method/BulkWriteResult/。添加一个额外的字段来查找级别: $lt: currlvl 仅有条件地进行更新。我不是特别清楚如何将它与 upserts 结合起来。

最后,如果我是你,我会散列/连接服务器和名称并将其设为 id。会让生活变得如此轻松。

【讨论】:

bulkWrite 真是个好主意。我将它与 simagix 的建议一起使用并更新了我的问题。【参考方案2】:

从您的简化示例中,nameserver 的组合是唯一的。您可以在name: 1, server: 1 上创建唯一索引。如果文档不存在,使用updateOne 函数更新并将upsert 标志设置为true 以插入文档。下面是来自 mongo shell 的命令,向您展示它是如何工作的。

db.records.drop()

db.records.createIndex(name:1, server:1)

db.records.insertMany([     
     name: 'Jean', server: 'Alpha', level: 9 ,        
     name: 'Anna', server: 'Beta', level: 17 ,        
     name: 'Jean', server: 'Beta', level: 10   ])

db.records.find(, _id: 0)

db.records.updateOne(
     name: 'Victor', server: 'Alpha', level: $lte: 22 ,     
    $set: name: 'Victor', server: 'Alpha', level: 22 ,      
    upsert: true)

db.records.find(, _id: 0)

db.records.updateOne(
     name: 'Jean', server: 'Alpha', level: $lte: 9 , 
    $set: name: 'Jean', server: 'Alpha', level: 9, 
    upsert: 1)

db.records.find(, _id: 0)

db.records.updateOne(
     name: 'Jean', server: 'Alpha', level: $lte: 10 , 
    $set: name: 'Jean', server: 'Alpha', level: 10 , 
    upsert: 1)

db.records.find(, _id: 0)

【讨论】:

这真的很好,但现在我坚持使用案例 #3 的第二个示例。 name: 'Jean', server: 'Alpha', level: $lte: 8 不匹配任何内容并更新插入,导致文档重复。 不,唯一索引将防止创建重复。 哦,我知道了。只是盲目地复制了没有设置唯一选项的createIndex(name:1, server:1)。现在我只需要找到一种正确的方法来处理现在正在抛出的“E11000 重复键错误”异常,然后我猜我就完成了。真是天才,非常感谢您的帮助。

以上是关于使用 mongodb / mongoose 有条件地将 5-20k 文档的输入批次处理成一个包含多达一百万个文档的集合的有效方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

使用 OR 条件使用 mongoose 删除 MongoDB 中的文档

$match mongodb聚合框架_mongoose中的多条件[重复]

$match mongodb聚合框架_mongoose中的多条件[重复]

mongoose使用之查询篇

带有 $or 条件的 Mongoose findOne 方法会引发错误

带有 $or 条件的 Mongoose findOne 方法会引发错误