AWS Step Functions:如何访问在 catch 块中生成异常的状态的输入?

Posted

技术标签:

【中文标题】AWS Step Functions:如何访问在 catch 块中生成异常的状态的输入?【英文标题】:AWS Step Functions: How to access the input of the state that generated an exception in a catch block? 【发布时间】:2020-11-27 20:57:11 【问题描述】:

我正在尝试使用状态机语言中定义的 Parallel 和 Catch 块在我的步进函数流中添加错误处理。

以下是我的步骤函数的流程图:

由于我想要一个用于所有步进函数的通用错误处理程序,我将它们包装在一个 Parallel 块中,并添加了一个通用 Catch 块来捕获任何步进函数中的任何错误。在浏览各种示例和博客时,我关注了this 链接并实现了类似的方法。

我观察到的是,每当任何状态引发异常时,控件都会进入 catch 块。 catch 块的输入是引发的异常,该异常包含 JSON 对象中的错误和原因。由于我想要错误以及传递给该状态的输入,因此我在 catch 块中添加了 ResultPath 作为 "$.error"。以下是定义状态机的 JSON 规范。

    
  "StartAt": "Try",
  "States": 
    "Try": 
      "Type": "Parallel",
      "Branches": [
        
          "StartAt": "Step-1",
          "States": 
            "Step-1": 
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:1234:function:step-1-lambda",
              "Next": "Step-2"
            ,
            "Step-2": 
              "Type": "Choice",
              "Choices": [
                
                  "Variable": "$.some_variable",
                  "StringEquals": "some_string",
                  "Next": "Step-3"
                ,
                
                  "Variable": "$.some_variable",
                  "StringEquals": "some_other_string",
                  "Next": "Step-4"
                
              ],
              "Default": "Step-6"
            ,
            "Step-3": 
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:1234:function:step-3-lambda",
              "Next": "Step-6"
            ,
            "Step-4": 
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:1234:function:step-4-lambda",
              "Next": "Step-6"
            ,
            "Step-6": 
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:1234:function:step-6-lambda",
              "End": true
            
          
        
      ],
      "Catch": [
        
          "ErrorEquals": [
            "States.ALL"
          ],
          "ResultPath": "$.error",
          "Next": "ErrorHandler"
        
      ],
      "Next": "UnwrapOutput"
    ,
    "UnwrapOutput": 
      "Type": "Pass",
      "InputPath": "$[0]",
      "End": true
    ,
    "ErrorHandler": 
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:1234:function:step-7-lambda",
      "End": true
    
  

例如,考虑第 4 步生成异常。这个状态的输入是:


   "foo": "abc",
   "bar": "def"

触发状态机的输入是:


  "boo": "jkl",
   "baz": "mno"

在 ErrorHandler 中,由于步骤 4 生成异常,我期望 ErrorHandler 状态的输入是:


  "foo": "abc",
   "bar": "def",
   "error": 
       "Error": "SomeError",
       "Cause": "SomeCause"
   

但是,接收到的输入包含用于触发流程的原始输入。


  "boo": "jkl",
   "baz": "mno",
   "error": 
       "Error": "SomeError",
       "Cause": "SomeCause"
   

我需要在 ErrorHandler 中访问导致异常的状态的输入字段。使用“$”它提供用于触发流程的输入。有什么办法可以实现吗?

任何帮助将不胜感激,我一直在努力解决这个问题。

【问题讨论】:

感谢您提供图表和详细信息。我相信以下内容可以帮助您:***.com/a/64436403 或 dev.to/aws-builders/… 【参考方案1】:

我只晚了 10 个月,没那么多哈哈,但我希望你已经找到了解决方案,无论如何,我会分享我的两分钱,以便我可以帮助另一个开发人员,或者更好的是,有人可以告诉我一个更好的方法来做到这一点!

首先,让我们看看我们有哪些场景:

同步作业执行 异步作业执行

我们的目标:以某种方式访问​​触发错误的作业

第一个解决方案 - 适用于所有场景:

基本上,将自定义 try catch 添加到您的所有作业资产中,换句话说,您的 lambda 函数应该抛出一个错误,提供有关它正在使用的作业的信息。我不太喜欢这种方法,因为您正在更改孤立的函数以在状态机中实现一些逻辑。最后,您将两个独立的概念结合在一起,您的状态机不应该需要外部工具来操作和记录自己的上下文。我在这里可能是错的,但这只是我的两分钱,请随意冒犯我的家人(开个玩笑,但请按您的意愿纠正我)。

第二种解决方案 - 应用于 Sych 作业执行

当您在状态机中添加“addCatch”时,默认行为是错误输出以覆盖步骤输入。要解决这个问题,您只需要更改 addCatch resultPath,这样您就可以将错误输出与步骤输入一起存储。

EX:“抓住”:[ “ErrorEquals”:[“States.All”], “下一个”:“错误处理程序” "ResultPath": "$.error-info" ]

但是为什么这很重要??????
这样您就可以访问 errorHandlerJob 中的步骤输入,这意味着您始终可以将 stepName 传递给下一步输入,这样您就可以始终知道哪个作业失败了。而且您不会通过直接更改 lambda 函数来做到这一点,而是通过使用作业的属性来解决耦合问题!但这在 ASYNC 场景中是行不通的,接下来我会解释。

第三种解决方案——应用于异步作业执行

之前的解决方案在这里不起作用,因为在这种情况下,您只能访问原始输入,因为您使用的是并行分支。所以我在这里所做的与上一个案例类似。我在并行分支中添加了 Pass 状态,这些 Pass 状态负责同步调用我的作业,而且我的所有作业都有自己的 errorHandlingJob 而不是不同的 LAMBDA 函数。我没有在 AWS 上创建新资源,只有一个 HandleError Lambda 函数,因此我可以将监控重点放在那个特定函数上。但是,我使用它为状态机必须执行的每个作业创建一个 errorHandlingJob。 缺点是您的状态机现在拥有庞大的图表,但好处是您现在可以记录哪个作业失败了。
如果没有任何抽象,它会是这样的“使用 CDK”
    const job1 = new tasks.LambdaInvoke(scope, 'First Job -- PASS', 
        lambdaFunction: function1,
        outputPath: '$.Payload'
    )

    const job2 = new tasks.LambdaInvoke(scope, 'Second Job -- PASS', 
        lambdaFunction: function2,
        outputPath: '$.Payload'
    )

    const job3 = new tasks.LambdaInvoke(scope, 'Third Job -- PASS', 
        lambdaFunction: function3,
        outputPath: '$.Payload'
    )

    const generateHandleErrorJob = () => new tasks.LambdaInvoke(scope, `Handle Error Job $Math.random() * 160000000`, 
        lambdaFunction: functionError,
        outputPath: '$.Payload'
    )

    const jobToThrowError = new tasks.LambdaInvoke(scope, 'Job To Throw Error -- PASS', 
        lambdaFunction: fucntionThrowError,
        outputPath: '$.Payload',
    )

    const generatePassCheckSetep = (stepName: string) => new sfn.Pass(scope, `Pass: $stepName`, 
        resultPath: '$.step-info',
        result: sfn.Result.fromObject(
            step: stepName
        )
    )

    const definition = new sfn.Parallel(scope, 'Parallel Execution -- PASS')
        .branch(generatePassCheckSetep('job1').next(job1.addCatch(generateHandleErrorJob(), resultPath: '$.error-info')))
        .branch(generatePassCheckSetep('jobToThrowError').next(jobToThrowError.addCatch(generateHandleErrorJob(), resultPath: '$.error-info')))
        .branch(generatePassCheckSetep('job2').next(job2.addCatch(generateHandleErrorJob(), resultPath: '$.error-info')))
        .next(job3)

    new sfn.StateMachine(scope, id, 
        definition,
        timeout: cdk.Duration.minutes(3)
    )

但我还创建了一个抽象“ParallelStateMachineCatch”,因此您可以像这样使用:

this.definition = new ParallelStateMachineCatch(this, 
, handleErrorFunction)
  .branchCatch(job1)
  .branchCatch(job2)
  .branchCatch(job3)
  .branchCatch(job4)
  .branchCatch(job5)
  .branchCatch(job6)
  .next(final)

这是 ParallelStateMachineCatch 代码:

import  Construct, Duration  from 'monocdk'
import  NodejsFunction  from 'monocdk/aws-lambda-nodejs'
import  Pass,Result, Parallel, ParallelProps  from 'monocdk/aws-stepfunctions'
import  LambdaInvoke  from 'monocdk/aws-stepfunctions-tasks'

export interface DefinitionProps 
  sonosEnvironment: string
  region: string
  accountNumber: string


export class ParallelStateMachineCatch extends Parallel 
  private errorHandler: NodejsFunction

  constructor(scope: Construct, id: string, props: ParallelProps, errorHandler: NodejsFunction) 
    super(scope, id, props)
    this.errorHandler = errorHandler
  



  branchCatch(task: LambdaInvoke): ParallelStateMachineCatch 
    const randomId = Math.random().toString().replace('0.', '')
    const passInputJob = ParallelStateMachineCatch.generatePassInput(this, task.id, randomId)
    const handleErrorJob = ParallelStateMachineCatch.generateHandleErrorJob(this, this.errorHandler, randomId)
    const resultPath = '$.error-info'

    this.branch(passInputJob.next(task.addCatch(handleErrorJob,  resultPath )))

    return this
  

  private static generateHandleErrorJob(scope: Construct, errorHandler: NodejsFunction, randomId: string): LambdaInvoke 
    return new LambdaInvoke(scope, `Handle Error $ randomId `, 
      lambdaFunction: errorHandler,
      outputPath: '$.Payload',
      timeout: Duration.seconds(5),
    )
  

  private static generatePassInput(scope: Construct, stepName: string, randomId: string): Pass 
    return new Pass(scope, `Pass Input $ randomId `, 
      resultPath: '$.step-info',
      result: Result.fromObject(
        name: stepName
      )
    )
  



无论如何,我希望我可以帮助某人,这就是我设法解决这个问题的方法。请随时教我更好的方法! Tks 祝你好运和好代码

【讨论】:

以上是关于AWS Step Functions:如何访问在 catch 块中生成异常的状态的输入?的主要内容,如果未能解决你的问题,请参考以下文章

AWS SQS 触发 Step Functions

由 CDK 定义时,是不是可以在本地运行 AWS Step Functions?

从上传到 S3 的文件触发的无服务器框架和 AWS Step Functions(AWS 状态机)

工作流引擎比较:AirflowAzkabanConductorOozie和 Amazon Step Functions

从java lambda调用aws Step函数

Step Functions