Apache Spark reading from S3: can't pickle thread.lock objects


So I want my Spark application to read some text from Amazon's S3. I wrote the following simple script:

import boto3
s3_client = boto3.client('s3')
text_keys = ["key1.txt", "key2.txt"]
data = sc.parallelize(text_keys).flatMap(lambda key: s3_client.get_object(Bucket="my_bucket", Key=key)['Body'].read().decode('utf-8'))

When I run data.collect() I get the following error:

TypeError: can't pickle thread.lock objects

I can't seem to find any help online. Has anyone managed to solve this?

Answer

Your s3_client is not serializable: Spark has to pickle the closure it ships to the executors, and the boto3 client captured in your lambda holds thread locks that cannot be pickled.

Instead of flatMap, use mapPartitions and initialize the s3_client inside the function body to avoid the overhead (see the sketch below). That will:

  1. initialize s3_client on each worker
  2. reduce the initialization overhead
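
A minimal sketch of that suggestion, assuming sc is the existing SparkContext and reusing the bucket and key names from the question:

import boto3

def read_keys(keys):
    # The client is created on the worker, inside the partition, so the
    # unpicklable boto3 client never has to be serialized on the driver.
    s3_client = boto3.client('s3')
    for key in keys:
        obj = s3_client.get_object(Bucket="my_bucket", Key=key)
        yield obj['Body'].read().decode('utf-8')

text_keys = ["key1.txt", "key2.txt"]
data = sc.parallelize(text_keys).mapPartitions(read_keys)
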
Another answer

Here is how to use mapPartitions and initialize the s3_client inside the function body to avoid that overhead.

The motivation for pulling the S3 data in a parallelized way below comes from this article: How NOT to pull from S3 using Apache Spark

Note: credit for the get_matching_s3_objects(..) and get_matching_s3_keys(..) methods goes to Alex Chan, here: Listing S3 Keys

There may be a simpler/better way to list the keys and parallelize them, but this worked for me. Also, I strongly recommend that you do NOT transmit your AWS_SECRET or AWS_ACCESS_KEY_ID in plain text as in this simplified example. There is good documentation on how to properly secure your code (for accessing AWS via Boto3) here: Boto 3 Docs - Configuration and Credentials
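
For example, instead of hardcoding keys, boto3 can resolve credentials from its standard chain (environment variables, the shared ~/.aws/credentials file, or an attached IAM role). A minimal sketch, where the profile name is purely hypothetical:

import boto3

# Nothing secret appears in the source: boto3 falls back to the standard
# credential chain (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment
# variables, ~/.aws/credentials, or an instance/role profile).
session = boto3.session.Session(profile_name='my-profile')  # hypothetical profile name
s3_client = session.client('s3')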

First, the imports and string variables:

import boto3
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

AWS_ACCESS_KEY_ID = 'DONT_DO_THIS_ESPECIALLY-IN-PRODUCTION'
AWS_SECRET = 'ALSO_DONT_DO_THIS_ESPECIALLY-IN-PRODUCTION'
bucket_name = 'my-super-s3-bucket-example-name'
appName = 'mySuperAppExample'

Then, the methods from the first link I mentioned above:

def get_matching_s3_objects(s3, bucket, prefix='', suffix=''):
    """
    Generate objects in an S3 bucket.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only fetch objects whose key starts with
        this prefix (optional).
    :param suffix: Only fetch objects whose keys end with
        this suffix (optional).
    """
    kwargs = {'Bucket': bucket}

    # If the prefix is a single string (not a tuple of strings), we can
    # do the filtering directly in the S3 API.
    if isinstance(prefix, str):
        kwargs['Prefix'] = prefix

    while True:

        # The S3 API response is a large blob of metadata.
        # 'Contents' contains information about the listed objects.
        resp = s3.list_objects_v2(**kwargs)

        try:
            contents = resp['Contents']
        except KeyError:
            return

        for obj in contents:
            key = obj['Key']
            if key.startswith(prefix) and key.endswith(suffix):
                yield obj

        # The S3 API is paginated, returning up to 1000 keys at a time.
        # Pass the continuation token into the next response, until we
        # reach the final page (when this field is missing).
        try:
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        except KeyError:
            break


def get_matching_s3_keys(s3, bucket, prefix='', suffix=''):
    """
    Generate the keys in an S3 bucket.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only fetch keys that start with this prefix (optional).
    :param suffix: Only fetch keys that end with this suffix (optional).
    """
    for obj in get_matching_s3_objects(s3, bucket, prefix, suffix):
        yield obj['Key']
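
For reference, a quick usage sketch of the generators above, listing only the .txt keys under a hypothetical prefix (the prefix and suffix values are just placeholders):

s3 = boto3.session.Session(AWS_ACCESS_KEY_ID, AWS_SECRET).client('s3')
for key in get_matching_s3_keys(s3, bucket_name, prefix='texts/', suffix='.txt'):
    print(key)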

Then, I wrote a method that constructs a function with a closure compatible with .mapPartitions(..):

# Again, please don't transmit your keys in plain text. 
#   I did this here just for the sake of completeness of the example 
#   so that the code actually works.

def getObjsFromMatchingS3Keys(AWS_ACCESS_KEY_ID, AWS_SECRET, bucket_name):
    def getObjs(s3Keys):
        # Build the session and client once per partition, on the worker,
        # so the unpicklable client is never serialized on the driver.
        session = boto3.session.Session(AWS_ACCESS_KEY_ID, AWS_SECRET)
        s3_client = session.client('s3')
        for key in s3Keys:
            body = s3_client.get_object(Bucket=bucket_name, Key=key)['Body'].read().decode('utf-8')
            yield body
    return getObjs

Then, set up the SparkContext and get the list of S3 object keys:

conf = SparkConf().setAppName(appName)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
session = boto3.session.Session(AWS_ACCESS_KEY_ID, AWS_SECRET) 
# For the third time, please don't transmit your credentials in plain text like this. 
#  Hopefully you won't need another warning. 
s3_client = session.client('s3')
func = getObjsFromMatchingS3Keys(AWS_ACCESS_KEY_ID, AWS_SECRET, bucket_name)

myFileObjs = []
for fName in get_matching_s3_keys(s3_client, bucket_name):
    myFileObjs.append(fName)

Side note: we need to construct a SparkSession so that .toDF() is available on the PipelinedRDD type, due to monkey patching, as explained here: PipelinedRDD object has no attribute toDF in PySpark

Finally, parallelize the S3 object keys using .mapPartitions(..) and the function we constructed:

pathToSave = r'absolute_path_to_your_desired_file.json'
sc.parallelize(myFileObjs) \
    .mapPartitions(lambda keys: func(keys)) \
    .map(lambda x: (x, )) \
    .toDF() \
    .toPandas() \
    .to_json(pathToSave)

There may be a cleaner way to write to the target output file, but this code still works. Also, the purpose of map(lambda x: (x, )) is to force schema inference, as discussed here: Create Spark DataFrame - Cannot infer schema for type. Forcing schema inference this way may not be the best approach in every situation, but it is sufficient for this example.
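
As a possible alternative (not from the original answer), the driver-side toPandas() collection and the reliance on schema inference could be avoided by giving Spark an explicit one-column schema and letting it write the JSON itself. A sketch, with a placeholder output path:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField('body', StringType(), True)])
rows = sc.parallelize(myFileObjs).mapPartitions(func).map(lambda x: (x, ))
df = spark.createDataFrame(rows, schema)
df.write.mode('overwrite').json('path_to_output_directory')  # placeholder path; Spark writes a directory of part files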
