使用 PySpark 和 Step Functions 处理 Sagemaker 作业

Posted

技术标签:

【中文标题】使用 PySpark 和 Step Functions 处理 Sagemaker 作业【英文标题】:Sagemaker processing job with PySpark and Step Functions 【发布时间】:2020-11-27 18:00:17 【问题描述】:

这是我的问题: 我必须使用用 PySpark 编写的自定义代码运行 Sagemaker 处理作业。我通过运行以下命令使用了 Sagemaker SDK:

spark_processor = sagemaker.spark.processing.PySparkProcessor(
        base_job_name="spark-preprocessor",
        framework_version="2.4",
        role=role_arn,
        instance_count=2,
        instance_type="ml.m5.xlarge",
        max_runtime_in_seconds=1800,
    )

    spark_processor.run(
        submit_app="processing.py",
        arguments=['s3_input_bucket', bucket_name,
                   's3_input_file_path', file_path
                   ]
    )

现在我必须使用 Step Functions 来自动化工作流程。为此,我编写了一个 lambda 函数来执行此操作,但收到以下错误:


  "errorMessage": "Unable to import module 'lambda_function': No module named 'sagemaker'",
  "errorType": "Runtime.ImportModuleError"

这是我的 lambda 函数:

import sagemaker

def lambda_handler(event, context):
    spark_processor = sagemaker.spark.processing.PySparkProcessor(
        base_job_name="spark-preprocessor",
        framework_version="2.4",
        role=role_arn,
        instance_count=2,
        instance_type="ml.m5.xlarge",
        max_runtime_in_seconds=1800,
    )

    spark_processor.run(
        submit_app="processing.py",
        arguments=['s3_input_bucket', event["bucket_name"],
                   's3_input_file_path', event["file_path"]
                   ]
    )

我的问题是:如何在我的状态机中创建一个步骤以使用 Sagemaker 处理运行 PySpark 代码?

谢谢

【问题讨论】:

【参考方案1】:

sagemaker sdk 默认未安装在 lambda 容器环境中:您应该将其包含在您上传到 s3 的 lambda zip 中。

有多种方法可以做到这一点,最简单的方法之一是使用 Serverless Application Model (SAM) cli 部署您的 lambda。在这种情况下,将 sagemaker 放在包含您的 lambda 代码的文件夹中的 requirements.txt 中可能就足够了,SAM 将确保依赖项包含在 zip 中。

或者,您可以使用pip install sagemaker -t lambda_folder 手动创建 zip,但您应该在 Amazon Linux 操作系统中执行此命令,例如使用具有适当映像的 EC2 或在 Docker 容器中。搜索“aws lambda 中的 python 依赖项”以获取更多信息。

【讨论】:

以上是关于使用 PySpark 和 Step Functions 处理 Sagemaker 作业的主要内容,如果未能解决你的问题,请参考以下文章

来自 ArrayType Pyspark 列的随机样本

如何在过滤器pyspark RDD中过滤掉某种模式[重复]

在pyspark数据框中的两个日期之间生成每月时间戳

如何使用 PySpark、SparkSQL 和 Cassandra?

使用 Spacy 使用 PySpark 和 Jupyter 解析文本时出错

使用 pyspark 和 aws 胶水进行数据转置