_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
【Posted】: 2018-09-18 15:46:53
【Question】: I want to do sentiment analysis using Kafka and Spark. My plan is to read streaming data from Kafka and then process it in batches with Spark. After that, I want to analyze each batch with sentimentPredict(), a function I built with TensorFlow. This is what I have so far:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
from multiprocessing import Lock
lock = Lock()
numDimensions = 300
maxSeqLength = 70
batchSize = 24
lstmUnits = 128
numClasses = 2
iterations = 100000
import numpy as np
import pickle
from nltk.tokenize import word_tokenize
import DataPreprocessing as proc
import time
with open('dictionary.pickle', 'rb') as handle:
    wordsList = pickle.load(handle)
wordVectors = np.load('final_embeddings.npy')
import tensorflow as tf
tf.reset_default_graph()
labels = tf.placeholder(tf.float32, [batchSize, numClasses])
input_data = tf.placeholder(tf.int32, [batchSize, maxSeqLength])
data = tf.Variable(tf.zeros([batchSize, maxSeqLength, numDimensions]),dtype=tf.float32)
data = tf.nn.embedding_lookup(wordVectors,input_data)
lstmCell = tf.contrib.rnn.BasicLSTMCell(lstmUnits)
lstmCell = tf.contrib.rnn.DropoutWrapper(cell=lstmCell, output_keep_prob=0.25)
value, _ = tf.nn.dynamic_rnn(lstmCell, data, dtype=tf.float32)
weight = tf.Variable(tf.truncated_normal([lstmUnits, numClasses]))
bias = tf.Variable(tf.constant(0.1, shape=[numClasses]))
value = tf.transpose(value, [1, 0, 2])
last = tf.gather(value, int(value.get_shape()[0]) - 1)
prediction = (tf.matmul(last, weight) + bias)
correctPred = tf.equal(tf.argmax(prediction,1), tf.argmax(labels,1))
accuracy = tf.reduce_mean(tf.cast(correctPred, tf.float32))
sess = tf.InteractiveSession()
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint('models'))
def getSentenceMatrix(sentence):
    arr = np.zeros([batchSize, maxSeqLength])
    sentenceMatrix = np.zeros([batchSize,maxSeqLength], dtype='int32')
    cleanedSentence = proc.cleanSentences(sentence)
    split = cleanedSentence.split()
    for indexCounter,word in enumerate(split):
        try:
            if word in wordsList:
                sentenceMatrix[0,indexCounter] = wordsList[word]
            else:
                sentenceMatrix[0,indexCounter] = 0 #Vector for unknown words
        except ValueError:
            sentenceMatrix[0,indexCounter] = 399999 #Vector for unknown words
    return sentenceMatrix
def sentimentCorrect(data):
    try:
        sentiment_results = {}
        #sentences = data['sentences']
        string = data.split(' ')
        exact = [(spell.correction(word)) for word in string]
        exact = ' '.join(exact)
        inputMatrix = getSentenceMatrix(proc.cleanSentences(proc._lookup_words(proc.stemmer.stem(exact))))
        predictedSentiment = sess.run(prediction, {input_data: inputMatrix})[0]
        # predictedSentiment[0] represents output score for positive sentiment
        # predictedSentiment[1] represents output score for negative sentiment
        print("Positive : ",predictedSentiment[0])
        print("Negative : ",predictedSentiment[1])
        if (predictedSentiment[0] > predictedSentiment[1]):
            result = "Positive"
        else:
            result = "Negative"
        sentiment_results["sentences"] = data
        sentiment_results["positiveScores"] = str(predictedSentiment[0])
        sentiment_results["negativeScores"] = str(predictedSentiment[1])
        sentiment_results["sentiment"] = result
        return sentiment_results
    except:
        print("Delay for 5 seconds")
        time.sleep(5)
def sentimentPredict(data):
    try:
        sentiment_results = {}
        #sentences = data['sentences']
        #string = sentences.split(' ')
        #exact = [get_exact_words(word) for word in string]
        #exact = ' '.join(exact)
        inputMatrix = getSentenceMatrix(proc.cleanSentences(proc._lookup_words(proc.stemmer.stem(data))))
        predictedSentiment = sess.run(prediction, {input_data: inputMatrix})[0]
        # predictedSentiment[0] represents output score for positive sentiment
        # predictedSentiment[1] represents output score for negative sentiment
        print("Positive : ",predictedSentiment[0])
        print("Negative : ",predictedSentiment[1])
        if (predictedSentiment[0] > predictedSentiment[1]):
            result = "Positive"
        else:
            result = "Negative"
        sentiment_results["sentences"] = data
        sentiment_results["positiveScores"] = str(predictedSentiment[0])
        sentiment_results["negativeScores"] = str(predictedSentiment[1])
        sentiment_results["sentiment"] = result
        return sentiment_results
    except TypeError:
        raise
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 2)
#kafkaStream = KafkaUtils.createStream(ssc, 'NLP:2181', 'spark-streaming', {'weblogs':1})
kafkaStream = KafkaUtils.createDirectStream(ssc, topics = ['weblogs'], kafkaParams = {"metadata.broker.list": "NLP:9092"})
# Parse the inbound messages as JSON (this fails if a message isn't valid JSON)
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
#parsed.saveAsTextFiles("file:///D:/spark-kafka.txt")
id_twitter = parsed.map(lambda tweet: tweet["id"])
id_twitter.saveAsTextFiles("file:///D:/id-tweet.txt")
id_twitter.count().map(lambda x:'ID in this batch: %s' % x).pprint()
name = parsed.map(lambda tweet: tweet["name"])
name.saveAsTextFiles("file:///D:/name-tweet.txt")
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
text = parsed.map(lambda tweet: tweet["text"])
text.saveAsTextFiles("file:///D:/text-tweet.txt")
sentiment = text.mapPartitions(sentimentPredict)
sentiment.saveAsTextFiles("file:///D:/sentiment-tweet.txt")
#sentiment_result = text.map(sentimentPredict(text))
#sentiment_result = text.flatMap(sentimentPredict(text))
#print(sentiment_result)
#parsed.map(lambda x:'Tweets in this batch: %s' % x).pprint()
#parsed.encode("utf-8").pprint()
#print(parsed)
#print(soup.encode("utf-8"))
#sentiment_result.saveAsTextFiles("file:///D:/spark-kafka.txt")
ssc.start()
ssc.awaitTermination()
However, when I run my code from the terminal with spark-submit2, I get this error:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
save(element)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
save(x)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
save(tmp[0])
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
2018-04-09 16:21:48 ERROR JobScheduler:91 - Error generating jobs for time 1523265708000 ms
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
save(element)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
save(x)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
save(tmp[0])
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 67, in call
return r._jrdd
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2470, in _jrdd
self._jrdd_deserializer, profiler)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2403, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2389, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\serializers.py", line 568, in dumps
return cloudpickle.dumps(obj, 2)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 918, in dumps
cp.dump(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 249, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Traceback (most recent call last):
File "D:/PROJECT_MABESPOLRI/progress_spark_sentiment/spark+sentiment.py", line 171, in <module>
ssc.awaitTermination()
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\context.py", line 206, in awaitTermination
self._jssc.awaitTermination()
File "C:\Users\CS\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\CS\Anaconda3\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o22.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
save(element)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
save(x)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
save(tmp[0])
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 67, in call
return r._jrdd
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2470, in _jrdd
self._jrdd_deserializer, profiler)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2403, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2389, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\serializers.py", line 568, in dumps
return cloudpickle.dumps(obj, 2)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 918, in dumps
cp.dump(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 249, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Can anyone tell me how to solve this problem? Thanks.
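【Answer 1】: Try putting the Spark code in a separate function that takes only the necessary parameters. When you run a Spark operation, it tries to pickle everything in the current scope (the top level, in your case), and if it encounters an object that cannot be pickled, it raises an error. In your case, I suspect the error may be caused by the variable "lock".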
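To narrow down which name in a scope is the culprit, a rough screen along the following lines may help. This is only a sketch: `find_unpicklable` and `FakeSession` are hypothetical names introduced here, and it uses the standard `pickle` module, which is stricter than the cloudpickle serializer PySpark uses (for example, plain `pickle` also rejects modules and lambdas), so treat the report as a list of candidates rather than a verdict.

```python
import pickle
import threading


def find_unpicklable(namespace):
    """Return {name: error message} for values in `namespace` that pickle rejects."""
    problems = {}
    for name, value in namespace.items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # TypeError, PicklingError, RuntimeError, ...
            problems[name] = f"{type(exc).__name__}: {exc}"
    return problems


class FakeSession:
    """Hypothetical stand-in for an object that owns a lock internally."""

    def __init__(self):
        self._lock = threading.RLock()


# A small, made-up scope: two plain values and one object holding an RLock.
scope = {
    "batchSize": 24,
    "wordsList": {"good": 1, "bad": 2},
    "sess": FakeSession(),
}

report = find_unpicklable(scope)
for name, error in report.items():
    print(name, "->", error)
```

In a real script, passing `dict(globals())` instead of the made-up `scope` would screen the module's own top-level names.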
【Comments】:
Great answer! I don't think "lock" is the suspect, though; I think the unpicklable troublemaker is TensorFlow - ***.com/questions/61096573/…
In my case, I get this error when I try to do a broadcast join. Even if I put the broadcast join in a function that takes the two DataFrames as arguments, I still get PicklingError: Could not serialize broadcast: TypeError: cannot pickle '_thread.RLock' object. Any ideas?
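If the unpicklable object is indeed the model or session, one commonly used pattern is to create it lazily inside the function that runs on the workers, so the function shipped by Spark only refers to picklable values. The sketch below simulates this without Spark: `DummyModel`, `get_model`, and `predict_partition` are hypothetical names, and the toy `predict` rule stands in for a real TensorFlow call. It also reflects that a function passed to `mapPartitions` receives an iterator over the partition's elements rather than a single element.

```python
import threading

_model = None  # stays None on the driver; filled in lazily inside each worker process


class DummyModel:
    """Hypothetical stand-in for a TensorFlow session: it holds a lock, so it cannot be pickled."""

    def __init__(self):
        self._lock = threading.RLock()

    def predict(self, text):
        # Toy rule used in place of a real model call.
        with self._lock:
            return "Positive" if "good" in text.lower() else "Negative"


def get_model():
    """Build the model on first use in this process and reuse it afterwards."""
    global _model
    if _model is None:
        _model = DummyModel()  # a real job would restore its checkpoint here
    return _model


def predict_partition(rows):
    """Receive an iterator of texts and yield one result dict per text."""
    model = get_model()
    for text in rows:
        yield {"sentences": text, "sentiment": model.predict(text)}


# Local simulation of a single partition; with PySpark this would be
# text.mapPartitions(predict_partition).
results = list(predict_partition(iter(["Good movie", "terrible service"])))
print(results)
```

The design assumption is that `get_model()` is never called on the driver before the job is submitted; otherwise `_model` would already hold the unpicklable object when the function and the globals it references are serialized.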