MongoDB:将来自多个集合的数据合并为一个..如何?

Posted

技术标签:

【中文标题】MongoDB:将来自多个集合的数据合并为一个..如何?【英文标题】:MongoDB: Combine data from multiple collections into one..how? 【发布时间】:2011-08-06 14:44:08 【问题描述】:

我如何(在 MongoDB 中)将来自多个集合的数据合并到一个集合中?

我可以使用 map-reduce 吗?如果可以,如何使用?

由于我是新手,我将不胜感激。

【问题讨论】:

您只是想将不同集合中的文档复制到一个集合中,还是您的计划是什么?你能指定“组合”吗?如果您只想通过 mongo shell 复制 db.collection1.find().forEach(function(doc)db.collection2.save(doc)); 就足够了。如果您不使用 mongo shell,请指定您使用的驱动程序(java、php、...)。 所以我有一个集合(比如用户)而不是其他集合说地址簿集合、书籍集合列表等。我如何根据 say user_id 键将这些集合组合成一个收藏。 ? 相关:***.com/q/2350495/435605 【参考方案1】:

MongoDB 3.2 现在允许通过$lookup aggregation stage 将来自多个集合的数据合并为一个。作为一个实际示例,假设您将有关书籍的数据分为两个不同的集合。

第一个集合,称为books,具有以下数据:


    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe"


    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe"

第二个集合,称为books_selling_data,具有以下数据:


    "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
    "isbn": "978-3-16-148410-0",
    "copies_sold": 12500


    "_id": ObjectId("56e31ce076cdf52e541d9d28"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 720050


    "_id": ObjectId("56e31ce076cdf52e541d9d29"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 1000

要合并两个集合,只需按以下方式使用 $lookup:

db.books.aggregate([
    $lookup: 
            from: "books_selling_data",
            localField: "isbn",
            foreignField: "isbn",
            as: "copies_sold"
        
])

经过此聚合后,books 集合将如下所示:


    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe",
    "copies_sold": [
        
            "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
            "isbn": "978-3-16-148410-0",
            "copies_sold": 12500
        
    ]


    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe",
    "copies_sold": [
        
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 720050
        ,
        
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 1000
        
    ]

注意几点很重要:

    “来自”集合(在本例中为 books_selling_data)无法分片。 “as”字段将是一个数组,如上例所示。 如果 $lookup stage 上的“localField”和“foreignField”选项在各自的集合中不存在,则它们都将被视为 null 以进行匹配($lookup docs 有一个很好的例子)。李>

因此,作为一个结论,如果您想合并两个集合,在这种情况下,有一个包含已售总副本的平面副本sold 字段,您将需要多做一些工作,可能会使用一个中间集合,然后,$out 到最后的集合。

【讨论】:

您好,您能告诉我管理这样的数据的优化方法是什么:用户、file.files 和 file.chunks 是三个集合,我想要特定用户及其所有相关文件一个回应是可能的。? “名称”:“蝙蝠侠”,“电子邮件”:“bt@gmail.com”,“文件”:[file1,file2,file3,....等等] 上述解决方案的官方文档示例可以在这里找到:docs.mongodb.com/manual/reference/operator/aggregation/lookup 好吧,实际上我的答案已经包含三个指向官方文档的链接。但无论如何感谢您的贡献。 @JakubCzaplicki 我可能大脑完全失灵(很可能),但在$lookup 中,“localField”和“foreignField”不应该都等于“isbn”吗?不是“_id”和“isbn”?【参考方案2】:

虽然您不能实时执行此操作,但您可以使用 MongoDB 1.8+ map/reduce 中的“reduce”out 选项多次运行 map-reduce 以将数据合并在一起(请参阅http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Outputoptions)。您需要在两个集合中都有一些可以用作 _id 的键。

例如,假设您有一个 users 集合和一个 comments 集合,并且您希望拥有一个包含每个评论的一些用户人口统计信息的新集合。

假设users 集合具有以下字段:

_id 名字 姓氏 国家 性别 年龄

然后comments集合有以下字段:

_id 用户ID 评论 已创建

你会做这个 map/reduce:

var mapUsers, mapComments, reduce;
db.users_comments.remove();

// setup sample data - wouldn't actually use this in production
db.users.remove();
db.comments.remove();
db.users.save(firstName:"Rich",lastName:"S",gender:"M",country:"CA",age:"18");
db.users.save(firstName:"Rob",lastName:"M",gender:"M",country:"US",age:"25");
db.users.save(firstName:"Sarah",lastName:"T",gender:"F",country:"US",age:"13");
var users = db.users.find();
db.comments.save(userId: users[0]._id, "comment": "Hey, what's up?", created: new ISODate());
db.comments.save(userId: users[1]._id, "comment": "Not much", created: new ISODate());
db.comments.save(userId: users[0]._id, "comment": "Cool", created: new ISODate());
// end sample data setup

mapUsers = function() 
    var values = 
        country: this.country,
        gender: this.gender,
        age: this.age
    ;
    emit(this._id, values);
;
mapComments = function() 
    var values = 
        commentId: this._id,
        comment: this.comment,
        created: this.created
    ;
    emit(this.userId, values);
;
reduce = function(k, values) 
    var result = , commentFields = 
        "commentId": '', 
        "comment": '',
        "created": ''
    ;
    values.forEach(function(value) 
        var field;
        if ("comment" in value) 
            if (!("comments" in result)) 
                result.comments = [];
            
            result.comments.push(value);
         else if ("comments" in value) 
            if (!("comments" in result)) 
                result.comments = [];
            
            result.comments.push.apply(result.comments, value.comments);
        
        for (field in value) 
            if (value.hasOwnProperty(field) && !(field in commentFields)) 
                result[field] = value[field];
            
        
    );
    return result;
;
db.users.mapReduce(mapUsers, reduce, "out": "reduce": "users_comments");
db.comments.mapReduce(mapComments, reduce, "out": "reduce": "users_comments");
db.users_comments.find().pretty(); // see the resulting collection

此时,您将拥有一个名为 users_comments 的新集合,其中包含合并的数据,您现在可以使用它了。这些缩减的集合都有_id,这是您在地图函数中发出的键,然后所有值都是value 键内的子对象 - 这些值不在这些缩减文档的顶层.

这是一个简单的例子。您可以对更多集合重复此操作,以继续建立减少的集合。您还可以在此过程中对数据进行汇总和聚合。您可能会定义多个 reduce 函数,因为聚合和保留现有字段的逻辑变得更加复杂。

您还会注意到,现在每个用户都有一个文档,其中该用户的所有 cmets 都在一个数组中。如果我们合并具有一对一关系而不是一对多关系的数据,它将是平坦的,您可以简单地使用这样的 reduce 函数:

reduce = function(k, values) 
    var result = ;
    values.forEach(function(value) 
        var field;
        for (field in value) 
            if (value.hasOwnProperty(field)) 
                result[field] = value[field];
            
        
    );
    return result;
;

如果您想展平 users_comments 集合,使其成为每个评论一个文档,另外运行以下命令:

var map, reduce;
map = function() 
    var debug = function(value) 
        var field;
        for (field in value) 
            print(field + ": " + value[field]);
        
    ;
    debug(this);
    var that = this;
    if ("comments" in this.value) 
        this.value.comments.forEach(function(value) 
            emit(value.commentId, 
                userId: that._id,
                country: that.value.country,
                age: that.value.age,
                comment: value.comment,
                created: value.created,
            );
        );
    
;
reduce = function(k, values) 
    var result = ;
    values.forEach(function(value) 
        var field;
        for (field in value) 
            if (value.hasOwnProperty(field)) 
                result[field] = value[field];
            
        
    );
    return result;
;
db.users_comments.mapReduce(map, reduce, "out": "comments_with_demographics");

绝对不应该即时执行此技术。它适用于 cron 作业或类似定期更新合并数据的作业。您可能希望在新集合上运行 ensureIndex 以确保您对它执行的查询快速运行(请记住,您的数据仍在 value 键中,所以如果您要索引 comments_with_demographics on评论created时间,应该是db.comments_with_demographics.ensureIndex("value.created": 1);

【讨论】:

我可能永远不会在生产软件中这样做,但它仍然是一种非常酷的技术。 谢谢,戴夫。在过去的 3 个月里,我使用这种技术为生产中的高流量站点生成了导出和报告表,没有出现任何问题。这是另一篇描述该技术类似使用的文章:tebros.com/2011/07/… 感谢@rmarscher,您的额外细节确实帮助我更好地理解了一切。 我应该用一个使用聚合管道和新的 $lookup 操作的例子来更新这个答案。在这里提到它,直到我可以整理出适当的文章为止。 docs.mongodb.org/manual/reference/operator/aggregation/lookup 仅供参考,对于那些想要快速了解它的作用的人来说,这是在第一块代码 gist.github.com/nolanamy/83d7fb6a9bf92482a1c4311ad9c78835 之后的 users_comments 集合中的内容【参考方案3】:

在单个查询中使用聚合和查找可以在 MongoDB 中以“SQL UNION”方式进行联合。这是我测试过的适用于 MongoDB 4.0 的示例:

// Create employees data for testing the union.
db.getCollection('employees').insert( name: "John", type: "employee", department: "sales" );
db.getCollection('employees').insert( name: "Martha", type: "employee", department: "accounting" );
db.getCollection('employees').insert( name: "Amy", type: "employee", department: "warehouse" );
db.getCollection('employees').insert( name: "Mike", type: "employee", department: "warehouse"  );

// Create freelancers data for testing the union.
db.getCollection('freelancers').insert( name: "Stephany", type: "freelancer", department: "accounting" );
db.getCollection('freelancers').insert( name: "Martin", type: "freelancer", department: "sales" );
db.getCollection('freelancers').insert( name: "Doug", type: "freelancer", department: "warehouse"  );
db.getCollection('freelancers').insert( name: "Brenda", type: "freelancer", department: "sales"  );

// Here we do a union of the employees and freelancers using a single aggregation query.
db.getCollection('freelancers').aggregate( // 1. Use any collection containing at least one document.
  [
     $limit: 1 , // 2. Keep only one document of the collection.
     $project:  _id: '$$REMOVE'  , // 3. Remove everything from the document.

    // 4. Lookup collections to union together.
     $lookup:  from: 'employees', pipeline: [ $match:  department: 'sales'  ], as: 'employees'  ,
     $lookup:  from: 'freelancers', pipeline: [ $match:  department: 'sales'  ], as: 'freelancers'  ,

    // 5. Union the collections together with a projection.
     $project:  union:  $concatArrays: ["$employees", "$freelancers"]   ,

    // 6. Unwind and replace root so you end up with a result set.
     $unwind: '$union' ,
     $replaceRoot:  newRoot: '$union'  
  ]);

下面是它如何工作的解释:

    从您的数据库的任何集合中实例化一个aggregate,其中至少包含一个文档。如果您不能保证您的数据库的任何集合都不会为空,您可以通过在数据库中创建某种包含单个空文档的“虚拟”集合来解决此问题,该集合将专门用于执行联合查询。

    将管道的第一阶段设为 $limit: 1 。这将删除集合中除第一个之外的所有文档。

    使用$project 阶段剥离剩余文档的所有字段:

     $project:  _id: '$$REMOVE'  
    

    您的聚合现在包含一个空文档。是时候为要联合在一起的每个集合添加查找了。您可以使用 pipeline 字段进行一些特定的过滤,或者将 localFieldforeignField 保留为 null 以匹配整个集合。

     $lookup:  from: 'collectionToUnion1', pipeline: [...], as: 'Collection1'  ,
     $lookup:  from: 'collectionToUnion2', pipeline: [...], as: 'Collection2'  ,
     $lookup:  from: 'collectionToUnion3', pipeline: [...], as: 'Collection3'  
    

    您现在有一个聚合,其中包含一个包含 3 个数组的单个文档,如下所示:

    
        Collection1: [...],
        Collection2: [...],
        Collection3: [...]
    
    

    然后,您可以使用 $project 阶段和 $concatArrays 聚合运算符将它们合并到一个数组中:

    
      "$project" :
      
        "Union" :  $concatArrays: ["$Collection1", "$Collection2", "$Collection3"] 
      
    
    

    您现在有一个包含单个文档的聚合,其中包含一个包含您的集合并集的数组。剩下要做的是添加一个$unwind 和一个$replaceRoot 阶段以将您的数组拆分为单独的文档:

     $unwind: "$Union" ,
     $replaceRoot:  newRoot: "$Union"  
    

    瞧。您现在有一个结果集,其中包含您想要联合在一起的集合。然后,您可以添加更多阶段以进一步过滤、排序、应用 skip() 和 limit()。几乎任何你想要的东西。

【讨论】:

查询失败并显示消息“$projection 需要至少一个输出字段”。 @abhishek 如果你明白了,那是因为你试图在单个投影阶段从单个文档中删除所有字段。 MongoDB 不会让你这样做。要解决此问题,您需要执行 2 个连续的投影,其中第一个删除除 _id 之外的所有内容,第二个删除剩余的 _id。 感谢对每个步骤的详细解释 @sboisse 此查询将如何在大型集合上执行? @ankita 到目前为止,我对这种方法的个人体验在性能方面非常令人满意。但是,如果您需要以 SQL UNION 方式进行聚合,我看不到替代方案。如果您对这种方法有性能问题,我会考虑在查找管道中优化我的查询,并为查找的集合添加适当的索引。在管道的初始步骤中过滤得越多越好。在第 1 步,我也会尝试选择一个小集合。也许是一个只包含一个文档的集合,以便此步骤尽可能快。【参考方案4】:

非常基本的 $lookup 示例。

db.getCollection('users').aggregate([
    
        $lookup: 
            from: "userinfo",
            localField: "userId",
            foreignField: "userId",
            as: "userInfoData"
        
    ,
    
        $lookup: 
            from: "userrole",
            localField: "userId",
            foreignField: "userId",
            as: "userRoleData"
        
    ,
     $unwind:  path: "$userInfoData", preserveNullAndEmptyArrays: true ,
     $unwind:  path: "$userRoleData", preserveNullAndEmptyArrays: true 
])

这里是用的

  $unwind:  path: "$userInfoData", preserveNullAndEmptyArrays: true , 
  $unwind:  path: "$userRoleData", preserveNullAndEmptyArrays: true 

代替

 $unwind:"$userRoleData" 
 $unwind:"$userRoleData"

因为 $unwind:"$userRoleData" 如果使用 $lookup 找不到匹配记录,这将返回空或 0 结果。

【讨论】:

【参考方案5】:

如果没有批量插入到mongodb中,我们循环small_collection中的所有对象,并一一插入到big_collection中:

db.small_collection.find().forEach(function(obj) 
   db.big_collection.insert(obj)
);

【讨论】:

db.colleciton.insert([,,]) Insert 接受数组。 这适用于小型集合,但不要忘记迁移索引 :)【参考方案6】:

对聚合中的多个集合使用多个 $lookup

查询:

db.getCollection('servicelocations').aggregate([
  
    $match: 
      serviceLocationId: 
        $in: ["36728"]
      
    
  ,
  
    $lookup: 
      from: "orders",
      localField: "serviceLocationId",
      foreignField: "serviceLocationId",
      as: "orders"
    
  ,
  
    $lookup: 
      from: "timewindowtypes",
      localField: "timeWindow.timeWindowTypeId",
      foreignField: "timeWindowTypeId",
      as: "timeWindow"
    
  ,
  
    $lookup: 
      from: "servicetimetypes",
      localField: "serviceTimeTypeId",
      foreignField: "serviceTimeTypeId",
      as: "serviceTime"
    
  ,
  
    $unwind: "$orders"
  ,
  
    $unwind: "$serviceTime"
  ,
  
    $limit: 14
  
])

结果:


    "_id" : ObjectId("59c3ac4bb7799c90ebb3279b"),
    "serviceLocationId" : "36728",
    "regionId" : 1.0,
    "zoneId" : "DXBZONE1",
    "description" : "AL HALLAB REST EMIRATES MALL",
    "locationPriority" : 1.0,
    "accountTypeId" : 1.0,
    "locationType" : "SERVICELOCATION",
    "location" : 
        "makani" : "",
        "lat" : 25.119035,
        "lng" : 55.198694
    ,
    "deliveryDays" : "MTWRFSU",
    "timeWindow" : [ 
        
            "_id" : ObjectId("59c3b0a3b7799c90ebb32cde"),
            "timeWindowTypeId" : "1",
            "Description" : "MORNING",
            "timeWindow" : 
                "openTime" : "06:00",
                "closeTime" : "08:00"
            ,
            "accountId" : 1.0
        , 
        
            "_id" : ObjectId("59c3b0a3b7799c90ebb32cdf"),
            "timeWindowTypeId" : "1",
            "Description" : "MORNING",
            "timeWindow" : 
                "openTime" : "09:00",
                "closeTime" : "10:00"
            ,
            "accountId" : 1.0
        , 
        
            "_id" : ObjectId("59c3b0a3b7799c90ebb32ce0"),
            "timeWindowTypeId" : "1",
            "Description" : "MORNING",
            "timeWindow" : 
                "openTime" : "10:30",
                "closeTime" : "11:30"
            ,
            "accountId" : 1.0
        
    ],
    "address1" : "",
    "address2" : "",
    "phone" : "",
    "city" : "",
    "county" : "",
    "state" : "",
    "country" : "",
    "zipcode" : "",
    "imageUrl" : "",
    "contact" : 
        "name" : "",
        "email" : ""
    ,
    "status" : "ACTIVE",
    "createdBy" : "",
    "updatedBy" : "",
    "updateDate" : "",
    "accountId" : 1.0,
    "serviceTimeTypeId" : "1",
    "orders" : [ 
        
            "_id" : ObjectId("59c3b291f251c77f15790f92"),
            "orderId" : "AQ18O1704264",
            "serviceLocationId" : "36728",
            "orderNo" : "AQ18O1704264",
            "orderDate" : "18-Sep-17",
            "description" : "AQ18O1704264",
            "serviceType" : "Delivery",
            "orderSource" : "Import",
            "takenBy" : "KARIM",
            "plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
            "plannedDeliveryTime" : "",
            "actualDeliveryDate" : "",
            "actualDeliveryTime" : "",
            "deliveredBy" : "",
            "size1" : 296.0,
            "size2" : 3573.355,
            "size3" : 240.811,
            "jobPriority" : 1.0,
            "cancelReason" : "",
            "cancelDate" : "",
            "cancelBy" : "",
            "reasonCode" : "",
            "reasonText" : "",
            "status" : "",
            "lineItems" : [ 
                
                    "ItemId" : "BNWB020",
                    "size1" : 15.0,
                    "size2" : 78.6,
                    "size3" : 6.0
                , 
                
                    "ItemId" : "BNWB021",
                    "size1" : 20.0,
                    "size2" : 252.0,
                    "size3" : 11.538
                , 
                
                    "ItemId" : "BNWB023",
                    "size1" : 15.0,
                    "size2" : 285.0,
                    "size3" : 16.071
                , 
                
                    "ItemId" : "CPMW112",
                    "size1" : 3.0,
                    "size2" : 25.38,
                    "size3" : 1.731
                , 
                
                    "ItemId" : "MMGW001",
                    "size1" : 25.0,
                    "size2" : 464.375,
                    "size3" : 46.875
                , 
                
                    "ItemId" : "MMNB218",
                    "size1" : 50.0,
                    "size2" : 920.0,
                    "size3" : 60.0
                , 
                
                    "ItemId" : "MMNB219",
                    "size1" : 50.0,
                    "size2" : 630.0,
                    "size3" : 40.0
                , 
                
                    "ItemId" : "MMNB220",
                    "size1" : 50.0,
                    "size2" : 416.0,
                    "size3" : 28.846
                , 
                
                    "ItemId" : "MMNB270",
                    "size1" : 50.0,
                    "size2" : 262.0,
                    "size3" : 20.0
                , 
                
                    "ItemId" : "MMNB302",
                    "size1" : 15.0,
                    "size2" : 195.0,
                    "size3" : 6.0
                , 
                
                    "ItemId" : "MMNB373",
                    "size1" : 3.0,
                    "size2" : 45.0,
                    "size3" : 3.75
                
            ],
            "accountId" : 1.0
        , 
        
            "_id" : ObjectId("59c3b291f251c77f15790f9d"),
            "orderId" : "AQ137O1701240",
            "serviceLocationId" : "36728",
            "orderNo" : "AQ137O1701240",
            "orderDate" : "18-Sep-17",
            "description" : "AQ137O1701240",
            "serviceType" : "Delivery",
            "orderSource" : "Import",
            "takenBy" : "KARIM",
            "plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
            "plannedDeliveryTime" : "",
            "actualDeliveryDate" : "",
            "actualDeliveryTime" : "",
            "deliveredBy" : "",
            "size1" : 28.0,
            "size2" : 520.11,
            "size3" : 52.5,
            "jobPriority" : 1.0,
            "cancelReason" : "",
            "cancelDate" : "",
            "cancelBy" : "",
            "reasonCode" : "",
            "reasonText" : "",
            "status" : "",
            "lineItems" : [ 
                
                    "ItemId" : "MMGW001",
                    "size1" : 25.0,
                    "size2" : 464.38,
                    "size3" : 46.875
                , 
                
                    "ItemId" : "MMGW001-F1",
                    "size1" : 3.0,
                    "size2" : 55.73,
                    "size3" : 5.625
                
            ],
            "accountId" : 1.0
        , 
        
            "_id" : ObjectId("59c3b291f251c77f15790fd8"),
            "orderId" : "AQ110O1705036",
            "serviceLocationId" : "36728",
            "orderNo" : "AQ110O1705036",
            "orderDate" : "18-Sep-17",
            "description" : "AQ110O1705036",
            "serviceType" : "Delivery",
            "orderSource" : "Import",
            "takenBy" : "KARIM",
            "plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
            "plannedDeliveryTime" : "",
            "actualDeliveryDate" : "",
            "actualDeliveryTime" : "",
            "deliveredBy" : "",
            "size1" : 60.0,
            "size2" : 1046.0,
            "size3" : 68.0,
            "jobPriority" : 1.0,
            "cancelReason" : "",
            "cancelDate" : "",
            "cancelBy" : "",
            "reasonCode" : "",
            "reasonText" : "",
            "status" : "",
            "lineItems" : [ 
                
                    "ItemId" : "MMNB218",
                    "size1" : 50.0,
                    "size2" : 920.0,
                    "size3" : 60.0
                , 
                
                    "ItemId" : "MMNB219",
                    "size1" : 10.0,
                    "size2" : 126.0,
                    "size3" : 8.0
                
            ],
            "accountId" : 1.0
        
    ],
    "serviceTime" : 
        "_id" : ObjectId("59c3b07cb7799c90ebb32cdc"),
        "serviceTimeTypeId" : "1",
        "serviceTimeType" : "nohelper",
        "description" : "",
        "fixedTime" : 30.0,
        "variableTime" : 0.0,
        "accountId" : 1.0
    

【讨论】:

【参考方案7】:

Mongo 4.4 开始,我们可以通过将新的$unionWith 聚合阶段与$group 的新$accumulator 运算符耦合来在聚合管道中实现此连接:

// > db.users.find()
//   [ user: 1, name: "x" ,  user: 2, name: "y" ]
// > db.books.find()
//   [ user: 1, book: "a" ,  user: 1, book: "b" ,  user: 2, book: "c" ]
// > db.movies.find()
//   [ user: 1, movie: "g" ,  user: 2, movie: "h" ,  user: 2, movie: "i" ]
db.users.aggregate([
   $unionWith: "books"  ,
   $unionWith: "movies" ,
   $group: 
    _id: "$user",
    user: 
      $accumulator: 
        accumulateArgs: ["$name", "$book", "$movie"],
        init: function()  return  books: [], movies: []  ,
        accumulate: function(user, name, book, movie) 
          if (name) user.name = name;
          if (book) user.books.push(book);
          if (movie) user.movies.push(movie);
          return user;
        ,
        merge: function(userV1, userV2) 
          if (userV2.name) userV1.name = userV2.name;
          userV1.books.concat(userV2.books);
          userV1.movies.concat(userV2.movies);
          return userV1;
        ,
        lang: "js"
      
    
  
])
//  _id: 1, user:  books: ["a", "b"], movies: ["g"], name: "x"  
//  _id: 2, user:  books: ["c"], movies: ["h", "i"], name: "y"  

$unionWith 将来自给定集合的记录合并到已经在聚合管道中的文档中。在 2 个联合阶段之后,我们因此拥有所有用户、书籍和电影记录。

然后我们通过$user 记录$group 并使用$accumulator 运算符累积项目,从而允许在文档分组时自定义累积:

我们有兴趣累积的字段是用accumulateArgs 定义的。 init 定义了在我们对元素进行分组时将累积的状态。 accumulate 函数允许执行自定义操作,将记录分组以构建累积状态。例如,如果被分组的项目定义了 book 字段,那么我们更新状态的 books 部分。 merge 用于合并两个内部状态。它仅用于在分片集群上运行的聚合或当操作超出内存限制时。

【讨论】:

【参考方案8】:

Mongorestore 具有附加到数据库中已有内容之上的这一特性,因此此行为可用于组合两个集合:

    mongodump 收集1 collection2.rename(collection1) mongorestore

尚未尝试,但它可能比 map/reduce 方法执行得更快。

【讨论】:

【参考方案9】:

是的,你可以:使用我今天写的这个实用函数:

function shangMergeCol() 
  tcol= db.getCollection(arguments[0]);
  for (var i=1; i<arguments.length; i++)
    scol= db.getCollection(arguments[i]);
    scol.find().forEach(
        function (d) 
            tcol.insert(d);
        
    )
  

您可以向此函数传递任意数量的集合,第一个将成为目标集合。所有其余的集合都是要转移到目标集合的源。

【讨论】:

【参考方案10】:

代码 sn-p。礼貌-多个关于堆栈溢出的帖子,包括这个。

 db.cust.drop();
 db.zip.drop();
 db.cust.insert(cust_id:1, zip_id: 101);
 db.cust.insert(cust_id:2, zip_id: 101);
 db.cust.insert(cust_id:3, zip_id: 101);
 db.cust.insert(cust_id:4, zip_id: 102);
 db.cust.insert(cust_id:5, zip_id: 102);

 db.zip.insert(zip_id:101, zip_cd:'AAA');
 db.zip.insert(zip_id:102, zip_cd:'BBB');
 db.zip.insert(zip_id:103, zip_cd:'CCC');

mapCust = function() 
    var values = 
        cust_id: this.cust_id
    ;
    emit(this.zip_id, values);
;

mapZip = function() 
    var values = 
    zip_cd: this.zip_cd
    ;
    emit(this.zip_id, values);
;

reduceCustZip =  function(k, values) 
    var result = ;
    values.forEach(function(value) 
    var field;
        if ("cust_id" in value) 
            if (!("cust_ids" in result)) 
                result.cust_ids = [];
            
            result.cust_ids.push(value);
         else 
    for (field in value) 
        if (value.hasOwnProperty(field) ) 
                result[field] = value[field];
        
         ;  
       
      );
       return result;
;


db.cust_zip.drop();
db.cust.mapReduce(mapCust, reduceCustZip, "out": "reduce": "cust_zip");
db.zip.mapReduce(mapZip, reduceCustZip, "out": "reduce": "cust_zip");
db.cust_zip.find();


mapCZ = function() 
    var that = this;
    if ("cust_ids" in this.value) 
        this.value.cust_ids.forEach(function(value) 
            emit(value.cust_id, 
                zip_id: that._id,
                zip_cd: that.value.zip_cd
            );
        );
    
;

reduceCZ = function(k, values) 
    var result = ;
    values.forEach(function(value) 
        var field;
        for (field in value) 
            if (value.hasOwnProperty(field)) 
                result[field] = value[field];
            
        
    );
    return result;
;
db.cust_zip_joined.drop();
db.cust_zip.mapReduce(mapCZ, reduceCZ, "out": "cust_zip_joined"); 
db.cust_zip_joined.find().pretty();


var flattenMRCollection=function(dbName,collectionName) 
    var collection=db.getSiblingDB(dbName)[collectionName];

    var i=0;
    var bulk=collection.initializeUnorderedBulkOp();
    collection.find( value:  $exists: true  ).addOption(16).forEach(function(result) 
        print((++i));
        //collection.update(_id: result._id,result.value);

        bulk.find(_id: result._id).replaceOne(result.value);

        if(i%1000==0)
        
            print("Executing bulk...");
            bulk.execute();
            bulk=collection.initializeUnorderedBulkOp();
        
    );
    bulk.execute();
;


flattenMRCollection("mydb","cust_zip_joined");
db.cust_zip_joined.find().pretty();

【讨论】:

【参考方案11】:

您必须在应用程序层中执行此操作。如果您使用的是 ORM,它可以使用注释(或类似的东西)来提取其他集合中存在的引用。我只使用过Morphia,而@Reference注解在查询时会获取引用的实体,所以我可以避免自己在代码中这样做。

【讨论】:

以上是关于MongoDB:将来自多个集合的数据合并为一个..如何?的主要内容,如果未能解决你的问题,请参考以下文章

$lookup 运算符未能返回来自多个集合的组合数据

来自mongodb的多个集合的总和

NoSQL中的数据模型来自多个SQL表

将具有相同列/索引的两个 pandas DataFrame 合并为一个 DataFrame

MongoDB 简介

在mongodb聚合中将多个对象合并为一个对象