Service Fabric Actor 订阅 Azure 服务总线主题

Posted

技术标签:

【中文标题】Service Fabric Actor 订阅 Azure 服务总线主题【英文标题】:Service Fabric Actor subscription to Azure Service Bus Topic 【发布时间】:2018-08-27 03:01:47 【问题描述】:

我正在考虑构建一个系统,该系统要求 Actor 使用特定于 Actor 实例的过滤器创建对 Azure 服务总线主题的订阅。我的问题是,如果演员(订阅了主题)在 Service Fabric 中被停用,它是否会被 Azure 服务总线发送的新消息(重新)激活?

谢谢

【问题讨论】:

【参考方案1】:

您的 Actor 不会因收到消息而被激活。它仅通过远程呼叫和提醒激活。所以这种方法行不通。

您可以做的是在Service 中接收消息,并将它们转发到Actor 实例。如果需要,调用 Actor 会即时创建实例。

【讨论】:

【参考方案2】:

基于Actor's lifecycle,它必须被激活。来自主题的 Azure 服务总线消息不会激活参与者。相反,您需要一个主管流程来执行此操作。消息可以包含一个属性来表示所需的参与者 ID。它还允许通过拥有单个主题和横向扩展的主管来简化您的 Azure 服务总线拓扑。

【讨论】:

【参考方案3】:

这可以通过提醒轻松实现。 由于需要先调用actor,所以可以这样做。

create 方法将设置连接字符串、主题名称、订阅名称并在需要时创建它们。提醒将检查订阅客户端是否不为空,如果是则创建它。提醒将始终在失败时执行,这样您将能够控制失败并在粉碎时重新启动它。

https://github.com/Huachao/azure-content/blob/master/articles/service-fabric/service-fabric-reliable-actors-timers-reminders.md

public async Task<bool> CreateAsync(BusOptions options, CancellationToken cancellationToken)
    
        if (options?.ConnectionString == null)
        
            return false;
        
        await StateManager.AddOrUpdateStateAsync("Options", options,(k,v) => v != options? options:v, cancellationToken);

        var client = new ManagementClient(options.ConnectionString);
        try
        
            var exist = await client.TopicExistsAsync(options.TopicName, cancellationToken);
            if (!exist)
            
               await client.CreateTopicAsync(options.TopicName, cancellationToken);
            
            exist = await client.SubscriptionExistsAsync(options.TopicName, options.SubscriptionName, cancellationToken);
            if (!exist)
            
                await client.CreateSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
            
            var rules =await client.GetRulesAsync(options.TopicName,options.SubscriptionName,cancellationToken: cancellationToken);
            if(rules.FirstOrDefault(x=>x.Name == options.RuleName) == null)
            
                SqlFilter filter = new SqlFilter(options.RuleFilterSqlValue);
                await client.CreateRuleAsync(options.TopicName, options.SubscriptionName, new RuleDescription(options.RuleName, filter));
            

        
        catch (Exception ex)
        
            ActorEventSource.Current.ActorMessage(this, ex.Message);                
        
        return true;
    
    public async Task DeleteAsync(BusOptions options, CancellationToken cancellationToken)
    
        var client = new ManagementClient(options.ConnectionString);
        try
        
            await client.DeleteRuleAsync(options.TopicName, options.SubscriptionName, options.RuleName, cancellationToken);
            await client.DeleteSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
        
        catch (Exception ex)
        
            ActorEventSource.Current.ActorMessage(this, ex.Message);
        

    
    private ISubscriptionClient subscriptionClient;       
    public async Task<bool> SendAsync(SendMessage message, CancellationToken cancellationToken)
    
        var options =await StateManager.TryGetStateAsync<BusOptions>("Options");
        if (!options.HasValue)
        
            ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
            return false;
        


        var client = new TopicClient(options.Value.ConnectionString,options.Value.TopicName);

        var msg = new Message(message.Body);
        if(message.UserProperties != null)
        
            foreach (var item in message.UserProperties)
            
                msg.UserProperties.Add(item);
            
        
        msg.Label = message.Label;



       await client.SendAsync(msg);
       await StateManager.AddOrUpdateStateAsync("Messages_Send", 1, (key, value) => 1 > value ? 1 : value, cancellationToken);

        return true;
    
    void RegisterOnMessageHandlerAndReceiveMessages()
    
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
                        
            MaxConcurrentCalls = 1,
            AutoComplete = false
        ;
        subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    
    async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
    
        ActorEventSource.Current.ActorMessage(this, message.Label);

        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);


    
    Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    

        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        ActorEventSource.Current.ActorMessage(this,
            string.Format("Exception context for troubleshooting: - Endpoint: 0- Entity Path: 1- Executing Action: 2 - MEssage: 3",
            context.Endpoint,context.EntityPath,context,exceptionReceivedEventArgs.Exception.Message));
        return Task.CompletedTask;
    
    protected override async Task OnActivateAsync()
    
        ActorEventSource.Current.ActorMessage(this, $"Actor 'Id.GetStringId()' activated.");

        IActorReminder Recieve_Message = await this.RegisterReminderAsync(
                        "Recieve_Message",
                        null,
                        TimeSpan.FromSeconds(1),    //The amount of time to delay before firing the reminder
                        TimeSpan.FromSeconds(1));


    
    public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
    
        if (reminderName.Equals("Recieve_Message"))
        
            if(subscriptionClient == null)
            
                var options = await StateManager.TryGetStateAsync<BusOptions>("Options");
                if (!options.HasValue)
                
                    ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
                    return;
                

                var conn = new ServiceBusConnectionStringBuilder(options.Value.ConnectionString);

                subscriptionClient = new SubscriptionClient(options.Value.ConnectionString, options.Value.TopicName, options.Value.SubscriptionName);

                RegisterOnMessageHandlerAndReceiveMessages();
            

        


            

【讨论】:

以上是关于Service Fabric Actor 订阅 Azure 服务总线主题的主要内容,如果未能解决你的问题,请参考以下文章

Service Fabric Actor 计时器性能影响

从 .Net Core Stateless Service 调用 .Net 框架 Service Fabric Actor

同一个 Service Fabric Actor 应用程序中是不是有大量的 Actor?

Service Fabric 运行时未从 Actor 服务实例回收未使用的内存

通过 URI 调用 Service Fabric Actor Service 时找不到 V2Listener

Service Fabric Actor State:在持久模式下从哪里读取状态?