MongoDB error: "not authorized on admin to execute command"


By default MongoDB accepts connections without a username or password. If a command fails with "not authorized on admin to execute command", the currently logged-in user does not have the privileges required to run that command.
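For instance, with access control enabled, running a privileged command over an unauthenticated (or under-privileged) connection reproduces the error. A minimal pymongo sketch, using the 192.168.12.15 host that appears later in this post:

from pymongo import MongoClient
from pymongo.errors import OperationFailure

client = MongoClient("mongodb://192.168.12.15:27017/")   # no credentials supplied
try:
    client.admin.command("listDatabases")                # a command that requires privileges
except OperationFailure as exc:
    print(exc)   # e.g. not authorized on admin to execute command { listDatabases: 1 }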

Solution: create a user and grant it the root role.
Note: run use admin to switch to the admin database before calling createUser.
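A minimal sketch of the user creation, done here through pymongo and assuming a connection that is still allowed to create users (for example an existing admin session or the localhost exception); the same can be done in the mongo shell with use admin followed by db.createUser(...). The root/root credentials below are the ones used later in this post:

from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1:27017/")
admin_db = client["admin"]                      # switch to the admin database first
admin_db.command(
    "createUser", "root",                       # username
    pwd="root",                                 # password
    roles=[{"role": "root", "db": "admin"}],    # grant the built-in root role
)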

After the user has been created successfully, log in as that root user and run the command again; it will now succeed.
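For example, from pymongo the new user can authenticate against the admin database as sketched below (from the shell the equivalent is mongo -u root -p --authenticationDatabase admin); the host and credentials are the ones used elsewhere in this post:

from pymongo import MongoClient

# authenticate as the newly created root user; authSource=admin because the
# user was created in the admin database
client = MongoClient("mongodb://root:root@192.168.12.15:27017/?authSource=admin")
print(client.admin.command("listDatabases"))    # now authorized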

Appendix: the privileges granted by each role when adding a user are described in the MongoDB documentation on built-in roles.

PySpark reading from MongoDB fails with "Cursor not found / no longer available"

I. Problem

PySpark fails while reading from MongoDB:

Caused by: com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 8312913963057096512 not found on server 192.168.12.15:27017'

Partial log:

Original error message: Cursor 8312913963057096512 not found on server 192.168.12.15:27017
        
	at com.mongodb.spark.rdd.MongoRDD$MongoCursorIterator.hasNext(MongoRDD.scala:196)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:156)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:148)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 8312913963057096512 not found on server 192.168.12.15:27017' on server 192.168.12.15:27017
	at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
	at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:229)
	at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:115)
	at com.mongodb.client.internal.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:54)
	at com.mongodb.spark.rdd.MongoRDD$MongoCursorIterator.hasNext(MongoRDD.scala:194)
	... 15 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.mongodb.spark.exceptions.MongoSparkCursorNotFoundException: Cursor not found / no longer available.

By default cursors timeout after 10 minutes and are cleaned up by MongoDB.
To prevent cursors being removed before being used, ensure that all data is read from MongoDB into Spark.
The best time to do this is before entering expensive / slow computations or merges.

See: https://docs.mongodb.com/manual/reference/parameters/#param.cursorTimeoutMillis for information about changing the default timeout.

Number of records in the MongoDB collection read by PySpark: about 470,000.

Code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import pymongo as pm
import threading
import logging as log
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class MatchName:

    def __init__(self):
        self.lock = threading.RLock()

    def get_enterprise_list(self):
        enterprise_list = []
        myclient = None
        enterprise_doc = None
        self.lock.acquire()
        try:
            username = "root"
            password = "root"
            ip = "192.168.12.15"
            port = "27017"
            db_name = "my_db1"
            table_name = "my_table1"
            myclient = pm.MongoClient("mongodb://{}:{}@{}:{}/".format(username, password, ip, port))
            enterprise_db = myclient[db_name]
            enterprise_col = enterprise_db[table_name]
            # enterprise_query = {"company_name_en": {"$ne": "-"}, "company_name_cn": {"$ne": "-"},
            #                     "company_name_en": {"$ne": ""}, "company_name_cn": {"$ne": ""}}
            enterprise_query = {"company_name_en": {"$ne": "-"}}
            # no_cursor_timeout=True keeps the server from reaping the cursor after 10 minutes
            enterprise_doc = enterprise_col.find(enterprise_query, no_cursor_timeout=True).batch_size(10000)
            # do not close the client here: the cursor below still needs the connection;
            # both the cursor and the client are closed in the finally block
            for i in enterprise_doc:
                enterprise_list.append(i)
        finally:
            self.lock.release()
            if enterprise_doc is not None:
                enterprise_doc.close()
            if myclient is not None:
                myclient.close()

        return enterprise_list


def filter_empty(x):
    filter0 = x["buyer_t"] is not None
    filter1 = str(x["buyer_t"]) != "-"
    filter2 = str(x["buyer_t"]).strip() != ""
    return filter0 and filter1 and filter2


def do_match(custom, enterprise_list_broadcast):
    buyer = str(custom["buyer_t"]).lower()
    buyer_array = str(buyer).lower().split(" ")
    max_length = len(buyer_array)
    max_count = 0
    matched_enterprise = None
    enterprise = enterprise_list_broadcast
    for each in enterprise:
        count = 0
        company_name_en = str(each["company_name_en"]).lower().replace("(", " ").replace(")", " ")
        company_name_array = company_name_en.split(" ")
        # skip this candidate if the first word does not match
        if buyer_array[0] != company_name_array[0]:
            continue
        # keep the English name that contains the most of the buyer's words
        for word in buyer_array:
            if word in company_name_en:
                count = count + 1
            if count > max_count:
                max_count = count
                matched_enterprise = each
    if matched_enterprise is None:
        return buyer, "", 0
    return buyer, matched_enterprise["company_name_en"], max_count


if __name__ == '__main__':
    LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s - %(lineno)s"
    log.basicConfig(level=log.INFO, format=LOG_FORMAT)
    log.info("match start")

    mn = MatchName()
    # load mongo data
    username = "root"
    password = "root"
    ip = "192.168.12.15"
    port = "27017"
    db_name = "my_db2"
    table_name = "my_table2"
    input_uri = "mongodb://{}:{}@{}:{}/{}.{}?authSource=admin".format(
        username, password, ip, port, db_name, table_name)
    output_uri = "mongodb://127.0.0.1/spark.spark_test"
    enterprise_list = mn.get_enterprise_list()
    session = SparkSession \
        .builder \
        .appName("MyApp") \
        .master("local[*]") \
        .config('spark.driver.memory', '2g') \
        .config('spark.executor.memory', '2g') \
        .config("spark.mongodb.input.uri", input_uri) \
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
        .getOrCreate()

    df = session.read.format('com.mongodb.spark.sql.DefaultSource').load()
    sc = session.sparkContext
    enterprise_list_broadcast = sc.broadcast(enterprise_list)

    result_rdd = df.rdd.filter(lambda x: filter_empty(x)).map(
        lambda each: do_match(each, enterprise_list_broadcast.value))
    result_file = "D:\\data\\tmp\\match_result.txt"
    schema = StructType([StructField('buyer_name', StringType()), StructField('company_name_en', StringType()),
                         StructField('count', IntegerType())])
    result_dataframe = session.createDataFrame(result_rdd, schema=schema)
    result_dataframe.repartition(1).write.csv(result_file, mode="overwrite")
    log.info("match end")

Rough flow of the code:

  1. Use the Spark API to read some data from MongoDB as the fact table
  2. Use pymongo to read some data from MongoDB as the reference table
  3. Join the fact-table data against the reference-table data

The problem appears when trying to read all the data of one collection in a MongoDB database through the PySpark API.

If the number of records read is limited, for example by changing the code as follows:

# before
df = session.read.format('com.mongodb.spark.sql.DefaultSource').load()

# after
df = session.read.format('com.mongodb.spark.sql.DefaultSource').load().limit(10000)

then the job finishes normally in roughly ten minutes, without any error.

II. Solution

Analysis

The log says that, by default, cursors are cleaned up by MongoDB after 10 minutes of inactivity.
To prevent the cursor from being removed before it is fully consumed, make sure all data is read from MongoDB into Spark, ideally before entering any expensive or slow computations or joins.
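The log also points to the server-side cursorTimeoutMillis parameter. Raising it is an alternative (or complementary) workaround; a minimal pymongo sketch, assuming the connected user is allowed to run setParameter, with an arbitrary one-hour value:

from pymongo import MongoClient

client = MongoClient("mongodb://root:root@192.168.12.15:27017/?authSource=admin")
# raise the server-wide idle-cursor timeout from the default 10 minutes to 1 hour
client.admin.command("setParameter", 1, cursorTimeoutMillis=3600000)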

1. Fixing the pymongo read timeout

Add a parameter to the find call:

# before
query = {"name_en": {"$ne": "-"}}
collection.find(query).batch_size(10000)

# after: no_cursor_timeout=True keeps the server from reaping the idle cursor;
# close such a cursor explicitly once iteration is finished
query = {"name_en": {"$ne": "-"}}
collection.find(query, no_cursor_timeout=True).batch_size(10000)

2. Fixing the read timeout when using the PySpark API

The idea is to cache the data as soon as it has been read, so that later stages read from the cache instead of querying MongoDB again.

Partial code example:

# before
input_uri = "mongodb://{}:{}@{}:{}/{}.{}?authSource=admin".format(
        username, password, ip, port, db_name, table_name)
session = SparkSession \
        .builder \
        .appName("MyApp") \
        .master("local[*]") \
        .config('spark.driver.memory', '3g') \
        .config('spark.executor.memory', '3g') \
        .config("spark.mongodb.input.uri", input_uri) \
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
        .getOrCreate()

df = session.read.format('com.mongodb.spark.sql.DefaultSource').load()
result_rdd = df.rdd.filter(lambda x: filter_empty(x)).map(
        lambda each: do_match(each, enterprise_list_broadcast.value))

# after
input_uri = "mongodb://{}:{}@{}:{}/{}.{}?authSource=admin".format(
        username, password, ip, port, db_name, table_name)
session = SparkSession \
        .builder \
        .appName("MyApp") \
        .master("local[*]") \
        .config('spark.driver.memory', '3g') \
        .config('spark.executor.memory', '3g') \
        .config("spark.mongodb.input.uri", input_uri) \
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
        .getOrCreate()
df = session.read.format('com.mongodb.spark.sql.DefaultSource').load()
# cache the RDD right after loading so downstream stages read from the cache
# instead of going back to the MongoDB cursor
source_rdd = df.rdd
source_rdd.cache()
result_rdd = source_rdd.filter(lambda x: filter_empty(x)).map(
        lambda each: do_match(each, enterprise_list_broadcast.value))
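Note that cache() is lazy: nothing is read from MongoDB until an action runs. If the matching step is still slow enough for the cursor to expire, forcing materialization right after the load should help; a small sketch, reusing source_rdd and log from the code above:

# run an action immediately so the whole collection is pulled from MongoDB
# (and cached) before the expensive matching starts
record_count = source_rdd.count()
log.info("cached %d records from MongoDB", record_count)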

After these changes there were no more errors, and the job finally produced a result after running for about two hours.

References

https://www.itdaan.com/blog/2017/07/20/f7cb630f73c59537dff6ce213b9f7bf4.html
