Azure Durable 编排功能触发两次

Posted

技术标签:

【中文标题】Azure Durable 编排功能触发两次【英文标题】:Azure Durable orchestration function triggering twice 【发布时间】:2019-02-16 16:55:51 【问题描述】:

我正在尝试实现 Azure Durable Function 工作流。

每 6 分钟,我有一个 Azure TimerTrigger 函数调用一个 Azure 编排函数 (OrchestrationTrigger),该函数又启动多个活动函数 (ActivityTrigger)。

但是,有时,Orchestration 函数会在几秒钟内被调用两次!这是一个大问题,因为我的活动函数不是幂等的!

下面是我的代码是如何被调用的。

定时器触发函数:

[FunctionName("StartupFunc")]
public static async Task Run([TimerTrigger("0 */6 * * * *", RunOnStartup = true, UseMonitor = false)]TimerInfo myStartTimer, [OrchestrationClient] DurableOrchestrationClient orchestrationClient, TraceWriter log)

    List<OrchestrationModel> ExportModels = await getData();

    string id = await orchestrationClient.StartNewAsync("OrchestratorFunc", ExportModels);

编排功能:

[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)

    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();

    foreach (var data in dataList)
    
        tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data));
    
    await Task.WhenAll(tasks);

活动功能:

[FunctionName("TransformToSql")]
[public static async Task<string> RunTransformation([ActivityTrigger] DurableActivityContext context, TraceWriter log)

    TransformModel = context.GetInput<TransformModel>();

    //Do some work with TransformModel

【问题讨论】:

【参考方案1】:

这种行为非常好 - 这就是 Durable Functions 的设计原理。

您有以下编排:

[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)

    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();

    foreach (var data in dataList)
    
        tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data));
    
    await Task.WhenAll(tasks);

当一个 Activity 被调用时,流程返回到一个名为 Dispatcher 的概念——它是 Durable Functions 的内部存在,负责维护您的编排流程。在等待任务完成之前,编排会被临时解除分配。任务完成后,将重播整个编排,直到下一个 await 发生。

重要的是,尽管重放了编排,但不会再次调用活动 - 它的结果是从存储中获取并使用的。使事情更明确,请考虑以下示例:

[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)

    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();

    foreach (var data in dataList)
    
        await context.CallActivityAsync<string>("TransformToSql1", new TransformModel(data);
        await context.CallActivityAsync<string>("TransformToSql2", new TransformModel(data);
    

TransformToSql1 处于等待状态时,编排被解除分配,整个流程一直等待,直到此活动完成。然后重放编排 - 它再次等待TransformToSql1,但由于它的结果被保存,它只是回到编排并等待TransformToSql2 - 然后重复该过程。

【讨论】:

我知道活动的重播功能,并且工作得很好,没有任何问题。我的问题是 Orchestrator 函数的两个并发执行。 @kamil-mrzyglod - 所以如果我的编排功能需要执行一些一次性代码,在单个活动请求执行之前而不是重新运行之前发生,你会建议做什么?在单独的活动中执行它,预先调用? @StephenHolt - 如果您的编排在运行前需要某种“预热”,它应该从外部源获取数据(例如,它应该作为编排输入传递)。【参考方案2】:

编排函数将在由 Durable Function 框架重放时更频繁地运行。你看到你的活动函数也被再次触发了吗?如果是这样,那么这确实是一种奇怪的行为。

Durable Functions 使用storage queues 和表来控制编排的流程和捕获状态。

每次触发编排功能时(通过从控制队列接收消息),它都会重播编排。您可以使用DurableOrchestrationContext 上的IsReplaying 属性在代码中检查这一点。

if (!context.IsReplaying)

   // this code will only run during the first execution of the workflow

不要使用IsReplaying 来确定运行活动,而是将其用于记录/调试目的。

当达到一个活动功能时,一条消息被放入工作项队列。然后,Durable Functions 框架将查看历史记录表以确定此活动函数之前是否已经运行过(使用给定的参数)。如果是,那么它将不再运行活动功能,它将继续执行其余的编排功能。

Durable Functions 的检查点和重放功能仅在编排代码具有确定性时才有效。因此,切勿使用基于 DateTime 或随机数的决策。

更多信息:https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay

【讨论】:

我看到第二个编排函数(尝试)执行与第一个编排函数完全相同的活动。这会导致很多异常和中止工作。

以上是关于Azure Durable 编排功能触发两次的主要内容,如果未能解决你的问题,请参考以下文章

每次触发警报时,连接到逻辑应用 Webhook 的 Azure 警报操作都会触发两次

Azure Devops 管道通过生成验证触发两次

用于文件共享的 Azure 触发功能

使用 Durable Functions 推送到服务总线的消息计数不可靠

触发 Azure 数据工厂管道 - Blob 上传 ADLS Gen2(以编程方式)

涉及时间触发 azure 功能的验收测试功能