基于通过 $lookup 检索的字段的多阶段聚合管道匹配数据

Posted

技术标签:

【中文标题】基于通过 $lookup 检索的字段的多阶段聚合管道匹配数据【英文标题】:multi-stage aggregation pipeline matching data based on fields retrieved through $lookup 【发布时间】:2021-12-21 15:08:12 【问题描述】:

我正在尝试在 MongoDB(4.4.9 社区版,使用 Python 3.10 的 pymongo 驱动程序)中构建复杂的嵌套聚合管道。

我想将不同集合中的相关数据点汇总到一个新的(理想情况下)视图(或者,如果这不起作用)集合中。

集合和其中的相关字段遵循层次结构。有members,其中包含要合并其他数据的***键, membershipNumber.

> members.find_one()
'_id': ObjectId('61153299af6122XXXXXXXXXXXXX'), 'membershipNumber': 'N03XXXXXX'

然后,有一个不同的集合,其中包含membershipNumber,但也有一个不同的链接字段an_user_idan_user_id 在其他集合中用于表示与该特定用户相关的数组中的记录/字段。

我像这样“加入”membersan_users

result = members.aggregate([
    
        '$lookup': 
            'from': 'an_users',                   
            'localField': 'membershipNumber',     
            'foreignField': 'memref',             
            'as': 'an_users'                      
        
    ,
       '$unwind' : '$an_users' ,     
       
        '$project' : 
            '_id' : 1,
            'membershipNumber' : 1,
            'an_user_id' : '$an_users.user_id'
         
    
]);

到目前为止一切顺利,这将返回所需的聚合记录:

'_id': ObjectId('61153253aBBBBBBBBBBBB'),
  'membershipNumber': 'N0XXXXXXXX',
  'an_user_id': '48XXXXXX'

现在,我有第三个集合,其中包含 an_user_id 作为数组中的字符串,表示用户单击给定电子邮件的任何位置,其中记录是电子邮件(以及 @987654333 clicks 数组中的 @s 是单击该电子邮件中的链接的用户。

'_id': ObjectId('blah'),
 'email_id': '407XXX',
 'actions_count': 17,
 'administrative_title': 'test',
 'bounce': ['3440XXXX'],
 'click': ['38294CCC',
  '418FFFF',
  '48XXXXXX',
  '38eGGGG'

我想计算emails 集合中数组(例如clicksbouncesopens)中给定an_user_id 的出现次数,并包括它在.aggregate 调用中,检索如下内容:

'_id': ObjectId('61153253aBBBBBBBBBBBB'),
  'membershipNumber': 'N0XXXXXXXX',
  'an_user_id': '48XXXXXX',
  'n_email_clicks' : 412,
  'n_email_bounces' : 12

此外,我可能还想在我的数据库的其他集合中附加 an_user_id 的计数。

考虑一下,例如,这个名为 events 的集合:


    "_id": "617ffa96ee11844e143a63dd",
    "id": "12345",
    "administrative_title": "my_event",
    "created_at": 
        "$date": "2020-01-15T16:28:50.000Z"
    ,
    "event_creator_id": "123456",
    "event_title": "my_event",
    "group_id": "123456",
    "permalink": "event_id",
    "rsvp_count": 54,
    "rsvps": [
        "rsvp_id": "56789",
        "display_name": "John Doe",
        "rsvp_user_id": "48XXXXXX",
        "rsvp_created_at": 
            "$date": "2020-01-28T15:38:50.000Z"
        ,
        "rsvp_updated_at": 
            "$date": "2020-01-28T15:38:50.000Z"
        ,
        "first_name": "John",
        "last_name": "Doe",
    , 
        "rsvp_id": "543895",
        "display_name": "James Appleslice",
        "rsvp_user_id": "N03XXXXXX",
        "rsvp_created_at": 
            "$date": "2020-02-05T13:15:14.000Z"
        ,
        "rsvp_updated_at": 
            "$date": "2020-02-05T13:15:14.000Z"
        ,
        "first_name": "James",
        "last_name": "Appleslice"
  ]

因此,最终产品看起来像这样:

'_id': ObjectId('61153253aBBBBBBBBBBBB'),
  'membershipNumber': 'N0XXXXXXXX',
  'an_user_id': '48XXXXXX',
  'n_email_clicks' : 412,
  'n_email_bounces' : 12,
  'n_rsvps' : 12

我的想法是使用$lookup 参数——但是,我只知道如何使用它来匹配我正在执行聚合的父集合中的字段,而不是具有在聚合过程中生成。

任何帮助将不胜感激!

【问题讨论】:

【参考方案1】:

您可以使用$lookup 管道。首先,您将 $lookup 用户 ID 后跟另一个 $lookup 以验证用户 ID 是否存在于电子邮件中。最后还有几个阶段可以根据您的需要收集结果和格式。此外,如果您想将结果写入另一个集合,您可以添加$out 阶段。

db.members.aggregate([
  $lookup: 
    from: "an_users",
    let: 
      membershipNumber: "$membershipNumber"
    ,
    pipeline: [
      
        $match: 
          $expr: 
            $eq: [
              "$memref",
              "$$membershipNumber"
            ]
          ,
          
        
      ,
      
        "$lookup": 
          "from": "emails",
          "localField": "user_id",
          "foreignField": "click",
          "as": "clicks"
        
      ,
      
        "$project": 
          "_id": 1,
          "membershipNumber": 1,
          "an_user_id": "$user_id",
          "n_email_clicks": 
            $size: "$clicks"
          
        
      
    ],
    as: "details"
  
,

  $replaceRoot: 
    newRoot: 
      $mergeObjects: [
        
          $arrayElemAt: [
            "$details",
            0
          ]
        ,
        "$$ROOT"
      ]
    
  
,

  $project: 
    details: 0
  
])

工作示例 - https://mongoplayground.net/p/yrFsNp44hpi

【讨论】:

嗨@s7vr,感谢您的回复。我喜欢这个,这肯定会到达那里。我在 mongo 操场上测试了您的原始查询并且它有效。然后我将它复制到 python 中,但我收到以下 pymongo 错误:TypeError: pipeline must be a list - 这很奇怪,因为它显然 is 是一个列表。 您的查询的编辑版本对我不起作用,它给了我以下错误:TypeError: aggregate() takes from 2 to 3 positional arguments but 4 were given 不用担心 - 我认为我们不需要将它放在 [] 中以便它从 mongodb 控制台或游乐场运行,但我现在添加了 [] 并更新了工作示例链接。您上面提到的两个问题都与格式有关。请试试这个新的。 酷。查询似乎正在工作(相当 - 正在执行),但它现在达到了大小限制......我能做些什么来避免这种情况吗?也许我最后需要$group,或者在某个地方扔一个$unwind?这是错误:OperationFailure: Total size of documents in emails matching pipeline's $lookup stage exceeds 104857600 bytes, full error: 'ok': 0.0, 'errmsg': "Total size of documents in emails matching pipeline's $lookup stage exceeds 104857600 bytes", 'code': 4568, 'codeName': 'Location4568' 是的 - 我们可以添加展开后跟组进行计数 - 这是更新的链接 - mongoplayground.net/p/JHukuiDrirD。还考虑了没有用户电子邮件并将值输出为 0 的情况

以上是关于基于通过 $lookup 检索的字段的多阶段聚合管道匹配数据的主要内容,如果未能解决你的问题,请参考以下文章

MongoDB在具有附加字段的对象数组上聚合$lookup

Mongodb 聚合管道限制 $lookup 字段

MongoDB 聚合中的多个 $project 阶段是不是会影响性能

2022 MS MARCO阿里HLATR:基于混合列表感知Transformer重排的多阶段文本检索增强 ( .feat PRM:个性化的推荐重排)

2022 MS MARCO阿里HLATR:基于混合列表感知Transformer重排的多阶段文本检索增强 ( .feat PRM:个性化的推荐重排)

2022 MS MARCO阿里HLATR:基于混合列表感知Transformer重排的多阶段文本检索增强 ( .feat PRM:个性化的推荐重排)