如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器

Posted

技术标签:

【中文标题】如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器【英文标题】:How to register a generic consumer adapter in MassTransit if I have a list of message types 【发布时间】:2019-03-19 04:38:31 【问题描述】:

我成功地将 MassTransit 用于一个愚蠢的示例应用程序,在该示例应用程序中,我从 Publisher 控制台应用程序发布了一条消息(一个事件),并在两个不同的消费者处接收它,这两个消费者也是使用 RabbitMq 的控制台应用程序。

这是整个示例项目 git repo: https://gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus

我想要一个包含 MassTransit 功能的项目,以便我的 Publisher 和 Consumers 项目对 MassTransit 一无所知。依赖项应该朝这个方向发展:

DiDrDe.MessageBus ==> 大众运输 DiDrDe.MessageBus ==> DiDrDe.Contracts DiDrDe.Model ==> DiDrDe.Contracts DiDrDe.Publisher ==> DiDrDe.MessageBus DiDrDe.Publisher ==> DiDrDe.Contracts DiDrDe.Publisher ==> DiDrDe.Model DiDrDe.ConsumerOne ==> DiDrDe.Contracts DiDrDe.ConsumerOne ==> DiDrDe.MessageBus DiDrDe.ConsumerOne ==> DiDrDe.Model DiDrDe.ConsumerTwo ==> DiDrDe.Contracts DiDrDe.ConsumerTwo ==> DiDrDe.MessageBus DiDrDe.ConsumerTwo ==> DiDrDe.Model

请注意 DiDrDe.MessageBus 对 DiDrDe.Model 一无所知,因为它是一个通用项目,应该对任何消息类型都有效。

为了实现这一点,我正在实现适配器模式,以便我的自定义接口IEventDtoBus(用于发布事件)和IEventDtoHandler<TEventDto>(用于消费事件)都是我的发布者和消费者所知道的。 MassTransit 包装器项目(称为 DiDrDe.MessageBus)使用由 IEventDtoBusEventDtoHandlerAdapter<TEventDto> 组成的 EventDtoBusAdapter 作为我唯一由 IEventDtoHandler<TEventDto> 组成的通用 IConsumer<TEventDto> 来实现适配器

我遇到的问题是 MassTransit 要求注册消费者的方式,因为我的消费者是通用消费者,并且 MassTransit 包装器在编译时不应该知道它的类型。

我需要找到一种方法将EventDtoHandlerAdapter<TEventDto> 注册为我在运行时传递的每个 TEventD 类型的消费者(例如,作为类型的集合)。 请查看我的存储库了解所有详细信息。

MassTransit 支持接受类型的重载方法(很好!正是我想要的),但它还需要第二个参数 Func<type, object> consumerFactory,我不知道如何实现它。

更新 1问题是我无法像这样注册这个通用消费者:

consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>();

因为我得到一个编译错误

严重性代码描述项目文件行抑制状态 错误 CS0310 'EventDtoHandlerAdapter' 必须是 具有公共无参数构造函数的非抽象类型,以便 在泛型类型或方法中将其用作参数“TConsumer” 'ConsumerExtensions.Consumer(IReceiveEndpointConfigurator, Action>)' DiDrDe.MessageBus C:\src\DiDrDe.MessageBus\DiDrDe.MessageBus\IoCC\Autofac\RegistrationExtensions.cs

更新 2:我已经尝试了几件事,并在我的 repo 上更新了项目。这些是我在 MassTransit 包装器项目中的尝试。请注意,如果我向要处理的每条消息(事件)添加依赖项,我是如何让一切正常工作的。但我不希望那样......我不希望这个项目知道任何关于它可以处理的消息的信息。如果我能注册只知道消息类型的消费者就好了..

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>

    //THIS WORKS
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));

    // DOES NOT WORK
    //var typeEventDtoHandler = typeof(IEventDtoHandler<>).MakeGenericType(typeof(ThingHappened));
    //var eventDtoHandler = context.Resolve(typeEventDtoHandler);
    //consumer.Consumer(eventDtoHandler);

    // DOES NOT WORK
    //consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);

    // DOES NOT WORK
    //var consumerGenericType = typeof(IConsumer<>);
    //var consumerThingHappenedType = consumerGenericType.MakeGenericType(typeof(ThingHappened));
    //consumer.Consumer(consumerThingHappenedType,  null);
);

更新 3: 按照 Igor 的建议,我尝试执行以下操作:

var adapterType = typeof(EventDtoHandlerAdapter<>).MakeGenericType(typeof(ThingHappened));
                            consumer.Consumer(adapterType, (Type x) => context.Resolve(x));

但我收到一个运行时错误提示

请求的服务 'DiDrDe.MessageBus.EventDtoHandlerAdapter`1[[DiDrDe.Model.ThingHappened, DiDrDe.Model,版本=1.0.0.0,文化=中性,PublicKeyToken=null]]' 尚未注册。为避免此异常,请注册一个 提供服务的组件,检查服务注册使用 IsRegistered(),或使用 ResolveOptional() 方法来解决 可选依赖。

我什至尝试将EventDtoHandlerAdapter&lt;&gt; 单独注册为 IConsumer,以防万一这是问题但没有运气。

builder
    .RegisterGeneric(typeof(EventDtoHandlerAdapter<>))
    .As(typeof(IConsumer<>))
    .SingleInstance();

还有:

builder
  .RegisterType<EventDtoHandlerAdapter<ThingHappened>>()
  .AsSelf();

它告诉我

System.ObjectDisposedException: '这个解析操作已经 结束了。使用 lambda 注册组件时, 无法存储 lambda 的 IComponentContext 'c' 参数。 相反,要么从“c”再次解析 IComponentContext,要么解析一个 基于 Func 的工厂从

创建后续组件

澄清一下,我唯一需要注册的消费者是我的EventDtoHandlerAdapter&lt;TEventDto&gt;。它是通用的,所以本质上它会存在我支持的每个 TEventDto 的注册。问题是我不需要提前类型,所以我需要使用类型。

更新 4: Igor 建议的新尝试。这次是“代理”。我已经用最后一次尝试更新了我的回购所有细节。 我有我的消费者和标记界面:

public interface IEventDtoHandler



public interface IEventDtoHandler<in TEventDto>
    : IEventDtoHandler
        where TEventDto : IEventDto

    Task HandleAsync(TEventDto eventDto);

我自己实现了一个对 MassTransit 一无所知的消费者:

public class ThingHappenedHandler
    : IEventDtoHandler<ThingHappened>

    public Task HandleAsync(ThingHappened eventDto)
    
        Console.WriteLine($"Received eventDto.Name " +
                            $"eventDto.Description at consumer one that uses an IEventDtoHandler");
        return Task.CompletedTask;
    

现在我的“包装消费者”就是我所说的适配器,因为它知道 MassTransit(它实现了 MassTransit IConsumer)。

public class EventDtoHandlerAdapter<TConsumer, TEventDto>
    : IConsumer<TEventDto>
        where TConsumer : IEventDtoHandler<TEventDto>
        where TEventDto : class, IEventDto

    private readonly TConsumer _consumer;

    public EventDtoHandlerAdapter(TConsumer consumer)
    
        _consumer = consumer;
    

    public async Task Consume(ConsumeContext<TEventDto> context)
    
        await _consumer.HandleAsync(context.Message);
    

现在最后一步是向 MassTransit 注册我的“包装消费者”。但由于它是通用的,我不知道该怎么做。这就是问题所在。

我可以按照以下建议在 Autofac 中扫描和注册我的所有消费者类型:

var interfaceType = typeof(IEventDtoHandler);
var consumerTypes =
    AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(x => x.GetTypes())
        .Where(x => interfaceType.IsAssignableFrom(x)
                    && !x.IsInterface
                    && !x.IsAbstract)
        .ToList();

所以现在我有了所有的消费者类型(IEventDtoHandler 的所有实现,包括我的ThingHappenedHandler)。现在怎么办?如何注册?

类似以下的不起作用

foreach (var consumerType in consumerTypes)

    consumer.Consumer(consumerType, (Type x) => context.Resolve(x));

但是我想不正常是正常的,因为我要注册的是我的EventDtoHandlerAdapter,也就是真正的IConsumer

所以,我想我没有理解你的建议。对不起!

我需要的是这样的:

//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<IEventDtoHandler<ThingHappened>, ThingHappened>(eventDtoHandler));

但不使用 ThingHappened 模型,因为该模型不应该是已知的。这是我仍然卡住的地方

更新 5: Chris Patterson 建议的新尝试(他的解决方案已合并到我的仓库中的 master 中),但问题仍然存在。

为了澄清,DiDrDe.MessageBus 必须与任何发布者、消费者和模型无关。它应该只依赖于 MassTransit 和 DiDrDe.Contracts,而 Chris 的解决方案有这样一行:

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>

    consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
);

这直接依赖于ThingHappened 模型。这是不允许的,它实际上与我已经拥有的解决方案没有太大区别:

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>

    //THIS works, but it uses ThingHappened explicitly and I don't want that dependency
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
);

抱歉,如果这不够清楚,但 DiDrDe.MessageBus 最终将成为许多不同的消费者和发布者项目之间共享的 nuGet 包,它不应该对任何特定的消息/模型有任何依赖。

更新 6: 问题已解决。非常感谢 Igor 和 Chris 的时间和帮助。 我已将解决方案推送到我的仓库中。

PS:不幸的是,当我在同一个消费者中有两个处理程序处理同一个事件时,此解决方案有其局限性,因为似乎只有一个处理程序正在执行(两次)。我希望两个处理程序都被执行或只执行一个,但只执行一次(不是两次)。但这已经是另一个主题了:)

【问题讨论】:

你看过builder.RegisterGeneric吗? 是的,我有。不知道 RegisterGeneric 会如何玩这个。请参阅更新 2 @iberodev 我也在寻找类似的方法。我可以使用你的 gitlab 项目吗?它是否完整或存在任何已知问题?如果是,你能更新你拥有的最新代码吗? @venkat.bommina 随意克隆我的 repo 并根据您的需要进行调整 【参考方案1】:

在此处查看自定义消费者约定:https://github.com/MassTransit/MassTransit/tree/develop/src/MassTransit.Tests/Conventional

创建您自己的 IMyConsumerInterface,IMyMessageInterface 将其插入该测试的代码中。 在创建总线之前注册它ConsumerConvention.Register&lt;CustomConsumerConvention&gt;();。它应该可以工作。

此外,您可以围绕消费者上下文创建自己的包装器,并将其与消息一起传递。

LoadFrom (MassTransit.AutofacIntegration) 不适用于我的自定义约定,因此我必须手动注册消费者

foreach (var consumer in consumerTypes)
  cfg.Consumer(consumer, (Type x) => _container.Resolve(x));

或者如果您想使用“代理”方法,请执行以下操作:

public class WrapperConsumer<TConsumer, TMessage> : IConsumer<TMessage>
    where TMessage : class, IMyMessageInterface 
    where TConsumer : IMyConsumerInterface<TMessage>

    private readonly TConsumer _consumer;

    public WrapperConsumer(TConsumer consumer)
    
        _consumer = consumer;
    

    public Task Consume(ConsumeContext<TMessage> context)
    
        return _consumer.Consume(context.Message);
    

...

// create wrapper registrations
cfg.Consumer(() => new WrapperConsumer<MyConsumer, MyMessage>(new MyConsumer()));


适用于这两种方法的附加代码
// marker interface
public interface IMyConsumerInterface



public interface IMyConsumerInterface<T> : IMyConsumerInterface
    where T : IMyMessageInterface 

    Task Consume(T message);


...

builder.RegisterAssemblyTypes(ThisAssembly)
           .AssignableTo<IMyConsumerInterface>()
           .AsSelf()
           .As<IMyConsumerInterface>();

...

var interfaceType = typeof(IMyConsumerInterface);
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
    .Where(x => interfaceType.IsAssignableFrom(x) && !x.IsInterface && !x.IsAbstract)
    .ToList();


回复:更新 5
builder.Register(context =>

    var ctx = context.Resolve<IComponentContext>();
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    
        cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
        
            foreach (var adapterType in adapterTypes)
                consumer.Consumer(adapterType, (Type type) => ctx .Resolve(adapterType));
        );
    );
    return busControl;
)

【讨论】:

我正在努力理解您的评论。请参阅我的更新 2。您在 consumerTypes 中有什么? IConsumer 的类型? (在我的情况下是 eventDtoHandlerAdapter?)如果是这样,我是否必须使用以下内容进行显式注册:builder .RegisterGeneric(typeof(EventDtoHandlerAdapter&lt;&gt;))As(typeof(IConsumer&lt;&gt;)).InstancePerDependency(); 不确定我理解,抱歉 请参阅更新 3。我尝试了您的第二个建议,但没有任何运气。它在运行时失败,告诉我我的消费者尚未在 Autofac 中注册。 我的回答是完全改变您的设计并使用自定义消费者约定抽象出公共交通。如果你想要替代品,我会更新我的答案。 (re: UPDATE 5, can't comment on Chris' answer) 你快到了,而不是consumer.Consumer&lt;EventDtoHandlerAdapter&lt;ThingHappened&gt;&gt;(context); 使用foreach (var adapterType in adapterTypes) consumer.Consumer(adapterType, (Type type) =&gt; ctx.Resolve(adapterType));。你不需要模型只是合同。删除模型并添加合同 你真的不应该让每个处理程序都使用同一个队列,它们是不相关的命令或事件,随着负载的增加,从一个队列中处理它们最终会成为一个问题。【参考方案2】:

我向您的问题提交了一个对我有用的拉取请求,消费者从没有问题开始。

如果需要,您可以扩展它以包括所有消费者在他们自己的端点上的自动注册。

诀窍是使用适当的通用接口类型正确注册处理程序类型。有点类型的机制,但它对我有用。

【讨论】:

谢谢你,克里斯。合并为大师。我在您的解决方案中看到的唯一问题是 DiDrDe.MessageBus 依赖于 DiDrDe.Model 并且这对于我想要的来说是不允许的。最终 DiDrDe.MessageBus 将成为一个对任何模型一无所知的独立 NuGet 包。只是关于 DiDrDe.Contracts 和 MassTransit。所以consumer.Consumer&lt;EventDtoHandlerAdapter&lt;ThingHappened&gt;&gt;(context);这行还是有问题的。 问题依然存在。请参阅更新 5。实际上,根据更新 4,我已经有一个不足的“解决方案”,但它的价格是 DiDrDe.MessageBus 直接取决于 DiDrDe.Model,我不希望这样。 感谢您的回答,克里斯为我指出了正确的方向,我已经解决了这个问题。 可能this right here 注册您自己的约定。 @kuldeep 在我的仓库gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus/-/tree/master/…

以上是关于如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器的主要内容,如果未能解决你的问题,请参考以下文章

MetaTrader5 中不同类型的变量参数列表

GCP PubSub:通过 CURL 类型的请求发布消息

如何在单个数据表中显示来自不同对象类型的不同值?

聊天视图中的重复消息。如何清除视图?

如果两列在其行中具有相等的值,我如何实现对话框消息?

如何在.net中获取类型的定义运算符