使用 Durable Functions 推送到服务总线的消息计数不可靠

Posted

技术标签:

【中文标题】使用 Durable Functions 推送到服务总线的消息计数不可靠【英文标题】:Unreliable count of messages pushed to Service Bus using Durable Functions 【发布时间】:2021-11-29 12:23:07 【问题描述】:

我在使用 Durable azure 函数向 azure 服务总线提交消息时遇到了这个奇怪的问题。

我的代码是一个简单的扇出实现

REST 触发器获取要提交的消息数量并交给协调器。 Orchestrator 存储将创建消息并将消息提交到服务总线的调用活动。

问题是当我发送 REST 参数要求添加 3000 条消息时,添加了超过 3000 条消息。 更糟糕的是,它也不是同一个数字 - 3104、3100、3286 任何东西......

见下面的代码:

[FunctionName("Function1_HttpStart")]
//public static async Task<HttpResponseMessage> HttpStart(
public static async Task<IActionResult> HttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequest req,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)

    String type = req.Query["type"];
    if(!long.TryParse(req.Query["count"], out var count))
    
        return new ObjectResult($"Parse failed for parameter 'count' (req.Query["count"]) to Int.")  StatusCode = 400;
    

    var restInputs = new RestInputs() 
         Type = type, Count = count ;

    // Function input comes from the request content.
    string instanceId = await starter.StartNewAsync
        ("EmailQueueSubmitter_OrchestratorSingleton"
        , restInputs);

    log.LogInformation($"Started orchestration with ID = 'instanceId'.");

    return starter.CreateCheckStatusResponse(req, instanceId);


[FunctionName("EmailQueueSubmitter_OrchestratorSingleton")]
public static async Task<List<string>> EmailQueueSubmitter_OrchestratorSingleton(
    [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)

    var outputs = new List<string>();
    try
    
        var restInputs = context.GetInput<RestInputs>();

        var parallelTasks = new List<Task>();
        long runBatchLen;
        long i_batch, i_iter, batchCount = 0;

        for (i_batch = 0; i_batch < restInputs.Count; i_batch++)
        
            parallelTasks.Add(context.CallActivityAsync("EmailQueueSubmitter_ActivitySendMessageBatchSingleton", i_batch.ToString()));
            log.LogWarning($"Message i_batch Added");
        

        log.LogWarning($"Awaiting parallelTasks.Count tasks");
        await Task.WhenAll(parallelTasks);
        var doneTaskCount = parallelTasks.Where(t => t.IsCompleted).ToList().Count;
        var successTaskCount = parallelTasks.Where(t => t.IsCompletedSuccessfully).ToList().Count;
        var faultedTaskCount = parallelTasks.Where(t => t.IsFaulted).ToList().Count;
        var exceptionTaskCount = parallelTasks.Where(t => t.Exception != null).ToList().Count;
        log.LogWarning($"Done:doneTaskCount, Success: successTaskCount, Fault:faultedTaskCount, Exception:exceptionTaskCount");

        log.LogWarning($"Acheived Completion.");
    
    catch (Exception ex)
    
        log.LogError(ex.Message);
        throw new InvalidOperationException(ex.Message);
    

    return outputs;

[FunctionName("EmailQueueSubmitter_ActivitySendMessageBatchSingleton")]
public static async Task EmailQueueSubmitter_ActivitySendMessageBatchSingleton([ActivityTrigger] IDurableActivityContext activityContext, ILogger log)

    log.LogWarning($"Starting Activity.");
    var payload = activityContext.GetInput<String>();
    
    await ServiceBus_Sender.SendMessageBatch(payload);
    log.LogWarning($"Finished Activity.");

public static ServiceBusMessage CreateMessage(String Payload)

    try
    
        var sbMsg = new ServiceBusMessage(Payload)
        
            MessageId = Guid.NewGuid().ToString(),
            ContentType = "text/plain"
        ;
        //sbMsg.ApplicationProperties.Add("RequestType", "Publish");

        return sbMsg;
    
    catch (Exception ex)
    
        throw new InvalidOperationException(ex.Message, ex);
    

【问题讨论】:

docs.microsoft.com/en-us/azure/azure-functions/durable/… -> "Orchestrator、entity 和 Activity 函数都由函数应用任务中心中的内部队列触发。以这种方式使用队列提供了可靠的 "at-least-once " 消息传递保证。" -> 强调我的。消息是否重复完全由您自己决定。 @CamiloTerevinto :感谢您的评论:但是我们如何在服务总线中运行重复检查?我上面的代码是一个虚拟的,实际上消息将从数据库记录构建,并且必须推送到 SB。生产场景一次最多可以发送 100 万条记录。 您需要某种 ID。您可以生成一个 Guid 并将其与数据一起发送,然后检查该 Guid 之前是否未被处理 【参考方案1】:

感谢@Camilo Terevinto 提供的信息,我将其转换为答案,以便对其他社区成员有所帮助:

按照 cmets 中的建议,要运行重复检查,您可以生成 Guid 并将其与数据一起发送,然后检查 Guid 是否不存在之前已经处理过了。希望这能解决您的问题。

OP 编辑​​:通过将服务总线队列更改为启用会话并打开重复数据删除来启用重复检查。提交的消息的 MessageId 在每个会话中设置为唯一的。这是我能想到的处理at-least-once保证的唯一方法......

【讨论】:

以上是关于使用 Durable Functions 推送到服务总线的消息计数不可靠的主要内容,如果未能解决你的问题,请参考以下文章

错误“snapshot.val”不是 Google Cloud Functions 中的函数

当另一个客户端发送到服务器时,如何让 UDP 服务器推送到客户端

如何使用 AWS Kinesis Firehose 将嵌套结构推送到 Redshift

rsync+nfs企业实战案例

如何使用 Airwatch 将凭证推送到 APP

Fastlane - 将证书推送到回购