AWS Step Functions:如何访问在 catch 块中生成异常的状态的输入?
Posted
技术标签:
【中文标题】AWS Step Functions:如何访问在 catch 块中生成异常的状态的输入?【英文标题】:AWS Step Functions: How to access the input of the state that generated an exception in a catch block? 【发布时间】:2020-11-27 20:57:11 【问题描述】:我正在尝试使用状态机语言中定义的 Parallel 和 Catch 块在我的步进函数流中添加错误处理。
以下是我的步骤函数的流程图:
由于我想要一个用于所有步进函数的通用错误处理程序,我将它们包装在一个 Parallel 块中,并添加了一个通用 Catch 块来捕获任何步进函数中的任何错误。在浏览各种示例和博客时,我关注了this 链接并实现了类似的方法。
我观察到的是,每当任何状态引发异常时,控件都会进入 catch 块。 catch 块的输入是引发的异常,该异常包含 JSON 对象中的错误和原因。由于我想要错误以及传递给该状态的输入,因此我在 catch 块中添加了 ResultPath 作为 "$.error"。以下是定义状态机的 JSON 规范。
"StartAt": "Try",
"States":
"Try":
"Type": "Parallel",
"Branches": [
"StartAt": "Step-1",
"States":
"Step-1":
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:1234:function:step-1-lambda",
"Next": "Step-2"
,
"Step-2":
"Type": "Choice",
"Choices": [
"Variable": "$.some_variable",
"StringEquals": "some_string",
"Next": "Step-3"
,
"Variable": "$.some_variable",
"StringEquals": "some_other_string",
"Next": "Step-4"
],
"Default": "Step-6"
,
"Step-3":
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:1234:function:step-3-lambda",
"Next": "Step-6"
,
"Step-4":
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:1234:function:step-4-lambda",
"Next": "Step-6"
,
"Step-6":
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:1234:function:step-6-lambda",
"End": true
],
"Catch": [
"ErrorEquals": [
"States.ALL"
],
"ResultPath": "$.error",
"Next": "ErrorHandler"
],
"Next": "UnwrapOutput"
,
"UnwrapOutput":
"Type": "Pass",
"InputPath": "$[0]",
"End": true
,
"ErrorHandler":
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:1234:function:step-7-lambda",
"End": true
例如,考虑第 4 步生成异常。这个状态的输入是:
"foo": "abc",
"bar": "def"
触发状态机的输入是:
"boo": "jkl",
"baz": "mno"
在 ErrorHandler 中,由于步骤 4 生成异常,我期望 ErrorHandler 状态的输入是:
"foo": "abc",
"bar": "def",
"error":
"Error": "SomeError",
"Cause": "SomeCause"
但是,接收到的输入包含用于触发流程的原始输入。
"boo": "jkl",
"baz": "mno",
"error":
"Error": "SomeError",
"Cause": "SomeCause"
我需要在 ErrorHandler 中访问导致异常的状态的输入字段。使用“$”它提供用于触发流程的输入。有什么办法可以实现吗?
任何帮助将不胜感激,我一直在努力解决这个问题。
【问题讨论】:
感谢您提供图表和详细信息。我相信以下内容可以帮助您:***.com/a/64436403 或 dev.to/aws-builders/… 【参考方案1】:我只晚了 10 个月,没那么多哈哈,但我希望你已经找到了解决方案,无论如何,我会分享我的两分钱,以便我可以帮助另一个开发人员,或者更好的是,有人可以告诉我一个更好的方法来做到这一点!
首先,让我们看看我们有哪些场景:
同步作业执行 异步作业执行我们的目标:以某种方式访问触发错误的作业
第一个解决方案 - 适用于所有场景:
基本上,将自定义 try catch 添加到您的所有作业资产中,换句话说,您的 lambda 函数应该抛出一个错误,提供有关它正在使用的作业的信息。我不太喜欢这种方法,因为您正在更改孤立的函数以在状态机中实现一些逻辑。最后,您将两个独立的概念结合在一起,您的状态机不应该需要外部工具来操作和记录自己的上下文。我在这里可能是错的,但这只是我的两分钱,请随意冒犯我的家人(开个玩笑,但请按您的意愿纠正我)。第二种解决方案 - 应用于 Sych 作业执行
当您在状态机中添加“addCatch”时,默认行为是错误输出以覆盖步骤输入。要解决这个问题,您只需要更改 addCatch resultPath,这样您就可以将错误输出与步骤输入一起存储。
EX:“抓住”:[ “ErrorEquals”:[“States.All”], “下一个”:“错误处理程序” "ResultPath": "$.error-info" ]
但是为什么这很重要??????
这样您就可以访问 errorHandlerJob 中的步骤输入,这意味着您始终可以将 stepName 传递给下一步输入,这样您就可以始终知道哪个作业失败了。而且您不会通过直接更改 lambda 函数来做到这一点,而是通过使用作业的属性来解决耦合问题!但这在 ASYNC 场景中是行不通的,接下来我会解释。第三种解决方案——应用于异步作业执行
之前的解决方案在这里不起作用,因为在这种情况下,您只能访问原始输入,因为您使用的是并行分支。所以我在这里所做的与上一个案例类似。我在并行分支中添加了 Pass 状态,这些 Pass 状态负责同步调用我的作业,而且我的所有作业都有自己的 errorHandlingJob 而不是不同的 LAMBDA 函数。我没有在 AWS 上创建新资源,只有一个 HandleError Lambda 函数,因此我可以将监控重点放在那个特定函数上。但是,我使用它为状态机必须执行的每个作业创建一个 errorHandlingJob。 缺点是您的状态机现在拥有庞大的图表,但好处是您现在可以记录哪个作业失败了。如果没有任何抽象,它会是这样的“使用 CDK”
const job1 = new tasks.LambdaInvoke(scope, 'First Job -- PASS',
lambdaFunction: function1,
outputPath: '$.Payload'
)
const job2 = new tasks.LambdaInvoke(scope, 'Second Job -- PASS',
lambdaFunction: function2,
outputPath: '$.Payload'
)
const job3 = new tasks.LambdaInvoke(scope, 'Third Job -- PASS',
lambdaFunction: function3,
outputPath: '$.Payload'
)
const generateHandleErrorJob = () => new tasks.LambdaInvoke(scope, `Handle Error Job $Math.random() * 160000000`,
lambdaFunction: functionError,
outputPath: '$.Payload'
)
const jobToThrowError = new tasks.LambdaInvoke(scope, 'Job To Throw Error -- PASS',
lambdaFunction: fucntionThrowError,
outputPath: '$.Payload',
)
const generatePassCheckSetep = (stepName: string) => new sfn.Pass(scope, `Pass: $stepName`,
resultPath: '$.step-info',
result: sfn.Result.fromObject(
step: stepName
)
)
const definition = new sfn.Parallel(scope, 'Parallel Execution -- PASS')
.branch(generatePassCheckSetep('job1').next(job1.addCatch(generateHandleErrorJob(), resultPath: '$.error-info')))
.branch(generatePassCheckSetep('jobToThrowError').next(jobToThrowError.addCatch(generateHandleErrorJob(), resultPath: '$.error-info')))
.branch(generatePassCheckSetep('job2').next(job2.addCatch(generateHandleErrorJob(), resultPath: '$.error-info')))
.next(job3)
new sfn.StateMachine(scope, id,
definition,
timeout: cdk.Duration.minutes(3)
)
但我还创建了一个抽象“ParallelStateMachineCatch”,因此您可以像这样使用:
this.definition = new ParallelStateMachineCatch(this,
, handleErrorFunction)
.branchCatch(job1)
.branchCatch(job2)
.branchCatch(job3)
.branchCatch(job4)
.branchCatch(job5)
.branchCatch(job6)
.next(final)
这是 ParallelStateMachineCatch 代码:
import Construct, Duration from 'monocdk'
import NodejsFunction from 'monocdk/aws-lambda-nodejs'
import Pass,Result, Parallel, ParallelProps from 'monocdk/aws-stepfunctions'
import LambdaInvoke from 'monocdk/aws-stepfunctions-tasks'
export interface DefinitionProps
sonosEnvironment: string
region: string
accountNumber: string
export class ParallelStateMachineCatch extends Parallel
private errorHandler: NodejsFunction
constructor(scope: Construct, id: string, props: ParallelProps, errorHandler: NodejsFunction)
super(scope, id, props)
this.errorHandler = errorHandler
branchCatch(task: LambdaInvoke): ParallelStateMachineCatch
const randomId = Math.random().toString().replace('0.', '')
const passInputJob = ParallelStateMachineCatch.generatePassInput(this, task.id, randomId)
const handleErrorJob = ParallelStateMachineCatch.generateHandleErrorJob(this, this.errorHandler, randomId)
const resultPath = '$.error-info'
this.branch(passInputJob.next(task.addCatch(handleErrorJob, resultPath )))
return this
private static generateHandleErrorJob(scope: Construct, errorHandler: NodejsFunction, randomId: string): LambdaInvoke
return new LambdaInvoke(scope, `Handle Error $ randomId `,
lambdaFunction: errorHandler,
outputPath: '$.Payload',
timeout: Duration.seconds(5),
)
private static generatePassInput(scope: Construct, stepName: string, randomId: string): Pass
return new Pass(scope, `Pass Input $ randomId `,
resultPath: '$.step-info',
result: Result.fromObject(
name: stepName
)
)
无论如何,我希望我可以帮助某人,这就是我设法解决这个问题的方法。请随时教我更好的方法! Tks 祝你好运和好代码
【讨论】:
以上是关于AWS Step Functions:如何访问在 catch 块中生成异常的状态的输入?的主要内容,如果未能解决你的问题,请参考以下文章
由 CDK 定义时,是不是可以在本地运行 AWS Step Functions?
从上传到 S3 的文件触发的无服务器框架和 AWS Step Functions(AWS 状态机)