PyMongo 中的 MapReduce

Posted

技术标签:

【中文标题】PyMongo 中的 MapReduce【英文标题】:MapReduce in PyMongo 【发布时间】:2015-10-06 01:13:47 【问题描述】:

我的 Mongo 收藏:Impressions 具有以下格式的文档:-

   
        _uid: 10,
        "impressions": [
            
                "pos": 6,
                "id": 123,
                "service": "furniture"
            ,
            
                "pos": 0,
                "id": 128,
                "service": "electronics"
            ,
            
                "pos": 2,
                "id": 127,
                "service": "furniture"
            ,
            
                "pos": 2,
                "id": 125,
                "service": "electronics"
            ,
            
                "pos": 10,
                "id": 124,
                "service": "electronics"
            
        ]
      ,
     
        _uid: 11,
        "impressions": [
            
                "pos": 1,
                "id": 124,
                "service": "furniture"
            ,
            
                "pos": 10,
                "id": 124,
                "service": "electronics"
            ,
            
                "pos": 1,
                "id": 123,
                "service": "furniture"
            ,
            
                "pos": 21,
                "id": 122,
                "service": "furniture"
            ,
            
                "pos": 3,
                "id": 125,
                "service": "electronics"
            ,
            
                "pos": 10,
                "id": 121,
                "service": "electronics"
            
            ]
         ,
            .
            .
            .
            .
            .

集合中的每个文档都有"impressions" 键,这是一个字典数组。在每个字典中,"id" 是实体的 id,"service" 是服务类型,"pos" 是项目在搜索页面结果中的位置。我的目标是找出每个类别中每个"id" 的展示次数。 因此对于"service" == "furniture" 的上述数据,我希望将其作为我的聚合结果:-

[
"id": 123,"impressions_count":2,
"id": 127,"impressions_count":1,
"id": 124,"impressions_count":1,
"id": 122,"impressions_count":1
]

我尝试通过 python 脚本中的以下函数使用 MAPREDUCE 聚合“id”

def fetch_impressions():
    try:
        imp_collection = get_mongo_connection('Impressions')
        map = Code("""
                function()
                    for( x in this.impressions)
                        var flat_id = x['id'];
                        var service_type = x['service']
                        emit(parseInt(flat_id),1);
                        
                    ;
                """)

                        """)
        reduce = Code("""
                        function(a,b)
                            return Array.sum(b);
                            ;
                        """)

        results = imp_collection.map_reduce(map, reduce, 'aggregation_result')
        return results
    except Exception as e:
        raise Exception(e)

但我得到的结果为 None,可能是因为 map 函数有问题。我是 javascript 新手,Mongo 请帮忙!

【问题讨论】:

你想做什么?预期的结果是什么? @user3100115 更新了问题,抱歉耽搁了! 【参考方案1】:

您可以使用aggregation framework

import pymongo
conn = pymongo.MongoClient()
db = conn.test
col =  db.collection

for doc in col.aggregate(['$unwind': '$impressions', 
    '$match': 'impressions.service': 'furniture', 
    '$group': '_id': '$impressions.id', 'impressions_count': '$sum': 1, 
    ]):
    print(doc)

或者更有效地使用$map$setDifference 运算符。

col.aggregate([
     "$project":  "impressions": "$setDifference": [ "$map":  "input": "$impressions", "as": "imp", "in":  "$cond":  "if":  "$eq": [ "$$imp.service", "furniture" ] , "then": "$$imp.id", "else": 0 , [0]], 
     "$unwind": "$impressions" , 
     "$group":  "_id": "$impressions", "impressions_count":  "$sum": 1 
])

产量:

'_id': 122.0, 'impressions_count': 1
'_id': 124.0, 'impressions_count': 1
'_id': 127.0, 'impressions_count': 1
'_id': 123.0, 'impressions_count': 2

【讨论】:

这仍然没有回答我们将如何使用map_reduce() API 来解决这个问题 如果有更好的方法,为什么要使用map_reduce? @AyushKSingh 完全同意这一点,但我们也应该在提供更好的解决方案之前使用 map-reduce 回答问题,以便提出问题的用户离开网站不仅可以学习一种新的更好的方式,而且还可以实现他们解决方案中的问题及其相应的修复。【参考方案2】:

我制作了一个工具,让您可以在 Python 中运行 MongoDB Map/Reduce

https://mreduce.com

import random
import threading

import bson
import pymongo

import mreduce


mongo_client = pymongo.MongoClient("mongodb://your_mongodb_server")

def map_func(document):
    for impression in document["impressions"]:
        yield document["id"], 1

def reduce_func(id, prices):
    return sum(prices)

worker_functions = 
    "exampleMap": map_func,
    "exampleReduce": reduce_func


api = mreduce.API(
    api_key = "...",
    mongo_client = mongo_client
)

project_id = "..."

thread = threading.Thread(
    target=api.run,
    args=[project_id, worker_functions]
)
thread.start()

job = api.submit_job(
    projectId=project["_id"],
    mapFunctionName="exampleMap",
    reduceFunctionName="exampleReduce",
    inputDatabase="db",
    inputCollection="impressions",
    outputDatabase="db",
    outputCollection="impressions_results"
)
result = job.wait_for_result()
for key, value in result:
    print("Key: " + key, ", Value: " + str(value))

【讨论】:

欢迎来到 Stack Overflow!仅链接到您自己的库或教程并不是一个好的答案。链接到它,解释它为什么解决问题,提供如何解决问题的代码,并否认你编写了它,这样可以得到更好的答案。见:What signifies “Good” self promotion? 我确实解释了为什么它可以解决问题,它允许您像 OP 要求的那样在 Python 中运行 map/reduce。我也确实说过我写了这个工具 @Dharman ErlVolton 是我意外登录的另一个帐户。我还更新了代码,以解决 OP 遇到的特定问题,每个 ID 的总印象数

以上是关于PyMongo 中的 MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

Pymongo 中的聚合函数

PyMongo 中的 MapReduce

pymongo中的连接操作:Connection()与MongoClient()

PyMongo - 将属性中的所有值设置为小写[重复]

Pymongo $regexMatch 正则表达式选项中的无效标志:u

如何优化 pymongo 中的更新查询以进行抓取项目