Service Fabric Actor 订阅 Azure 服务总线主题

Posted

技术标签:

【中文标题】Service Fabric Actor 订阅 Azure 服务总线主题【英文标题】:Service Fabric Actor subscription to Azure Service Bus Topic 【发布时间】:2018-08-27 03:01:47 【问题描述】:

我正在考虑构建一个系统,该系统要求 Actor 使用特定于 Actor 实例的过滤器创建对 Azure 服务总线主题的订阅。我的问题是,如果演员(订阅了主题)在 Service Fabric 中被停用,它是否会被 Azure 服务总线发送的新消息(重新)激活?

谢谢

【问题讨论】:

【参考方案1】:

您的 Actor 不会因收到消息而被激活。它仅通过远程呼叫和提醒激活。所以这种方法行不通。

您可以做的是在Service 中接收消息,并将它们转发到Actor 实例。如果需要,调用 Actor 会即时创建实例。

【讨论】:

【参考方案2】:

基于Actor's lifecycle,它必须被激活。来自主题的 Azure 服务总线消息不会激活参与者。相反,您需要一个主管流程来执行此操作。消息可以包含一个属性来表示所需的参与者 ID。它还允许通过拥有单个主题和横向扩展的主管来简化您的 Azure 服务总线拓扑。

【讨论】:

【参考方案3】:

这可以通过提醒轻松实现。 由于需要先调用actor,所以可以这样做。

create 方法将设置连接字符串、主题名称、订阅名称并在需要时创建它们。提醒将检查订阅客户端是否不为空,如果是则创建它。提醒将始终在失败时执行,这样您将能够控制失败并在粉碎时重新启动它。

https://github.com/Huachao/azure-content/blob/master/articles/service-fabric/service-fabric-reliable-actors-timers-reminders.md

public async Task<bool> CreateAsync(BusOptions options, CancellationToken cancellationToken)
    
        if (options?.ConnectionString == null)
        
            return false;
        
        await StateManager.AddOrUpdateStateAsync("Options", options,(k,v) => v != options? options:v, cancellationToken);

        var client = new ManagementClient(options.ConnectionString);
        try
        
            var exist = await client.TopicExistsAsync(options.TopicName, cancellationToken);
            if (!exist)
            
               await client.CreateTopicAsync(options.TopicName, cancellationToken);
            
            exist = await client.SubscriptionExistsAsync(options.TopicName, options.SubscriptionName, cancellationToken);
            if (!exist)
            
                await client.CreateSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
            
            var rules =await client.GetRulesAsync(options.TopicName,options.SubscriptionName,cancellationToken: cancellationToken);
            if(rules.FirstOrDefault(x=>x.Name == options.RuleName) == null)
            
                SqlFilter filter = new SqlFilter(options.RuleFilterSqlValue);
                await client.CreateRuleAsync(options.TopicName, options.SubscriptionName, new RuleDescription(options.RuleName, filter));
            

        
        catch (Exception ex)
        
            ActorEventSource.Current.ActorMessage(this, ex.Message);                
        
        return true;
    
    public async Task DeleteAsync(BusOptions options, CancellationToken cancellationToken)
    
        var client = new ManagementClient(options.ConnectionString);
        try
        
            await client.DeleteRuleAsync(options.TopicName, options.SubscriptionName, options.RuleName, cancellationToken);
            await client.DeleteSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
        
        catch (Exception ex)
        
            ActorEventSource.Current.ActorMessage(this, ex.Message);
        

    
    private ISubscriptionClient subscriptionClient;       
    public async Task<bool> SendAsync(SendMessage message, CancellationToken cancellationToken)
    
        var options =await StateManager.TryGetStateAsync<BusOptions>("Options");
        if (!options.HasValue)
        
            ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
            return false;
        


        var client = new TopicClient(options.Value.ConnectionString,options.Value.TopicName);

        var msg = new Message(message.Body);
        if(message.UserProperties != null)
        
            foreach (var item in message.UserProperties)
            
                msg.UserProperties.Add(item);
            
        
        msg.Label = message.Label;



       await client.SendAsync(msg);
       await StateManager.AddOrUpdateStateAsync("Messages_Send", 1, (key, value) => 1 > value ? 1 : value, cancellationToken);

        return true;
    
    void RegisterOnMessageHandlerAndReceiveMessages()
    
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
                        
            MaxConcurrentCalls = 1,
            AutoComplete = false
        ;
        subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    
    async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
    
        ActorEventSource.Current.ActorMessage(this, message.Label);

        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);


    
    Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    

        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        ActorEventSource.Current.ActorMessage(this,
            string.Format("Exception context for troubleshooting: - Endpoint: 0- Entity Path: 1- Executing Action: 2 - MEssage: 3",
            context.Endpoint,context.EntityPath,context,exceptionReceivedEventArgs.Exception.Message));
        return Task.CompletedTask;
    
    protected override async Task OnActivateAsync()
    
        ActorEventSource.Current.ActorMessage(this, $"Actor 'Id.GetStringId()' activated.");

        IActorReminder Recieve_Message = await this.RegisterReminderAsync(
                        "Recieve_Message",
                        null,
                        TimeSpan.FromSeconds(1),    //The amount of time to delay before firing the reminder
                        TimeSpan.FromSeconds(1));


    
    public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
    
        if (reminderName.Equals("Recieve_Message"))
        
            if(subscriptionClient == null)
            
                var options = await StateManager.TryGetStateAsync<BusOptions>("Options");
                if (!options.HasValue)
                
                    ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
                    return;
                

                var conn = new ServiceBusConnectionStringBuilder(options.Value.ConnectionString);

                subscriptionClient = new SubscriptionClient(options.Value.ConnectionString, options.Value.TopicName, options.Value.SubscriptionName);

                RegisterOnMessageHandlerAndReceiveMessages();
            

        


            

【讨论】:

以上是关于Service Fabric Actor 订阅 Azure 服务总线主题的主要内容,如果未能解决你的问题,请参考以下文章