PySpark Reading MongoDB Fails with Cursor not found / no longer available
Posted 终回首
I. Problem
PySpark fails with the following error when reading from MongoDB:
Caused by: com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 8312913963057096512 not found on server 192.168.12.15:27017'
Partial log:
Original error message: Cursor 8312913963057096512 not found on server 192.168.12.15:27017
at com.mongodb.spark.rdd.MongoRDD$MongoCursorIterator.hasNext(MongoRDD.scala:196)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:156)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:148)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 8312913963057096512 not found on server 192.168.12.15:27017' on server 192.168.12.15:27017
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:229)
at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:115)
at com.mongodb.client.internal.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:54)
at com.mongodb.spark.rdd.MongoRDD$MongoCursorIterator.hasNext(MongoRDD.scala:194)
... 15 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.mongodb.spark.exceptions.MongoSparkCursorNotFoundException: Cursor not found / no longer available.
By default cursors timeout after 10 minutes and are cleaned up by MongoDB.
To prevent cursors being removed before being used, ensure that all data is read from MongoDB into Spark.
The best time to do this is before entering expensive / slow computations or merges.
See: https://docs.mongodb.com/manual/reference/parameters/#param.cursorTimeoutMillis for information about changing the default timeout.
The MongoDB collection read by PySpark contains roughly 470,000 records.
Code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import pymongo as pm
import threading
import logging as log
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
class MatchName:
    def __init__(self):
        self.lock = threading.RLock()

    def get_enterprise_list(self):
        enterprise_list = []
        myclient = None
        enterprise_doc = None
        self.lock.acquire()
        try:
            username = "root"
            password = "root"
            ip = "192.168.12.15"
            port = "27017"
            db_name = "my_db1"
            table_name = "my_table1"
            myclient = pm.MongoClient("mongodb://{}:{}@{}:{}/".format(username, password, ip, port))
            enterprise_db = myclient[db_name]
            enterprise_col = enterprise_db[table_name]
            # enterprise_query = {"company_name_en": {"$ne": "-"}, "company_name_cn": {"$ne": "-"},
            #                     "company_name_en": {"$ne": ""}, "company_name_cn": {"$ne": ""}}
            enterprise_query = {"company_name_en": {"$ne": "-"}}
            # no_cursor_timeout=True keeps the server-side cursor alive; it is closed explicitly in finally
            enterprise_doc = enterprise_col.find(enterprise_query, no_cursor_timeout=True).batch_size(10000)
            for i in enterprise_doc:
                enterprise_list.append(i)
        finally:
            self.lock.release()
            if enterprise_doc is not None:
                enterprise_doc.close()
            if myclient is not None:
                myclient.close()
        return enterprise_list
def filter_empty(x):
    filter0 = x["buyer_t"] is not None
    filter1 = str(x["buyer_t"]) != "-"
    filter2 = str(x["buyer_t"]).strip() != ""
    return filter0 and filter1 and filter2
def do_match(custom, enterprise_list_broadcast):
    buyer = str(custom["buyer_t"]).lower()
    buyer_array = str(buyer).lower().split(" ")
    max_length = len(buyer_array)
    max_count = 0
    matched_enterprise = None
    enterprise = enterprise_list_broadcast
    for each in enterprise:
        count = 0
        company_name_en = str(each["company_name_en"]).lower().replace("(", " ").replace(")", " ")
        company_name_array = company_name_en.split(" ")
        # Skip straight away if the first word differs
        if buyer_array[0] != company_name_array[0]:
            continue
        # Find the English name that shares the most words with the buyer name
        enterprise = each
        for word in buyer_array:
            if word in company_name_en:
                count = count + 1
        if count > max_count:
            max_count = count
            matched_enterprise = each
    if matched_enterprise is None:
        return buyer, "", 0
    return buyer, matched_enterprise["company_name_en"], max_count
if __name__ == '__main__':
    LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s - %(lineno)s"
    log.basicConfig(level=log.INFO, format=LOG_FORMAT)
    log.info("match start")
    mn = MatchName()
    # load mongo data
    username = "root"
    password = "root"
    ip = "192.168.12.15"
    port = "27017"
    db_name = "my_db2"
    table_name = "my_table2"
    input_uri = "mongodb://{}:{}@{}:{}/{}.{}?authSource=admin".format(
        username, password, ip, port, db_name, table_name)
    output_uri = "mongodb://127.0.0.1/spark.spark_test"
    enterprise_list = mn.get_enterprise_list()
    session = SparkSession \
        .builder \
        .appName("MyApp") \
        .master("local[*]") \
        .config('spark.driver.memory', '2g') \
        .config('spark.executor.memory', '2g') \
        .config("spark.mongodb.input.uri", input_uri) \
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
        .getOrCreate()
    df = session.read.format('com.mongodb.spark.sql.DefaultSource').load()
    sc = session.sparkContext
    enterprise_list_broadcast = sc.broadcast(enterprise_list)
    result_rdd = df.rdd.filter(lambda x: filter_empty(x)).map(
        lambda each: do_match(each, enterprise_list_broadcast.value))
    result_file = "D:\\data\\tmp\\match_result.txt"
    schema = StructType([StructField('buyer_name', StringType()), StructField('company_name_en', StringType()),
                         StructField('count', IntegerType())])
    result_dataframe = session.createDataFrame(result_rdd, schema=schema)
    result_dataframe.repartition(1).write.csv(result_file, mode="overwrite")
    log.info("match end")
Rough flow of the code:
- Use the Spark API to read some data from MongoDB as the fact table
- Use pymongo to read some data from MongoDB as the reference table
- Join the fact-table records against the reference-table records
The error occurs when trying to read all of the data from one collection in a MongoDB database through the PySpark API.
If the number of records read is limited, for example changed as follows:
# Before
df = session.read.format('com.mongodb.spark.sql.DefaultSource').load()
# After
df = session.read.format('com.mongodb.spark.sql.DefaultSource').load().limit(10000)
then the job completes normally in roughly 10 minutes without the error.
II. Solution
Analysis
The log states that, by default, cursors time out after 10 minutes and are cleaned up by MongoDB.
To keep the cursor from being removed before it is fully consumed, make sure all of the data is read from MongoDB into Spark first, ideally before entering any expensive or slow computation or merge.
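As the documentation link in the log suggests, another option is to raise the server-side cursor timeout itself. The snippet below is a minimal sketch and not part of the original fix: it assumes you can authenticate as an administrator on the 192.168.12.15 instance, and the 30-minute value is only an example.
import pymongo as pm

# Example only: raise the server-wide cursor timeout to 30 minutes.
# Running setParameter requires admin privileges on the target mongod.
client = pm.MongoClient("mongodb://root:root@192.168.12.15:27017/")
client.admin.command("setParameter", 1, cursorTimeoutMillis=30 * 60 * 1000)
client.close()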
1. Fix the pymongo cursor timeout
Add a parameter to the find call:
# Before
query = {"name_en": {"$ne": "-"}}
db.find(query).batch_size(10000)
# After
query = {"name_en": {"$ne": "-"}}
db.find(query, no_cursor_timeout=True).batch_size(10000)
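One caveat, from pymongo's documented behaviour rather than from the original post: a cursor opened with no_cursor_timeout=True is never cleaned up by the server automatically, so it should be closed explicitly once you are done with it. A minimal sketch, assuming the collection is reachable as db["my_table1"] (a hypothetical name):
query = {"name_en": {"$ne": "-"}}
cursor = db["my_table1"].find(query, no_cursor_timeout=True).batch_size(10000)
try:
    for doc in cursor:
        pass  # process each document here
finally:
    # Close explicitly: the server will not time this cursor out on its own
    cursor.close()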
2. Fix the cursor timeout when reading through the PySpark API
The idea is to cache the data as soon as it is read, so that later stages read from the cache instead of querying MongoDB again, as in the partial example below.
# Before
input_uri = "mongodb://{}:{}@{}:{}/{}.{}?authSource=admin".format(
    username, password, ip, port, db_name, table_name)
session = SparkSession \
    .builder \
    .appName("MyApp") \
    .master("local[*]") \
    .config('spark.driver.memory', '3g') \
    .config('spark.executor.memory', '3g') \
    .config("spark.mongodb.input.uri", input_uri) \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
    .getOrCreate()
df = session.read.format('com.mongodb.spark.sql.DefaultSource').load()
result_rdd = df.rdd.filter(lambda x: filter_empty(x)).map(
    lambda each: do_match(each, enterprise_list_broadcast.value))
# After
input_uri = "mongodb://{}:{}@{}:{}/{}.{}?authSource=admin".format(
    username, password, ip, port, db_name, table_name)
session = SparkSession \
    .builder \
    .appName("MyApp") \
    .master("local[*]") \
    .config('spark.driver.memory', '3g') \
    .config('spark.executor.memory', '3g') \
    .config("spark.mongodb.input.uri", input_uri) \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
    .getOrCreate()
df = session.read.format('com.mongodb.spark.sql.DefaultSource').load()
# Read into an RDD and cache it so later stages do not hit MongoDB again
source_rdd = df.rdd
source_rdd.cache()
result_rdd = source_rdd.filter(lambda x: filter_empty(x)).map(
    lambda each: do_match(each, enterprise_list_broadcast.value))
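Note that cache() in Spark is lazy: the data is only materialized when an action runs, so nothing is actually read from MongoDB until the first job touches the RDD. As an extra safeguard (my addition, not part of the original fix), you can trigger a cheap action right after caching so the whole collection is pulled in while the cursor is still fresh:
source_rdd = df.rdd
source_rdd.cache()
# Force an action so the full collection is read from MongoDB now,
# before the slow matching stage starts (addition for illustration).
log.info("cached %d source rows", source_rdd.count())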
After this change the error no longer appeared, and the job finally produced its result after running for about 2 hours.
References
https://www.itdaan.com/blog/2017/07/20/f7cb630f73c59537dff6ce213b9f7bf4.html