Apache Spark读取S3:无法pickle thread.lock对象
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Spark读取S3:无法pickle thread.lock对象相关的知识,希望对你有一定的参考价值。
所以我希望我的Spark应用程序能够从亚马逊的S3中读取一些文本。我写了以下简单的脚本:
import boto3
s3_client = boto3.client('s3')
text_keys = ["key1.txt", "key2.txt"]
data = sc.parallelize(text_keys).flatMap(lambda key: s3_client.get_object(Bucket="my_bucket", Key=key)['Body'].read().decode('utf-8'))
当我做data.collect
时,我收到以下错误:
TypeError: can't pickle thread.lock objects
我似乎没有在网上找到任何帮助。也许有人设法解决了上述问题?
您的s3_client不可序列化。
而不是flatMap使用mapPartitions,并初始化lambda体内的s3_client以避免开销。那会:
- 每个worker上的init s3_client
- 减少初始化开销
以下是如何使用mapPartitions并初始化lambda体内的s3_client以避免开销。
下面以并行化方法提取S3数据的动机来自本文:How NOT to pull from S3 using Apache Spark
注意:get_matching_s3_objects(..)
方法和get_matching_s3_keys(..)
方法的归功于Alex Chan,在这里:Listing S3 Keys
可能有一种更简单/更好的方法来列出密钥并将它们并行化,但这对我有用。此外,我强烈建议您不要像在此简化示例中那样以纯文本格式传输AWS_SECRET或AWS_ACCESS_KEY_ID。这里有关于如何正确保护代码(通过Boto3访问AWS)的良好文档:Boto 3 Docs - Configuration and Credentials
首先,导入和字符串变量:
import boto3
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
AWS_ACCESS_KEY_ID = 'DONT_DO_THIS_ESPECIALLY-IN-PRODUCTION'
AWS_SECRET = 'ALSO_DONT_DO_THIS_ESPECIALLY-IN-PRODUCTION'
bucket_name = 'my-super-s3-bucket-example-name'
appName = 'mySuperAppExample'
那么,我上面提到的第一个链接的方法:
def get_matching_s3_objects(s3, bucket, prefix='', suffix=''):
"""
Generate objects in an S3 bucket.
:param bucket: Name of the S3 bucket.
:param prefix: Only fetch objects whose key starts with
this prefix (optional).
:param suffix: Only fetch objects whose keys end with
this suffix (optional).
"""
kwargs = {'Bucket': bucket}
# If the prefix is a single string (not a tuple of strings), we can
# do the filtering directly in the S3 API.
if isinstance(prefix, str):
kwargs['Prefix'] = prefix
while True:
# The S3 API response is a large blob of metadata.
# 'Contents' contains information about the listed objects.
resp = s3.list_objects_v2(**kwargs)
try:
contents = resp['Contents']
except KeyError:
return
for obj in contents:
key = obj['Key']
if key.startswith(prefix) and key.endswith(suffix):
yield obj
# The S3 API is paginated, returning up to 1000 keys at a time.
# Pass the continuation token into the next response, until we
# reach the final page (when this field is missing).
try:
kwargs['ContinuationToken'] = resp['NextContinuationToken']
except KeyError:
break
def get_matching_s3_keys(s3, bucket, prefix='', suffix=''):
"""
Generate the keys in an S3 bucket.
:param bucket: Name of the S3 bucket.
:param prefix: Only fetch keys that start with this prefix (optional).
:param suffix: Only fetch keys that end with this suffix (optional).
"""
for obj in get_matching_s3_objects(s3, bucket, prefix, suffix):
yield obj['Key']
然后,我编写一个方法来构造一个与.mapPartitions(..)
兼容的闭包的函数:
# Again, please don't transmit your keys in plain text.
# I did this here just for the sake of completeness of the example
# so that the code actually works.
def getObjsFromMatchingS3Keys(AWS_ACCESS_KEY_ID, AWS_SECRET, bucket_name):
def getObjs(s3Keys):
for key in s3Keys:
session = boto3.session.Session(AWS_ACCESS_KEY_ID, AWS_SECRET)
s3_client = session.client('s3')
body = s3_client.get_object(Bucket=bucket_name, Key=key)['Body'].read().decode('utf-8')
yield body
return getObjs
然后,设置SparkContext并获取S3对象键列表:
conf = SparkConf().setAppName(appName)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
session = boto3.session.Session(AWS_ACCESS_KEY_ID, AWS_SECRET)
# For the third time, please don't transmit your credentials in plain text like this.
# Hopefully you won't need another warning.
s3_client = session.client('s3')
func = getObjsFromMatchingS3Keys(AWS_ACCESS_KEY_ID, AWS_SECRET, bucket_name)
myFileObjs = []
for fName in get_matching_s3_keys(s3_client, bucket_name):
myFileObjs.append(fName)
旁注:我们需要构建一个SparkSession,以便.toDF()
可用于PipelinedRDD类型,因为猴子补丁,如下所述:PipelinedRDD object has no attribute toDF in PySpark
最后,使用.mapPartitions(..)
和我们构造的函数并行化S3对象键:
pathToSave = r'absolute_path_to_your_desired_file.json'
sc.parallelize(myFileObjs)
.mapPartitions(lambda keys: func(keys))
.map(lambda x: (x, ))
.toDF()
.toPandas()
.to_json(pathToSave)
可能有一种更简洁的方法来写入目标输出文件,但此代码仍然有效。此外,使用map(lambda x: (x, ))
的目的是强制模式推理,如下所述:Create Spark DataFrame - Cannot infer schema for type以这种方式强制模式推理可能不是适用于所有情况的最佳方法,但这对于此示例就足够了。
以上是关于Apache Spark读取S3:无法pickle thread.lock对象的主要内容,如果未能解决你的问题,请参考以下文章
如何从 Apache Spark 访问 s3a:// 文件?
Spark 使用 sc.textFile ("s3n://...) 从 S3 读取文件
在 Python 中使用 Spark 读取 S3 文件时权限被拒绝