Step Function 中的 AWS Batch 作业执行结果
Posted
技术标签:
【中文标题】Step Function 中的 AWS Batch 作业执行结果【英文标题】:AWS Batch Job Execution Results in Step Function 【发布时间】:2021-04-26 08:54:28 【问题描述】:我是 AWS Step Functions 和 AWS Batch 的新手。我正在尝试将 AWS Batch Job 与 Step Function 集成。 AWS Batch Job 执行简单的 Python 脚本,输出字符串值(高级简化要求)。我需要让 python 脚本输出可用于 step 函数的下一个状态。我应该如何做到这一点。 AWS Batch Job 输出不包含 python 脚本的结果。相反,它包含所有与容器相关的信息以及输入值。
示例:AWS Batch Job 执行输出“Hello World”的 python 脚本。我需要“Hello World”可用于 step 函数的下一个状态,以执行与其关联的 lambda。
【问题讨论】:
【参考方案1】:您可以将步骤函数执行 ID ($$.Execution.ID) 传递给批处理,然后您的批处理可以使用执行 ID 和主键(或其他文件)将其响应写入 DynamoDB。然后,您需要执行后续步骤以从 DynamoDB 读取 directly 并捕获进程响应。
我一直在寻找一种无需后续步骤即可做到这一点的方法,但到目前为止还没有骰子。
【讨论】:
【参考方案2】:我能够做到,下面是我的状态机,我采用示例项目来运行批处理作业 Manage a Batch Job (AWS Batch, Amazon SNS) 并将其修改为两个 lambdas 以传递输入/输出。
"Comment": "An example of the Amazon States Language for notification on an AWS Batch job completion",
"StartAt": "Submit Batch Job",
"TimeoutSeconds": 3600,
"States":
"Submit Batch Job":
"Type": "Task",
"Resource": "arn:aws:states:::batch:submitJob.sync",
"Parameters":
"JobName": "BatchJobNotification",
"JobQueue": "arn:aws:batch:us-east-1:1234567890:job-queue/BatchJobQueue-737ed10e7ca3bfd",
"JobDefinition": "arn:aws:batch:us-east-1:1234567890:job-definition/BatchJobDefinition-89c42b1f452ac67:1"
,
"Next": "Notify Success",
"Catch": [
"ErrorEquals": [
"States.ALL"
],
"Next": "Notify Failure"
]
,
"Notify Success":
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:1234567890:function:readcloudwatchlogs",
"Parameters":
"LogStreamName.$": "$.Container.LogStreamName"
,
"ResultPath": "$.lambdaOutput",
"Next": "ConsumeLogs"
,
"ConsumeLogs":
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:1234567890:function:consumelogs",
"Parameters":
"randomstring.$": "$.lambdaOutput.logs"
,
"End": true
,
"Notify Failure":
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters":
"Message": "Batch job submitted through Step Functions failed",
"TopicArn": "arn:aws:sns:us-east-1:1234567890:StepFunctionsSample-BatchJobManagement17968f39-e227-47ab-9a75-08a7dcc10c4c-SNSTopic-1GR29R8TUHQY8"
,
"End": true
读取日志的关键是在包含LogStreamName
的Submit Batch Job
输出中,我将其传递给名为function:readcloudwatchlogs
的lambda 并读取日志,然后最终将读取日志传递给名为function:consumelogs
的下一个函数.您可以在随附的屏幕截图中看到consumelogs
打印日志的功能。
"Attempts": [
"Container":
"ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
"ExitCode": 0,
"LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
"NetworkInterfaces": [],
"TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b"
,
"StartedAt": 1611329367577,
"StatusReason": "Essential container in task exited",
"StoppedAt": 1611329367748
],
"Container":
"Command": [
"echo",
"Hello world"
],
"ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
"Environment": [
"Name": "MANAGED_BY_AWS",
"Value": "STARTED_BY_STEP_FUNCTIONS"
],
"ExitCode": 0,
"Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
"LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
"TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b",
..
,
..
"Tags":
"resourceArn": "arn:aws:batch:us-east-1:1234567890:job/d36ba07a-54f9-4acf-a4b8-3e5413ea5ffc"
读取日志 Lambda 代码:
import boto3
client = boto3.client('logs')
def lambda_handler(event, context):
print(event)
response = client.get_log_events(
logGroupName='/aws/batch/job',
logStreamName=event.get('LogStreamName')
)
log = 'logs': response['events'][0]['message']
return log
使用日志 Lambda 代码
import json
print('Loading function')
def lambda_handler(event, context):
print(event)
【讨论】:
所以基本上你从日志中读取值。没有别的办法了 是的,我查看了Step Function CallBack 模式,但涉及在定义中添加wait
,我不想这样做。
这是一种创新的方式。我正在处理的用例可能会生成大量日志,读取日志以查找作业的输出可能会适得其反,但这很有帮助。另一种方法是将输出保存到表中并在其他状态下引用它。
我只是建立在被问到的问题之上,所以我能够做问题中的事情。考虑到您没有超过AWS Step Functions payload size 256KB,您仍然可以传递日志。当然你可以这样做,但这又涉及在你的状态机中添加一个等待,我已经提到我避免了。
当我们谈到将输出保存到表格时,您可以简单地Using Lambda with CloudWatch Logs 然后将其保存到表格中。但是您必须在状态机中等待,并且您永远无法确定数据何时写入表中,这可能会导致竞争条件。以上是关于Step Function 中的 AWS Batch 作业执行结果的主要内容,如果未能解决你的问题,请参考以下文章
如何将无服务器 Step Function/状态机/Lambda 构建嵌套到现有 AWS CloudFormation ElasticBeanstalk 应用程序中?
如何使用 Step Function 在 Amazon EMR 中添加步骤