Service Fabric Actor 订阅 Azure 服务总线主题
Posted
技术标签:
【中文标题】Service Fabric Actor 订阅 Azure 服务总线主题【英文标题】:Service Fabric Actor subscription to Azure Service Bus Topic 【发布时间】:2018-08-27 03:01:47 【问题描述】:我正在考虑构建一个系统,该系统要求 Actor 使用特定于 Actor 实例的过滤器创建对 Azure 服务总线主题的订阅。我的问题是,如果演员(订阅了主题)在 Service Fabric 中被停用,它是否会被 Azure 服务总线发送的新消息(重新)激活?
谢谢
【问题讨论】:
【参考方案1】:您的 Actor 不会因收到消息而被激活。它仅通过远程呼叫和提醒激活。所以这种方法行不通。
您可以做的是在Service 中接收消息,并将它们转发到Actor 实例。如果需要,调用 Actor 会即时创建实例。
【讨论】:
【参考方案2】:基于Actor's lifecycle,它必须被激活。来自主题的 Azure 服务总线消息不会激活参与者。相反,您需要一个主管流程来执行此操作。消息可以包含一个属性来表示所需的参与者 ID。它还允许通过拥有单个主题和横向扩展的主管来简化您的 Azure 服务总线拓扑。
【讨论】:
【参考方案3】:这可以通过提醒轻松实现。 由于需要先调用actor,所以可以这样做。
create 方法将设置连接字符串、主题名称、订阅名称并在需要时创建它们。提醒将检查订阅客户端是否不为空,如果是则创建它。提醒将始终在失败时执行,这样您将能够控制失败并在粉碎时重新启动它。
https://github.com/Huachao/azure-content/blob/master/articles/service-fabric/service-fabric-reliable-actors-timers-reminders.md
public async Task<bool> CreateAsync(BusOptions options, CancellationToken cancellationToken)
if (options?.ConnectionString == null)
return false;
await StateManager.AddOrUpdateStateAsync("Options", options,(k,v) => v != options? options:v, cancellationToken);
var client = new ManagementClient(options.ConnectionString);
try
var exist = await client.TopicExistsAsync(options.TopicName, cancellationToken);
if (!exist)
await client.CreateTopicAsync(options.TopicName, cancellationToken);
exist = await client.SubscriptionExistsAsync(options.TopicName, options.SubscriptionName, cancellationToken);
if (!exist)
await client.CreateSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
var rules =await client.GetRulesAsync(options.TopicName,options.SubscriptionName,cancellationToken: cancellationToken);
if(rules.FirstOrDefault(x=>x.Name == options.RuleName) == null)
SqlFilter filter = new SqlFilter(options.RuleFilterSqlValue);
await client.CreateRuleAsync(options.TopicName, options.SubscriptionName, new RuleDescription(options.RuleName, filter));
catch (Exception ex)
ActorEventSource.Current.ActorMessage(this, ex.Message);
return true;
public async Task DeleteAsync(BusOptions options, CancellationToken cancellationToken)
var client = new ManagementClient(options.ConnectionString);
try
await client.DeleteRuleAsync(options.TopicName, options.SubscriptionName, options.RuleName, cancellationToken);
await client.DeleteSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
catch (Exception ex)
ActorEventSource.Current.ActorMessage(this, ex.Message);
private ISubscriptionClient subscriptionClient;
public async Task<bool> SendAsync(SendMessage message, CancellationToken cancellationToken)
var options =await StateManager.TryGetStateAsync<BusOptions>("Options");
if (!options.HasValue)
ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
return false;
var client = new TopicClient(options.Value.ConnectionString,options.Value.TopicName);
var msg = new Message(message.Body);
if(message.UserProperties != null)
foreach (var item in message.UserProperties)
msg.UserProperties.Add(item);
msg.Label = message.Label;
await client.SendAsync(msg);
await StateManager.AddOrUpdateStateAsync("Messages_Send", 1, (key, value) => 1 > value ? 1 : value, cancellationToken);
return true;
void RegisterOnMessageHandlerAndReceiveMessages()
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
MaxConcurrentCalls = 1,
AutoComplete = false
;
subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
ActorEventSource.Current.ActorMessage(this, message.Label);
await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
ActorEventSource.Current.ActorMessage(this,
string.Format("Exception context for troubleshooting: - Endpoint: 0- Entity Path: 1- Executing Action: 2 - MEssage: 3",
context.Endpoint,context.EntityPath,context,exceptionReceivedEventArgs.Exception.Message));
return Task.CompletedTask;
protected override async Task OnActivateAsync()
ActorEventSource.Current.ActorMessage(this, $"Actor 'Id.GetStringId()' activated.");
IActorReminder Recieve_Message = await this.RegisterReminderAsync(
"Recieve_Message",
null,
TimeSpan.FromSeconds(1), //The amount of time to delay before firing the reminder
TimeSpan.FromSeconds(1));
public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
if (reminderName.Equals("Recieve_Message"))
if(subscriptionClient == null)
var options = await StateManager.TryGetStateAsync<BusOptions>("Options");
if (!options.HasValue)
ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
return;
var conn = new ServiceBusConnectionStringBuilder(options.Value.ConnectionString);
subscriptionClient = new SubscriptionClient(options.Value.ConnectionString, options.Value.TopicName, options.Value.SubscriptionName);
RegisterOnMessageHandlerAndReceiveMessages();
【讨论】:
以上是关于Service Fabric Actor 订阅 Azure 服务总线主题的主要内容,如果未能解决你的问题,请参考以下文章
从 .Net Core Stateless Service 调用 .Net 框架 Service Fabric Actor
同一个 Service Fabric Actor 应用程序中是不是有大量的 Actor?
Service Fabric 运行时未从 Actor 服务实例回收未使用的内存