PyMongo 中的 MapReduce
Posted
技术标签:
【中文标题】PyMongo 中的 MapReduce【英文标题】:MapReduce in PyMongo 【发布时间】:2015-10-06 01:13:47 【问题描述】:我的 Mongo 收藏:Impressions
具有以下格式的文档:-
_uid: 10,
"impressions": [
"pos": 6,
"id": 123,
"service": "furniture"
,
"pos": 0,
"id": 128,
"service": "electronics"
,
"pos": 2,
"id": 127,
"service": "furniture"
,
"pos": 2,
"id": 125,
"service": "electronics"
,
"pos": 10,
"id": 124,
"service": "electronics"
]
,
_uid: 11,
"impressions": [
"pos": 1,
"id": 124,
"service": "furniture"
,
"pos": 10,
"id": 124,
"service": "electronics"
,
"pos": 1,
"id": 123,
"service": "furniture"
,
"pos": 21,
"id": 122,
"service": "furniture"
,
"pos": 3,
"id": 125,
"service": "electronics"
,
"pos": 10,
"id": 121,
"service": "electronics"
]
,
.
.
.
.
.
集合中的每个文档都有"impressions"
键,这是一个字典数组。在每个字典中,"id"
是实体的 id,"service"
是服务类型,"pos"
是项目在搜索页面结果中的位置。我的目标是找出每个类别中每个"id"
的展示次数。
因此对于"service"
== "furniture"
的上述数据,我希望将其作为我的聚合结果:-
[
"id": 123,"impressions_count":2,
"id": 127,"impressions_count":1,
"id": 124,"impressions_count":1,
"id": 122,"impressions_count":1
]
我尝试通过 python 脚本中的以下函数使用 MAPREDUCE 聚合“id”
def fetch_impressions():
try:
imp_collection = get_mongo_connection('Impressions')
map = Code("""
function()
for( x in this.impressions)
var flat_id = x['id'];
var service_type = x['service']
emit(parseInt(flat_id),1);
;
""")
""")
reduce = Code("""
function(a,b)
return Array.sum(b);
;
""")
results = imp_collection.map_reduce(map, reduce, 'aggregation_result')
return results
except Exception as e:
raise Exception(e)
但我得到的结果为 None,可能是因为 map 函数有问题。我是 javascript 新手,Mongo 请帮忙!
【问题讨论】:
你想做什么?预期的结果是什么? @user3100115 更新了问题,抱歉耽搁了! 【参考方案1】:您可以使用aggregation framework
import pymongo
conn = pymongo.MongoClient()
db = conn.test
col = db.collection
for doc in col.aggregate(['$unwind': '$impressions',
'$match': 'impressions.service': 'furniture',
'$group': '_id': '$impressions.id', 'impressions_count': '$sum': 1,
]):
print(doc)
或者更有效地使用$map
和$setDifference
运算符。
col.aggregate([
"$project": "impressions": "$setDifference": [ "$map": "input": "$impressions", "as": "imp", "in": "$cond": "if": "$eq": [ "$$imp.service", "furniture" ] , "then": "$$imp.id", "else": 0 , [0]],
"$unwind": "$impressions" ,
"$group": "_id": "$impressions", "impressions_count": "$sum": 1
])
产量:
'_id': 122.0, 'impressions_count': 1
'_id': 124.0, 'impressions_count': 1
'_id': 127.0, 'impressions_count': 1
'_id': 123.0, 'impressions_count': 2
【讨论】:
这仍然没有回答我们将如何使用map_reduce()
API 来解决这个问题
如果有更好的方法,为什么要使用map_reduce
? @AyushKSingh
完全同意这一点,但我们也应该在提供更好的解决方案之前使用 map-reduce 回答问题,以便提出问题的用户离开网站不仅可以学习一种新的更好的方式,而且还可以实现他们解决方案中的问题及其相应的修复。【参考方案2】:
我制作了一个工具,让您可以在 Python 中运行 MongoDB Map/Reduce
https://mreduce.com
import random
import threading
import bson
import pymongo
import mreduce
mongo_client = pymongo.MongoClient("mongodb://your_mongodb_server")
def map_func(document):
for impression in document["impressions"]:
yield document["id"], 1
def reduce_func(id, prices):
return sum(prices)
worker_functions =
"exampleMap": map_func,
"exampleReduce": reduce_func
api = mreduce.API(
api_key = "...",
mongo_client = mongo_client
)
project_id = "..."
thread = threading.Thread(
target=api.run,
args=[project_id, worker_functions]
)
thread.start()
job = api.submit_job(
projectId=project["_id"],
mapFunctionName="exampleMap",
reduceFunctionName="exampleReduce",
inputDatabase="db",
inputCollection="impressions",
outputDatabase="db",
outputCollection="impressions_results"
)
result = job.wait_for_result()
for key, value in result:
print("Key: " + key, ", Value: " + str(value))
【讨论】:
欢迎来到 Stack Overflow!仅链接到您自己的库或教程并不是一个好的答案。链接到它,解释它为什么解决问题,提供如何解决问题的代码,并否认你编写了它,这样可以得到更好的答案。见:What signifies “Good” self promotion? 我确实解释了为什么它可以解决问题,它允许您像 OP 要求的那样在 Python 中运行 map/reduce。我也确实说过我写了这个工具 @Dharman ErlVolton 是我意外登录的另一个帐户。我还更新了代码,以解决 OP 遇到的特定问题,每个 ID 的总印象数以上是关于PyMongo 中的 MapReduce的主要内容,如果未能解决你的问题,请参考以下文章
pymongo中的连接操作:Connection()与MongoClient()