PySpark/Glue: PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects

Posted: 2019-06-02 23:59:50


I get the following error:

File "script_2019-06-02-23-49-11.py", line 478, in <module>
datarobot1Df = datarobot1Df.rdd.mapPartitions(partitionMapper).toDF()
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/sql/session.py", line 58, in toDF
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/sql/session.py", line 582, in createDataFrame
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/sql/session.py", line 380, in _createFromRDD
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/sql/session.py", line 351, in _inferSchema
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 1361, in first
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 1343, in take
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/context.py", line 992, in runJob
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects

What is the problem here? Is it because I am using threads in parts of my code to call an API?

# Imports assumed from the rest of the script (not shown in the original post).
# Glue 0.9 ran Python 2.7, which matches the "thread.lock" wording in the error;
# on Python 3 use `from queue import Queue` instead.
import sys
import json
import time
from datetime import datetime
from Queue import Queue
from threading import Thread
from multiprocessing import Manager

import boto3
import requests

from pyspark.context import SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

logs = boto3.client('logs', region_name="ap-southeast-1")

NUMBER_THREADS = 6

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

streamId = datetime.now().strftime('%Y%m%d-%H%M%S')

logs.create_log_stream(
  logGroupName='...',
  logStreamName=streamId
)

prevLogSeqToken = None
def log(msg):
  print(msg)
  global prevLogSeqToken
  if prevLogSeqToken is not None:
    resp = logs.put_log_events(
      logGroupName="...",
      logStreamName=streamId,
      logEvents=[
        {
          'timestamp': int(time.time() * 1000),
          'message': msg
        }
      ],
      sequenceToken=prevLogSeqToken
    )
  else:
    resp = logs.put_log_events(
      logGroupName="pinfare-glue",
      logStreamName=streamId,
      logEvents=[
        {
          'timestamp': int(time.time() * 1000),
          'message': msg
        }
      ],
    )
  prevLogSeqToken = resp["nextSequenceToken"]
log('Reading files ...')
inputGdf = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://..."]}, format = "parquet")
datarobot1Df = inputGdf.toDF()
datarobot1Df = datarobot1Df.limit(1000).repartition(6)

numRows = datarobot1Df.count()
log('Counted rows %d ...' % numRows)
datarobot1Df.show(10)
# print("Number of rows: %d (%s)" % (numRows, datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

if numRows > 0 :
  # Given price, output groups eg. (-Inf, 200], (400, 600]
  def getPriceInterval(price):
    if (price <= 200):
        return '(-Inf,200]'
    if (price <= 400):
        return '(200,400]'
    if (price <= 600):
        return '(400,600]'
    if (price <= 800):
        return '(600,800]'
    else:
        return '(800,Inf]'

  # getPriceInterval returns interval labels (strings), so register it with StringType
  spark.udf.register("getPriceInterval", getPriceInterval, StringType())
  getPriceIntervalUdf = udf(getPriceInterval)

  BATCH_SIZE = 3600

  def callDatarobot(arr, data):
    print("Calling datarobot %s" % time.ctime())
    log("Calling datarobot ...")
    # call mod A
    dataJsonStr = json.dumps(data)
    res = requests.post("http://...", data = dataJsonStr, headers = {'Content-Type': 'application/json'})
    batchResults = res.json()
    batchResults = list(map(lambda o: o['prediction'], batchResults['data']))

    for i, o in enumerate(batchResults):
      data[i]['PredictedFare'] = o

    # call mod B
    dataJsonStr = json.dumps(data)
    res = requests.post("http://...", data = dataJsonStr, headers = {'Content-Type': 'application/json'})
    batchResults = res.json()
    batchResults = list(map(lambda o: o['prediction'], batchResults['data']))

    for i, o in enumerate(batchResults):
      data[i] = {
        'PredictionSurge': o,
        'price_chg_inc_gt10_a7': data[i]['price_chg_inc_gt10_a7']
      }

    arr.append(data)

  def worker(q, arr):
    while True:
      jsonStr = q.get()
      if jsonStr is None:
          break
      callDatarobot(arr, jsonStr)
      q.task_done()

  def partitionMapper(partition):
    q = Queue()
    arr = Manager().list()
    threads = []
    print("Partition mapper start %s" % time.ctime())
    log("Partition mapper start %s" % time.ctime())

    # init worker threads
    for i in range(NUMBER_THREADS):
      t = Thread(target=worker, args=(q, arr,))
      t.start()
      threads.append(t)

    data = []
    for row in partition:
      data.append(row.asDict())
      if (len(data) == BATCH_SIZE):
        q.put(json.dumps(data))
        data = []

    if (len(data) > 0):
      q.put(json.dumps(data))
      data = []

    # Wait till all jobs done
    q.join()

    # Stop workers
    for i in range(NUMBER_THREADS):
      q.put(None)
    for t in threads:
      t.join()

    print("Partition mapper end %s" % time.ctime())
    log("Partition mapper end")
    return [val for sublist in arr for val in sublist]

  # print("Number of partitions %d (%s)" % (datarobot1Df.rdd.getNumPartitions(), datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
  datarobot1Df = datarobot1Df.withColumn('PriceCategory', getPriceIntervalUdf(datarobot1Df['Price']))
  datarobot1Df = datarobot1Df.rdd.mapPartitions(partitionMapper).toDF()

  log("Writing intermediate results")
  datarobot1Df.write \
    .mode('overwrite') \
    .parquet('s3://...')

One more thing I would like to check: even without threads on each executor, would the API still be called in parallel? When I previously used this approach without threads, it seemed that only one executor was calling the API. Why is that?

Comments:

Answer 1:

It turned out that using the CloudWatch SDK in this setup was the problem: the boto3 `logs` client is referenced by the `log()` helper, which is called inside `partitionMapper`, so cloudpickle tries to serialize the client (which holds thread locks) when shipping the function to the executors.
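
One way to avoid this, as a minimal sketch (assuming the cause above; not necessarily the poster's exact fix): keep the boto3 client, and the `log()` helper that closes over it, on the driver only, and use plain `print` inside anything that Spark ships to the executors.

  def partitionMapper(partition):
    # Runs on the executors: use print (it shows up in the executor logs) instead
    # of log(), which closes over the unpicklable boto3 CloudWatch client.
    print("Partition mapper start %s" % time.ctime())

    results = []
    for row in partition:
      results.append(row.asDict())  # ... batch the rows and call the API here ...

    print("Partition mapper end %s" % time.ctime())
    return results

  log("Mapping partitions ...")  # driver side: CloudWatch logging is fine here
  datarobot1Df = datarobot1Df.rdd.mapPartitions(partitionMapper).toDF()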

Comments:
