使用 MongoDB 聚合框架计算一阶导数
Posted
技术标签:
【中文标题】使用 MongoDB 聚合框架计算一阶导数【英文标题】:Compute first order derivative with MongoDB aggregation framework 【发布时间】:2016-12-21 19:26:58 【问题描述】:是否可以使用聚合框架计算一阶导数?
例如,我有数据:
time_series : [10,20,40,70,110]
我正在尝试获得如下输出:
derivative : [10,20,30,40]
【问题讨论】:
您是否有理由希望在聚合框架中执行此操作而不是使用强大的 python 库实现? @JohnnyHK - 你能给我一个python库实现的例子吗?我目前的解决方法是使用 pymongo 获取所有字段,并在 python 中进行衍生。结果证明它很慢(受网络带宽限制?),这让我四处寻找替代方案。 @JohnnyHK 我认为聚合框架是这里的最佳选择。甚至比numpy.diff
还要快。我在答案中添加了基准测试结果
@Styvane 不要误会我的意思,我是第一个在这里对这两个答案投赞成票的人,因为它们很棒,但“最佳”选项不仅仅是性能。经过良好测试的库调用比复杂的聚合管道更简单/更容易理解/更清晰。
@JohnnyHK 我完全同意。并非编程中的所有内容都与性能有关。很遗憾 MongoDB 没有为此提供运算符。顺便说一句,我在写作时多次忘记了花括号。
【参考方案1】:
db.collection.aggregate(
[
"$addFields":
"indexes":
"$range": [
0,
"$size": "$time_series"
]
,
"reversedSeries":
"$reverseArray": "$time_series"
,
"$project":
"derivatives":
"$reverseArray":
"$slice": [
"$map":
"input":
"$zip":
"inputs": [
"$reversedSeries",
"$indexes"
]
,
"in":
"$subtract": [
"$arrayElemAt": [
"$$this",
0
]
,
"$arrayElemAt": [
"$reversedSeries",
"$add": [
"$arrayElemAt": [
"$$this",
1
]
,
1
]
]
]
,
"$subtract": [
"$size": "$time_series"
,
1
]
]
,
"time_series": 1
]
)
我们可以在 3.4+ 版本中使用上述管道来执行此操作。
在管道中,我们使用$addFields
管道阶段。运算符添加“time_series”的元素索引的数组来做文档,我们也反转时间序列数组并分别使用$range
和$reverseArray
运算符将其添加到文档中
我们在这里反转了数组,因为数组中位置p
的元素总是大于位置p+1
的元素,这意味着[p] - [p+1] < 0
,我们不想在这里使用$multiply
。(请参阅版本 3.2 的管道)
接下来,我们使用索引数组$zipped
时间序列数据,并使用$map
运算符将substract
表达式应用于结果数组。
然后我们$slice
将结果从数组中丢弃null/None
并重新反转结果。
在 3.2 中,我们可以使用$unwind
运算符来展开我们的数组,并通过将文档指定为操作数而不是传统的以 $.
接下来,我们需要 $group
我们的文档并使用 $push
累加器运算符返回一组子文档,如下所示:
"_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
"time_series" : [
"value" : 10, "index" : NumberLong(0) ,
"value" : 20, "index" : NumberLong(1) ,
"value" : 40, "index" : NumberLong(2) ,
"value" : 70, "index" : NumberLong(3) ,
"value" : 110, "index" : NumberLong(4)
]
终于来到了$project
阶段。在这个阶段,我们需要使用$map
运算符对$group
阶段新计算的数组中的每个元素应用一系列表达式。
这是$map
内部发生的事情(请参阅$map
作为 for 循环)in 表达式:
对于每个子文档,我们使用$let
变量运算符将value 字段分配给一个变量。然后我们从数组中下一个元素的“value”字段的值中减去它的值。
由于数组中的下一个元素是当前索引处的元素加一,所以我们需要的只是$arrayElemAt
运算符的帮助以及当前元素索引的简单$add
ition 和1
。
$subtract
表达式返回一个负值,因此我们需要使用 $multiply
运算符将该值乘以 -1
。
我们还需要$filter
结果数组,因为它的最后一个元素是None
或null
。原因是当当前元素是最后一个元素时,$subtract
返回None
,因为下一个元素的索引等于数组的大小。
db.collection.aggregate([
"$unwind":
"path": "$time_series",
"includeArrayIndex": "index"
,
"$group":
"_id": "$_id",
"time_series":
"$push":
"value": "$time_series",
"index": "$index"
,
"$project":
"time_series":
"$filter":
"input":
"$map":
"input": "$time_series",
"as": "el",
"in":
"$multiply": [
"$subtract": [
"$$el.value",
"$let":
"vars":
"nextElement":
"$arrayElemAt": [
"$time_series",
"$add": [
"$$el.index",
1
]
]
,
"in": "$$nextElement.value"
]
,
-1
]
,
"as": "item",
"cond":
"$gte": [
"$$item",
0
]
])
我认为效率较低的另一个选项是使用 map_reduce
方法对我们的集合执行 map/reduce 操作。
>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
... function()
... var derivatives = [];
... for (var index=1; index<this.time_series.length; index++)
... derivatives.push(this.time_series[index] - this.time_series[index-1]);
...
... emit(this._id, derivatives);
...
... """)
>>> reducer = Code("""
... function(key, value)
... """)
>>> for res in collection.map_reduce(mapper, reducer, out='inline': 1)['results']:
... print(res) # or do something with the document.
...
'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')
您还可以检索所有文档并使用numpy.diff
来返回这样的导数:
import numpy as np
for document in collection.find(, 'time_series': 1):
result = np.diff(document['time_series'])
【讨论】:
【参考方案2】:它有点脏,但也许是这样的?
use test_db
db['data'].remove()
db['data'].insert(id: 1, time_series: [10,20,40,70,110])
var mapF = function()
emit(this.id, this.time_series);
emit(this.id, this.time_series);
;
var reduceF = function(key, values)
var n = values[0].length;
var ret = [];
for(var i = 0; i < n-1; i++)
ret.push( values[0][i+1] - values[0][i] );
return 'gradient': ret;
;
var finalizeF = function(key, val)
return val.gradient;
db['data'].mapReduce(
mapF,
reduceF,
out: 'data_d1', finalize: finalizeF
)
db['data_d1'].find()
这里的“策略”是将要操作的数据发出两次,以便在reduce阶段可以访问,返回一个对象以避免消息“reduce -> multiple not supported yet” 然后在终结器中过滤回数组。
这个脚本然后产生:
MongoDB shell version: 3.2.9
connecting to: test
switched to db test_db
WriteResult( "nRemoved" : 1 )
WriteResult( "nInserted" : 1 )
"result" : "data_d1",
"timeMillis" : 13,
"counts" :
"input" : 1,
"emit" : 2,
"reduce" : 1,
"output" : 1
,
"ok" : 1
"_id" : 1, "value" : [ 10, 20, 30, 40 ]
bye
或者,可以将所有处理移至终结器(reduceF
此处未调用,因为假定mapF
发出唯一键):
use test_db
db['data'].remove()
db['data'].insert(id: 1, time_series: [10,20,40,70,110])
var mapF = function()
emit(this.id, this.time_series);
;
var reduceF = function(key, values)
;
var finalizeF = function(key, val)
var x = val;
var n = x.length;
var ret = [];
for(var i = 0; i < n-1; i++)
ret.push( x[i+1] - x[i] );
return ret;
db['data'].mapReduce(
mapF,
reduceF,
out: 'data_d1', finalize: finalizeF
)
db['data_d1'].find()
【讨论】:
以上是关于使用 MongoDB 聚合框架计算一阶导数的主要内容,如果未能解决你的问题,请参考以下文章
Spring Data mongodb聚合框架:project()中的ConditionalOperators进行中位数计算