使用 MongoDB 聚合框架计算一阶导数

Posted

技术标签:

【中文标题】使用 MongoDB 聚合框架计算一阶导数【英文标题】:Compute first order derivative with MongoDB aggregation framework 【发布时间】:2016-12-21 19:26:58 【问题描述】:

是否可以使用聚合框架计算一阶导数?

例如,我有数据:

time_series : [10,20,40,70,110]

我正在尝试获得如下输出:

derivative : [10,20,30,40]

【问题讨论】:

您是否有理由希望在聚合框架中执行此操作而不是使用强大的 python 库实现? @JohnnyHK - 你能给我一个python库实现的例子吗?我目前的解决方法是使用 pymongo 获取所有字段,并在 python 中进行衍生。结果证明它很慢(受网络带宽限制?),这让我四处寻找替代方案。 @JohnnyHK 我认为聚合框架是这里的最佳选择。甚至比numpy.diff 还要快。我在答案中添加了基准测试结果 @Styvane 不要误会我的意思,我是第一个在这里对这两个答案投赞成票的人,因为它们很棒,但“最佳”选项不仅仅是性能。经过良好测试的库调用比复杂的聚合管道更简单/更容易理解/更清晰。 @JohnnyHK 我完全同意。并非编程中的所有内容都与性能有关。很遗憾 MongoDB 没有为此提供运算符。顺便说一句,我在写作时多次忘记了花括号。 【参考方案1】:
db.collection.aggregate(
    [
      
        "$addFields": 
          "indexes": 
            "$range": [
              0,
              
                "$size": "$time_series"
              
            ]
          ,
          "reversedSeries": 
            "$reverseArray": "$time_series"
          
        
      ,
      
        "$project": 
          "derivatives": 
            "$reverseArray": 
              "$slice": [
                
                  "$map": 
                    "input": 
                      "$zip": 
                        "inputs": [
                          "$reversedSeries",
                          "$indexes"
                        ]
                      
                    ,
                    "in": 
                      "$subtract": [
                        
                          "$arrayElemAt": [
                            "$$this",
                            0
                          ]
                        ,
                        
                          "$arrayElemAt": [
                            "$reversedSeries",
                            
                              "$add": [
                                
                                  "$arrayElemAt": [
                                    "$$this",
                                    1
                                  ]
                                ,
                                1
                              ]
                            
                          ]
                        
                      ]
                    
                  
                ,
                
                  "$subtract": [
                    
                      "$size": "$time_series"
                    ,
                    1
                  ]
                
              ]
            
          ,
          "time_series": 1
        
      
    ]
)

我们可以在 3.4+ 版本中使用上述管道来执行此操作。 在管道中,我们使用$addFields 管道阶段。运算符添加“time_series”的元素索引的数组来做文档,我们也反转时间序列数组并分别使用$range$reverseArray运算符将其添加到文档中

我们在这里反转了数组,因为数组中位置p的元素总是大于位置p+1的元素,这意味着[p] - [p+1] < 0,我们不想在这里使用$multiply。(请参阅版本 3.2 的管道)

接下来,我们使用索引数组$zipped 时间序列数据,并使用$map 运算符将substract 表达式应用于结果数组。

然后我们$slice 将结果从数组中丢弃null/None 并重新反转结果。


在 3.2 中,我们可以使用$unwind 运算符来展开我们的数组,并通过将文档指定为操作数而不是传统的以 $.

接下来,我们需要 $group 我们的文档并使用 $push 累加器运算符返回一组子文档,如下所示:


    "_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
    "time_series" : [
         "value" : 10, "index" : NumberLong(0) ,
         "value" : 20, "index" : NumberLong(1) ,
         "value" : 40, "index" : NumberLong(2) ,
         "value" : 70, "index" : NumberLong(3) ,
         "value" : 110, "index" : NumberLong(4) 
    ]


终于来到了$project 阶段。在这个阶段,我们需要使用$map 运算符对$group 阶段新计算的数组中的每个元素应用一系列表达式。

这是$map 内部发生的事情(请参阅$map 作为 for 循环)in 表达式:

对于每个子文档,我们使用$let 变量运算符将value 字段分配给一个变量。然后我们从数组中下一个元素的“value”字段的值中减去它的值。

由于数组中的下一个元素是当前索引处的元素加一,所以我们需要的只是$arrayElemAt 运算符的帮助以及当前元素索引的简单$addition 和1

$subtract 表达式返回一个负值,因此我们需要使用 $multiply 运算符将该值乘以 -1

我们还需要$filter 结果数组,因为它的最后一个元素是Nonenull。原因是当当前元素是最后一个元素时,$subtract 返回None,因为下一个元素的索引等于数组的大小。

db.collection.aggregate([
  
    "$unwind": 
      "path": "$time_series",
      "includeArrayIndex": "index"
    
  ,
  
    "$group": 
      "_id": "$_id",
      "time_series": 
        "$push": 
          "value": "$time_series",
          "index": "$index"
        
      
    
  ,
  
    "$project": 
      "time_series": 
        "$filter": 
          "input": 
            "$map": 
              "input": "$time_series",
              "as": "el",
              "in": 
                "$multiply": [
                  
                    "$subtract": [
                      "$$el.value",
                      
                        "$let": 
                          "vars": 
                            "nextElement": 
                              "$arrayElemAt": [
                                "$time_series",
                                
                                  "$add": [
                                    "$$el.index",
                                    1
                                  ]
                                
                              ]
                            
                          ,
                          "in": "$$nextElement.value"
                        
                      
                    ]
                  ,
                  -1
                ]
              
            
          ,
          "as": "item",
          "cond": 
            "$gte": [
              "$$item",
              0
            ]
          
        
      
    
  
])

我认为效率较低的另一个选项是使用 map_reduce 方法对我们的集合执行 map/reduce 操作。

>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
...               function() 
...                 var derivatives = [];
...                 for (var index=1; index<this.time_series.length; index++) 
...                   derivatives.push(this.time_series[index] - this.time_series[index-1]);
...                 
...                 emit(this._id, derivatives);
...               
...               """)
>>> reducer = Code("""
...                function(key, value) 
...                """)
>>> for res in collection.map_reduce(mapper, reducer, out='inline': 1)['results']:
...     print(res)  # or do something with the document.
... 
'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')

您还可以检索所有文档并使用numpy.diff 来返回这样的导数:

import numpy as np


for document in collection.find(, 'time_series': 1):
    result = np.diff(document['time_series']) 

【讨论】:

【参考方案2】:

它有点脏,但也许是这样的?

use test_db
db['data'].remove()
db['data'].insert(id: 1, time_series: [10,20,40,70,110])

var mapF = function() 
    emit(this.id, this.time_series);
    emit(this.id, this.time_series);
;

var reduceF = function(key, values)
    var n = values[0].length;
    var ret = [];
    for(var i = 0; i < n-1; i++)
        ret.push( values[0][i+1] - values[0][i] );
    
    return 'gradient': ret;
;

var finalizeF = function(key, val)
    return val.gradient;


db['data'].mapReduce(
    mapF,
    reduceF,
     out: 'data_d1', finalize: finalizeF 
)

db['data_d1'].find()

这里的“策略”是将要操作的数据发出两次,以便在reduce阶段可以访问,返回一个对象以避免消息“reduce -> multiple not supported yet” 然后在终结器中过滤回数组。

这个脚本然后产生:

MongoDB shell version: 3.2.9
connecting to: test
switched to db test_db
WriteResult( "nRemoved" : 1 )
WriteResult( "nInserted" : 1 )

    "result" : "data_d1",
        "timeMillis" : 13,
        "counts" : 
            "input" : 1,
            "emit" : 2,     
            "reduce" : 1,           
            "output" : 1                    
        ,                                      
        "ok" : 1                                    
                                                   
 "_id" : 1, "value" : [ 10, 20, 30, 40 ]          
bye

或者,可以将所有处理移至终结器(reduceF 此处未调用,因为假定mapF 发出唯一键):

use test_db
db['data'].remove()
db['data'].insert(id: 1, time_series: [10,20,40,70,110])

var mapF = function() 
    emit(this.id, this.time_series);
;

var reduceF = function(key, values)
;

var finalizeF = function(key, val)
    var x = val;
    var n = x.length;

    var ret = [];
    for(var i = 0; i < n-1; i++)
        ret.push( x[i+1] - x[i] );
    
    return ret;


db['data'].mapReduce(
    mapF,
    reduceF,
     out: 'data_d1', finalize: finalizeF 
)

db['data_d1'].find()

【讨论】:

以上是关于使用 MongoDB 聚合框架计算一阶导数的主要内容,如果未能解决你的问题,请参考以下文章

一阶导数怎么求

在 MongoDB 聚合框架中计算中位数

什么是一阶导数光谱法,他的基本原理和操作是如何进行的?

Spring Data mongodb聚合框架:project()中的ConditionalOperators进行中位数计算

MongoDB聚合操作总结

MongoDB聚合操作总结