Receiving Unauthorized Access Error on RenewToken Periodically with Azure Service Bus Queue Listener

【Title】: Receiving Unauthorized Access Error on RenewToken Periodically with Azure Service Bus Queue Listener 【Published】: 2020-08-11 01:10:45 【Question】:

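I am periodically receiving a Microsoft.Azure.ServiceBus.ServiceBusException in my queue receiver (message below, with sensitive information removed). The SAS key has Send/Listen permissions, and the error appears to be inconsequential because processing continues normally. However, the message is creating a signal-to-noise problem in my dashboards (10-70 errors per day). Any ideas why this is happening? The listener runs in an Azure App Service, but I don't think that matters. I have adjusted my retry logic to use RetryExponential with a 1-second to 1-minute backoff and 5 retries.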
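As background for the SAS key mentioned above: the key of a SAS policy has no expiry of its own, but with SAS authentication a signed, time-limited token is derived from that key, and the token carries an expiry. That may be what the RenewToken code in the error refers to, although the post does not confirm it. The following is a minimal sketch of how such a token is built, following the publicly documented SAS token format. `SasTokenSketch` and its parameters are hypothetical names used only for illustration; this is not the SDK's own code.

```csharp
using System;
using System.Globalization;
using System.Net;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper for illustration only; not part of any Azure SDK.
public static class SasTokenSketch
{
    // Builds a SAS token for resourceUri that is valid until expiresAt.
    public static string Create(string resourceUri, string keyName, string key, DateTimeOffset expiresAt)
    {
        string encodedUri = WebUtility.UrlEncode(resourceUri);

        // The expiry is expressed as seconds since the Unix epoch.
        string expiry = expiresAt.ToUnixTimeSeconds().ToString(CultureInfo.InvariantCulture);

        // The signature covers the encoded resource URI and the expiry.
        string stringToSign = encodedUri + "\n" + expiry;

        using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key)))
        {
            string signature = Convert.ToBase64String(
                hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));

            return string.Format(
                CultureInfo.InvariantCulture,
                "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
                encodedUri,
                WebUtility.UrlEncode(signature),
                expiry,
                keyName);
        }
    }
}
```

The point of the sketch is the `se` field: even when the policy key never expires, every token signed with it does, so a long-lived listener has to obtain fresh tokens from time to time.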

Request for guidance from SDK developers

.NET Core 3.1

Microsoft.Azure.ServiceBus, Version=4.1.3.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c

Error message

The link 'xxx;xxx:xxx:xxx:source(address:xxx):xxx' is force detached. Code: RenewToken. Details: Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://xxx.servicebus.windows.net/xxx'.. TrackingId:xxx, SystemTracker:xxx, Timestamp:2020-04-27T09:36:04 The link 'xxx;xxx:xxx:xxx:source(address:xxx):xxx' is force detached. Code: RenewToken. Details: Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://xxx.servicebus.windows.net/xxx'.. TrackingId:xxx, SystemTracker:xxx, Timestamp:2020-04-27T09:36:04
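Since the dashboard noise comes from one recognisable message, one option is to classify it in the exception handler and log it at a lower severity. The sketch below is a looser alternative to matching the whole message: it only looks for a force-detached link whose code is RenewToken. `RenewTokenNoiseFilter` and `IsRenewTokenDetach` are hypothetical names, and the pattern assumes the message wording shown above stays stable across SDK versions, which is not guaranteed.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper for illustration; not part of Microsoft.Azure.ServiceBus.
public static class RenewTokenNoiseFilter
{
    // Loose match: any link that was force detached with code RenewToken.
    // Tracking ids, timestamps and the resource URI are deliberately ignored.
    private static readonly Regex Pattern = new Regex(
        @"The link '[^']*' is force detached\. Code: RenewToken\.",
        RegexOptions.Compiled | RegexOptions.CultureInvariant);

    // Returns true when the exception message looks like the RenewToken detach error.
    public static bool IsRenewTokenDetach(Exception exception)
    {
        return exception?.Message != null && Pattern.IsMatch(exception.Message);
    }
}
```

In an exception handler, a call such as `RenewTokenNoiseFilter.IsRenewTokenDetach(arguments.Exception)` would decide whether to log a warning and return, or to forward the exception to the normal failure path. Matching on message text remains brittle, so this only reduces the noise; it does not address the cause.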

Source

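// Types such as ICorrelative, IMessageConverter<T>, OnFailureCallback and MessageBusConfiguration
// are the poster's own and are not shown in the question.
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reflection;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

internal delegate TClient ClientFactory<out TClient>(string connectionString, string entityPath,
    RetryPolicy retryPolicy);

internal delegate Task OnMessageCallback<in TMessage>(TMessage message,
    CancellationToken cancellationToken = default) where TMessage : ICorrelative;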

internal sealed class ReceiverClientWrapper<TMessage> : IReceiverClientWrapper<TMessage>
    where TMessage : ICorrelative
{
    // Matches the (possibly repeated) "force detached ... RenewToken" message so that it can be
    // logged as a warning instead of being reported as a failure.
    // ReSharper disable once StaticMemberInGenericType
    private static readonly Regex TransientConnectionErrorRegex =
        new Regex(
            @"(The link '([a-f0-9-]+);([0-9]*:)*source\(address:([a-z0-9_]+)\):([a-z0-9_]+)' is force detached. Code: RenewToken. Details: Unauthorized access. 'Listen' claim\(s\) are required to perform this operation. Resource: 'sb:\/\/([a-z0-9-_.\/]+)'.. TrackingId:([a-z0-9_]+), SystemTracker:([a-z0-9]+), Timestamp:([0-9]{4}(-[0-9]{2}){2}T([0-9]{2}:){2}[0-9]{2}) )+",
            RegexOptions.Compiled | RegexOptions.Multiline | RegexOptions.IgnoreCase);

    private readonly IReceiverClient _receiverClient;
    private readonly IMessageConverter<TMessage> _messageConverter;
    private readonly ILogger _logger;
    private readonly int _maximumConcurrency;

    public ReceiverClientWrapper(IReceiverClient receiverClient, IMessageConverter<TMessage> messageConverter,
        ILogger logger, int maximumConcurrency)
    {
        _receiverClient = receiverClient;
        _messageConverter = messageConverter;
        _logger = logger;
        _maximumConcurrency = maximumConcurrency;
    }

    public Task SubscribeAsync(OnMessageCallback<TMessage> onMessageCallback,
        OnFailureCallback onFailureCallback, CancellationToken cancellationToken = default)
    {
        var messageHandlerOptions = CreateMessageHandlerOptions(onFailureCallback, cancellationToken);

        // Converts each message, invokes the callback, then completes the message explicitly.
        async Task Handler(Message message, CancellationToken token)
        {
            var convertedMessage = _messageConverter.Convert(message);

            await onMessageCallback(convertedMessage, cancellationToken);
            await _receiverClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        _receiverClient.RegisterMessageHandler(Handler, messageHandlerOptions);

        return Task.CompletedTask;
    }

    private MessageHandlerOptions CreateMessageHandlerOptions(OnFailureCallback onFailureCallback,
        CancellationToken cancellationToken)
    {
        async Task HandleExceptionAsync(ExceptionReceivedEventArgs arguments)
        {
            var exception = arguments.Exception;

            // Known RenewToken noise: log as a warning and skip the failure callback.
            if (TransientConnectionErrorRegex.IsMatch(exception.Message))
            {
                _logger.LogWarning(exception, @"Transient connectivity error occurred");

                return;
            }

            await onFailureCallback(exception, cancellationToken);
        }

        return new MessageHandlerOptions(HandleExceptionAsync)
        {
            AutoComplete = false,
            MaxConcurrentCalls = _maximumConcurrency
        };
    }

    public async ValueTask DisposeAsync()
    {
        await _receiverClient.CloseAsync();
    }
}

internal sealed class SenderClientWrapper<TMessage> : ISenderClientWrapper<TMessage> where TMessage : ICorrelative
{
    private readonly ISenderClient _senderClient;
    private readonly IMessageConverter<TMessage> _messageConverter;

    public SenderClientWrapper(ISenderClient senderClient, IMessageConverter<TMessage> messageConverter)
    {
        _senderClient = senderClient;
        _messageConverter = messageConverter;
    }

    public Task SendAsync(TMessage message, CancellationToken cancellationToken = default)
    {
        var internalMessage = _messageConverter.Convert(message);

        return _senderClient.SendAsync(internalMessage);
    }

    public Task SendAsync(IEnumerable<TMessage> messages, CancellationToken cancellationToken = default)
    {
        var internalMessages = messages
            .Select(_messageConverter.Convert)
            .ToImmutableArray();

        return _senderClient.SendAsync(internalMessages);
    }

    public async ValueTask DisposeAsync()
    {
        await _senderClient.CloseAsync();
    }
}

internal abstract class AbstractClientWrapperFactory
{
    // Retry settings described in the question: exponential backoff from 1 second to 1 minute, 5 retries.
    private const int MaximumRetryCount = 5;
    private static readonly TimeSpan MinimumRetryBackOff = TimeSpan.FromSeconds(1);
    private static readonly TimeSpan MaximumRetryBackOff = TimeSpan.FromMinutes(1);

    protected AbstractClientWrapperFactory(IOptions<MessageBusConfiguration> options)
    {
        Options = options;
    }

    protected IOptions<MessageBusConfiguration> Options { get; }

    protected static string GetEntityPath<TMessage>() where TMessage : class
    {
        var messageAttribute = typeof(TMessage).GetCustomAttribute<AbstractMessageAttribute>();

        if (messageAttribute == null)
        {
            throw new ArgumentException($@"Message requires {nameof(AbstractMessageAttribute)}");
        }

        return messageAttribute.EntityName;
    }

    protected TClient CreateClientEntity<TMessage, TClient>(ClientFactory<TClient> clientFactory)
        where TMessage : class
    {
        var entityPath = GetEntityPath<TMessage>();
        var retryPolicy = CreateRetryPolicy();

        return clientFactory(Options.Value.ConnectionString, entityPath, retryPolicy);
    }

    protected static IQueueClient QueueClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
    {
        return new QueueClient(connectionString, entityPath, retryPolicy: retryPolicy);
    }

    private static RetryPolicy CreateRetryPolicy()
    {
        return new RetryExponential(MinimumRetryBackOff, MaximumRetryBackOff, MaximumRetryCount);
    }
}

internal sealed class SenderClientWrapperFactory : AbstractClientWrapperFactory, ISenderClientWrapperFactory
{
    private readonly IMessageConverterFactory _messageConverterFactory;

    public SenderClientWrapperFactory(IMessageConverterFactory messageConverterFactory,
        IOptions<MessageBusConfiguration> options) : base(options)
    {
        _messageConverterFactory = messageConverterFactory;
    }

    public ISenderClientWrapper<TEvent> CreateTopicClient<TEvent>() where TEvent : class, IEvent
    {
        return CreateWrapper<TEvent, ITopicClient>(TopicClientFactory);
    }

    public ISenderClientWrapper<TRequest> CreateQueueClient<TRequest>() where TRequest : class, IRequest
    {
        return CreateWrapper<TRequest, IQueueClient>(QueueClientFactory);
    }

    private ISenderClientWrapper<TMessage> CreateWrapper<TMessage, TClient>(ClientFactory<TClient> clientFactory)
        where TMessage : class, ICorrelative
        where TClient : ISenderClient
    {
        var clientEntity = CreateClientEntity<TMessage, TClient>(clientFactory);
        var messageConverter = _messageConverterFactory.Create<TMessage>();

        return new SenderClientWrapper<TMessage>(clientEntity, messageConverter);
    }

    private static ITopicClient TopicClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
    {
        return new TopicClient(connectionString, entityPath, retryPolicy);
    }
}

internal sealed class ReceiverClientWrapperFactory : AbstractClientWrapperFactory, IReceiverClientWrapperFactory
{
    private readonly IMessageConverterFactory _messageConverterFactory;
    private readonly ILogger<ReceiverClientWrapperFactory> _logger;

    public ReceiverClientWrapperFactory(IOptions<MessageBusConfiguration> options,
        IMessageConverterFactory messageConverterFactory,
        ILogger<ReceiverClientWrapperFactory> logger) : base(options)
    {
        _messageConverterFactory = messageConverterFactory;
        _logger = logger;
    }

    public IReceiverClientWrapper<TEvent> CreateTopicClient<TEvent>() where TEvent : class, IEvent
    {
        return CreateReceiverClientWrapper<TEvent, ISubscriptionClient>(SubscriptionClientFactory);
    }

    public IReceiverClientWrapper<TRequest> CreateQueueClient<TRequest>() where TRequest : class, IRequest
    {
        return CreateReceiverClientWrapper<TRequest, IQueueClient>(QueueClientFactory);
    }

    private IReceiverClientWrapper<TMessage> CreateReceiverClientWrapper<TMessage, TClient>(
        ClientFactory<TClient> clientFactory)
        where TMessage : class, ICorrelative
        where TClient : IReceiverClient
    {
        var clientEntity = CreateClientEntity<TMessage, TClient>(clientFactory);
        var messageConverter = _messageConverterFactory.Create<TMessage>();

        return new ReceiverClientWrapper<TMessage>(clientEntity, messageConverter, _logger,
            Options.Value.MaximumConcurrency);
    }

    private ISubscriptionClient SubscriptionClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
    {
        return new SubscriptionClient(connectionString, entityPath, Options.Value.SubscriberName,
            retryPolicy: retryPolicy);
    }
}

internal sealed class RequestService<TRequest> : IRequestService<TRequest> where TRequest : class, IRequest
{
    private readonly Lazy<ISenderClientWrapper<TRequest>> _senderClient;
    private readonly Lazy<IReceiverClientWrapper<TRequest>> _receiverClient;

    public RequestService(ISenderClientWrapperFactory senderClientWrapperFactory,
        IReceiverClientWrapperFactory receiverClientWrapperFactory)
    {
        _senderClient =
            new Lazy<ISenderClientWrapper<TRequest>>(senderClientWrapperFactory.CreateQueueClient<TRequest>,
                LazyThreadSafetyMode.PublicationOnly);

        _receiverClient
            = new Lazy<IReceiverClientWrapper<TRequest>>(receiverClientWrapperFactory.CreateQueueClient<TRequest>,
                LazyThreadSafetyMode.PublicationOnly);
    }

    public Task PublishRequestAsync(TRequest requestMessage, CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(requestMessage, cancellationToken);
    }

    public Task PublishRequestAsync(IEnumerable<TRequest> requestMessages,
        CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(requestMessages, cancellationToken);
    }

    public Task SubscribeAsync(OnRequestCallback<TRequest> onRequestCallback, OnFailureCallback onFailureCallback,
        CancellationToken cancellationToken = default)
    {
        return _receiverClient
            .Value
            .SubscribeAsync((message, token) => onRequestCallback(message, cancellationToken), onFailureCallback,
                cancellationToken);
    }

    public async ValueTask DisposeAsync()
    {
        if (_senderClient.IsValueCreated)
        {
            await _senderClient.Value.DisposeAsync();
        }

        if (_receiverClient.IsValueCreated)
        {
            await _receiverClient.Value.DisposeAsync();
        }
    }

    public Task ThrowIfNotReadyAsync(CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(ImmutableArray<TRequest>.Empty, cancellationToken);
    }
}
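【Comments】:

I think your SAS token is expiring. You should try generating a longer-lived SAS.

The SAS policy was created in Azure Service Bus and has no expiration. Is there another layer of leasing on top of the SAS policy that I don't know about?

How did you create the SAS? Every SAS has an expiry time.

I am using Azure Service Bus's SAS policy management to generate the policy and get the key. The key has no expiration. I supply the SAS key to the Microsoft.Azure.ServiceBus package (version 4.1.3.0, retrieved from NuGet) to connect. Beyond that, I have no control.

Oh, okay. That seems fine. You could then try using the connection string, or generating token credentials at runtime with MSI.

【Answer 1】: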

This was never resolved with the original NuGet package, but the new Azure.Messaging.ServiceBus package does not appear to have the problem. I have switched to it.
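For readers considering the same move, here is a rough sketch of what a comparable queue listener might look like with the Azure.Messaging.ServiceBus package. It is not the answerer's actual code, which was not posted. `QueueListenerSketch`, its constructor parameters and the two callbacks are hypothetical; the retry values simply mirror the ones described in the question.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical wrapper for illustration; connection string and queue name come from the caller.
public sealed class QueueListenerSketch : IAsyncDisposable
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    public QueueListenerSketch(string connectionString, string queueName, int maximumConcurrency)
    {
        _client = new ServiceBusClient(connectionString, CreateClientOptions());
        _processor = _client.CreateProcessor(queueName, CreateProcessorOptions(maximumConcurrency));
    }

    // Mirrors the question's retry settings: exponential, 1 second to 1 minute, 5 retries.
    public static ServiceBusClientOptions CreateClientOptions() => new ServiceBusClientOptions
    {
        RetryOptions = new ServiceBusRetryOptions
        {
            Mode = ServiceBusRetryMode.Exponential,
            Delay = TimeSpan.FromSeconds(1),
            MaxDelay = TimeSpan.FromMinutes(1),
            MaxRetries = 5
        }
    };

    // Messages are completed explicitly, as in the original handler.
    public static ServiceBusProcessorOptions CreateProcessorOptions(int maximumConcurrency) =>
        new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false,
            MaxConcurrentCalls = maximumConcurrency
        };

    // Registers the handlers and starts receiving.
    public Task StartAsync(Func<ServiceBusReceivedMessage, Task> onMessage, Func<Exception, Task> onFailure)
    {
        _processor.ProcessMessageAsync += async args =>
        {
            await onMessage(args.Message);
            await args.CompleteMessageAsync(args.Message);
        };

        _processor.ProcessErrorAsync += args => onFailure(args.Exception);

        return _processor.StartProcessingAsync();
    }

    public async ValueTask DisposeAsync()
    {
        await _processor.DisposeAsync();
        await _client.DisposeAsync();
    }
}
```

The shape is deliberately close to the `ReceiverClientWrapper` in the question, so the surrounding factory and service classes would need few changes. Whether the RenewToken errors disappear in a given environment is something to verify on the dashboard rather than assume.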
