Merge collections up to a fixed size using MongoDB Aggregation

Posted: 2021-06-22 23:06:20

I have a collection that looks like this:


    "_id" : id1,
    "field1" : 11,
    "field2": 101,
    "localityID" : 27


    "_id" : id2,
    "field1" : 22,
    "field2": 202,
    "localityID" : 27


    "_id" : id3,
    "field1" : 33,
    "field2": 303,
    "localityID" : 27


    "_id" : id4,
    "field1" : 44,
    "field2": 404,
    "localityID" : 27


    "_id" : id5,
    "field1" : 55,
    "field2": 505,
    "localityID" : 27


    "_id" : id6,
    "field1" : 66,
    "field2": 606,
    "localityID" : 61


    "_id" : id4,
    "field1" : 77,
    "field2": 707,
    "localityID" : 61

Use case: I want to retrieve and process records that share the same localityID in batches of 3. For tracking purposes, I also want to record which documents were processed in a particular batch.

So, I want to use MongoDB's aggregation framework to combine documents that share the same localityID, but only up to a fixed batch size (3, as described above).

I want to transform the collection above into:


  "_id" : "id111",
  "batchId" : "batch1",
  "localityID": 27,
  "batches": [
     
         "field1" : 11,
         "field2": 101
     ,
     
         "field1" : 22,
         "field2": 202
     ,
     
         "field1" : 33,
         "field2": 303
     
  ]


  "_id" : "id222",
  "batchId" : "batch2",
  "localityID": 27,
  "batches": [
     
         "field1" : 44,
         "field2": 404
     ,
     
         "field1" : 55,
         "field2": 505
     
  ]


  "_id" : "id333",
  "batchId" : "batch1",
  "localityID": 61,
  "batches": [
     
         "field1" : 66,
         "field2": 606
     ,
     
         "field1" : 77,
         "field2": 707
     
  ]

I have tried several combinations of aggregation stages, like the one below, but could not get the expected result.

(This groups all the records with the same localityID together, but only into a single document, not into batches of a fixed size.)

db.old_collection.aggregate([
  { "$group": { "_id": "$localityID" } },
  { "$lookup": {
      "from": "old_collection",
      "let": { "lid": "$_id" },
      "pipeline": [
        { "$match": { "$expr": { "$eq": [ "$localityID", "$$lid" ] } } },
        { "$project": { "_id": 0, "field1": 1, "field2": 1 } }
      ],
      "as": "batches"
  } },
  { "$out": "new_collection" }
])

The aggregation above produces the following result:


  "_id" : "id111",
  "batchId" : "batch1",
  "localityID": 27,
  "batches": [
     
         "field1" : 11,
         "field2": 101
     ,
     
         "field1" : 22,
         "field2": 202
     ,
     
         "field1" : 33,
         "field2": 303
     ,
     
         "field1" : 44,
         "field2": 404
     ,
     
         "field1" : 55,
         "field2": 505
     
  ]


  "_id" : "id333",
  "batchId" : "batch1",
  "localityID": 61,
  "batches": [
     
         "field1" : 66,
         "field2": 606
     ,
     
         "field1" : 77,
         "field2": 707
     
  ]

Is this achievable with Mongo's aggregation framework, or would I be better off using something else?

Comments:

Your aggregation pipeline does not produce any batchId field, so the result you posted cannot have come from this pipeline. I don't understand the logic of the batchId field.

Yes @WernfriedDomscheit, the batchId field does not exist in the input. For each localityID, the value of batchId can be a simple sequence number, running from 0 up to the total number of documents created for that localityID.

Answer 1:

The idea comes from this answer. You can use $range to generate an array of indexes, with the step parameter set to some bucketSize. Then you only need $slice to cut out arrays of size bucketSize. Try this:

let bucketSize = 3;

db.old_collection.aggregate([
    {
        $group: {
            _id: "$localityID",
            id: { $first: "$_id" },
            localityID: { $first: "$localityID" },
            batches: {
                $push: {
                    field1: "$field1",
                    field2: "$field2"
                }
            }
        }
    },
    {
        $project: {
            _id: 0,
            localityID: "$localityID",
            batches: {
                $map: {
                    input: { $range: [0, { $size: "$batches" }, bucketSize] },
                    as: "index",
                    in: { $slice: ["$batches", "$$index", bucketSize] }
                }
            }
        }
    },
    {
        $unwind: {
            path: "$batches",
            includeArrayIndex: "batchId"
        }
    },
    {
        $addFields: {
            batchId: {
                $concat: [
                    "batch",
                    { $toString: { $add: ["$batchId", 1] } }
                ]
            }
        }
    },
    // $sort is optional. You can remove it if not required.
    {
        $sort: {
            localityID: 1,
            batchId: 1
        }
    },
    { $out: "new_collection" }
]);

Output:

[
    {
        "_id": ObjectId("..."),
        "localityID": 27,
        "batches": [
            {
                "field1": 11,
                "field2": 101
            },
            {
                "field1": 22,
                "field2": 202
            },
            {
                "field1": 33,
                "field2": 303
            }
        ],
        "batchId": "batch1"
    },
    {
        "_id": ObjectId("..."),
        "localityID": 27,
        "batches": [
            {
                "field1": 44,
                "field2": 404
            },
            {
                "field1": 55,
                "field2": 505
            }
        ],
        "batchId": "batch2"
    },
    {
        "_id": ObjectId("..."),
        "localityID": 61,
        "batches": [
            {
                "field1": 66,
                "field2": 606
            },
            {
                "field1": 77,
                "field2": 707
            }
        ],
        "batchId": "batch1"
    }
]
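
For intuition, the chunking done in the $project stage can be tried in isolation. The sketch below is a demonstration only: the literal array of values and the bucket size of 3 are assumptions, and the single document from the collection is used merely to feed the expression.

db.old_collection.aggregate([
    { $limit: 1 },
    // Replace the document with a throwaway array, just to feed the expression.
    { $project: { _id: 0, batches: { $literal: [11, 22, 33, 44, 55] } } },
    {
        $project: {
            chunks: {
                $map: {
                    // $range yields the start index of every bucket: [0, 3]
                    input: { $range: [0, { $size: "$batches" }, 3] },
                    as: "index",
                    // $slice cuts at most 3 elements starting at that index
                    in: { $slice: ["$batches", "$$index", 3] }
                }
            }
        }
    }
])
// Result: { "chunks": [ [ 11, 22, 33 ], [ 44, 55 ] ] }

Because $range stops before the array size, the last bucket is simply shorter when the array length is not a multiple of bucketSize, which is exactly how locality 27 ends up with a final batch of two documents above.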

Discussion:

When applying the above solution to a large collection (more than 60 million records), I get the following error: $push used too much memory and cannot spill to disk. Is there any way to modify the solution to get around this error? I tried enabling allowDiskUse, but it did not solve the problem.

Full error message: The full response is { "operationTime": { "$timestamp": { "t": 1617712444, "i": 1 } }, "ok": 0.0, "errmsg": "$push used too much memory and cannot spill to disk. Memory limit: 104857600 bytes", "code": 146, "codeName": "ExceededMemoryLimit", "$clusterTime": { "clusterTime": { "$timestamp": { "t": 1617712522, "i": 1 } }, "keyId": 6903928055920590851 } }

I'm not good at writing memory-efficient queries. I'll update you if I find a solution. Sorry!

Any ideas for a solution for large collections?
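
One possible way to avoid the oversized $push arrays, offered here only as a sketch and not as part of the original answer, is to compute each document's batch number up front so that no single $group ever accumulates more than bucketSize documents per array. This assumes MongoDB 5.0 or newer for $setWindowFields and $documentNumber:

const bucketSize = 3;

db.old_collection.aggregate([
    // Number documents 1, 2, 3, ... within each localityID.
    {
        $setWindowFields: {
            partitionBy: "$localityID",
            sortBy: { _id: 1 },
            output: { rowNumber: { $documentNumber: {} } }
        }
    },
    // Zero-based batch number: rows 1-3 -> 0, rows 4-6 -> 1, and so on.
    {
        $addFields: {
            batchNumber: {
                $toInt: { $floor: { $divide: [{ $subtract: ["$rowNumber", 1] }, bucketSize] } }
            }
        }
    },
    // Each group now holds at most bucketSize documents, so the pushed array stays small.
    {
        $group: {
            _id: { localityID: "$localityID", batchNumber: "$batchNumber" },
            localityID: { $first: "$localityID" },
            batchId: { $first: { $concat: ["batch", { $toString: { $add: ["$batchNumber", 1] } }] } },
            batches: { $push: { field1: "$field1", field2: "$field2" } }
        }
    },
    { $project: { _id: 0 } },
    { $out: "new_collection" }
], { allowDiskUse: true });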

Answer 2:

As mentioned before, I do not understand the logic of the batchId field. Apart from that, a simple solution could be this one:

db.collection.aggregate([
   { $group: { _id: "$localityID", batches: { $push: { field1: "$field1", field2: "$field2" } } } },
   {
      $project: {
         localityID: "$_id",
         batches: { $slice: ["$batches", 1, 3] }
      }
   }
])

