masstransit请求/响应:在消费者中获取调用者超时

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了masstransit请求/响应:在消费者中获取调用者超时相关的知识,希望对你有一定的参考价值。

去年我开始在Akka.NET中使用actor模型。现在我开始使用MassTransit(v3.5.7)和RabbitMQ,我真的很喜欢这两个!

在请求/响应场景中,我的请求使用者通过将请求包装在新消息中并要求actor执行实际作业来执行其业务逻辑。所以基本上消费者等待演员的Ask方法。此(扩展)方法接受消息和超时作为参数。我想使用请求发起者使用的相同超时值。

是否有一种简单的方法可以在消费者上下文中获取调用者使用的原始超时,以便将其传递给actor的Ask方法? 注意:我想避免将超时添加到请求接口。

答案

最后我找到了解决方案!这很简单(一旦调查了MassTransit源代码:-)并且对我有用,但如果有人有一些建议或提示,请告诉我。

所以,基本上我为MassTransit创建了一个支持库,在那里我添加了一个带有两个扩展方法的类:

CreateRequestClientWithTimeoutHeader()方法创建一个客户端,并在消息头中存储传递的超时(以秒为单位)的字符串表示形式。这将由客户使用。

GetClientTimeout()方法从邮件头中检索值并将其转换为TimeSpan。这将在消费者中使用。

这是代码:

public static class MassTransitExtMethods
{
    private const string ClientTimeoutHeaderKey = "__ClientTimeout__";

    public static IRequestClient<TRequest, TResponse> CreateRequestClientWithTimeoutHeader<TRequest, TResponse>
    (
        this IBus bus,
        Uri address,
        TimeSpan timeout,
        TimeSpan? ttl = default(TimeSpan?), 
        Action<SendContext<TRequest>> callback = null
    )
        where TRequest : class
        where TResponse : class
    {
        return
            bus
            .CreateRequestClient<TRequest, TResponse>
            (
                address,
                timeout,
                ttl,
                context =>
                {
                    context
                    .Headers
                    .Set
                    (
                        ClientTimeoutHeaderKey,
                        timeout.TotalSeconds.ToString(CultureInfo.InvariantCulture)
                    );

                    callback?.Invoke(context);
                }
            );
    }

    public static TimeSpan? GetClientTimeout(this ConsumeContext consumeContext)
    {
        string headerValue =
            consumeContext
            .Headers
            .Get<string>(ClientTimeoutHeaderKey);

        if (string.IsNullOrEmpty(headerValue))
        {
            return null;
        }

        double timeoutInSeconds;

        if (double.TryParse(headerValue, NumberStyles.Any, CultureInfo.InvariantCulture, out timeoutInSeconds))
        {
            return TimeSpan.FromSeconds(timeoutInSeconds);
        }

        return null;
    }
}

要使用它,请使用新的扩展方法创建客户端:

var client =
    mybus
    .CreateRequestClientWithTimeoutHeader<IMyRequest, IMyResponse>
    (
        new Uri(serviceAddress),
        TimeSpan.FromSeconds(10.0)
    );

这是一个使用Akka.NET actor的消费者的一个非常简单的例子,它实现了业务逻辑(请注意,实现并不完整):

public class MyReqRespProcessor : IConsumer<IMyRequest>
{
    private readonly IActorRef _myActor;

    public async Task Consume(ConsumeContext<IMyRequest> context)
    {
        TimeSpan? clientTimeout = context.GetClientTimeout();

        var response = await 
            _myActor
            .Ask<IMyResponse>(context.Message, clientTimeout ?? PredefinedTimeout)
            .ConfigureAwait(false);

        await
            context
            .RespondAsync<IMyResponse>(response)
            .ConfigureAwait(false); 
    }
}

在真实场景中,对于大量请求,actor可以是router,根据端点配置(例如预取计数值)进行配置。

我知道这不是一个完美的解决方案,但它有助于在服务器端提供最大处理时间的度量。在网络延迟的情况下,客户端可以在actor停止处理请求之前接收超时。无论如何,演员最多会在客户指定的时间内处理该请求。这就是我想要伸出的东西。

以上是关于masstransit请求/响应:在消费者中获取调用者超时的主要内容,如果未能解决你的问题,请参考以下文章

MassTransit 使用队列中的所有消息

普通消费者可以在 MassTransit 中重试消息之前延长超时时间吗?

在 MasstTransit 中发送、发布和请求/响应

MassTransit:在消费者消费完所有消息后如何停止公共汽车?

如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器

无法使用 MassTransit 在 Windows 服务应用程序 (.Net Core 3.1) 中向消费者注入服务