MassTransit:发布者不再工作。为啥?

Posted

技术标签:

【中文标题】MassTransit:发布者不再工作。为啥?【英文标题】:MassTransit: publisher no longer work. Why?MassTransit:发布者不再工作。为什么? 【发布时间】:2022-01-24 06:46:17 【问题描述】:

在之前的项目中,以下代码正在运行。 但现在我需要使用 MultiBus: - EventsConsumer 位于当前项目中。 - 支付消费者位于外部项目中。 为什么这种方法不再有效?

=================== 错误:

内部异常 1: InvalidOperationException:验证服务描述符“ServiceType:Publisher.Services.Abstract.IBusPublisher Lifetime:Singleton ImplementationType:Publisher.Services.Concrete.BusPublisher”时出错:尝试激活“Publisher”时无法解析“MassTransit.IBus”类型的服务。 Services.Concrete.BusPublisher'。

内部异常 2: InvalidOperationException:尝试激活“Publisher.Services.Concrete.BusPublisher”时无法解析“MassTransit.IBus”类型的服务。

// =================== Startup

using System;
using MassTransit;
using MassTransit.MultiBus;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Microsoft.OpenApi.Models;
using CommonTypes.Options;
using Publisher.Consumers;
using Publisher.Services.Abstract;
using Publisher.Services.Concrete;

namespace Publisher

    public class Startup
    
        public Startup(IConfiguration configuration)
        
            Configuration = configuration;
        

        public interface IEventsBus : IBus  
        public interface IPaymentBus : IBus  

        public IConfiguration Configuration  get; 

        public void ConfigureServices(IServiceCollection services)
        
            services.AddControllers();
            services.AddSwaggerGen(c =>
            
                c.SwaggerDoc("v1", new OpenApiInfo  Title = "Publisher", Version = "v1" );
            );

            #region MassTransit 

            services.AddSingleton<IBusPublisher, BusPublisher>();

            services.Configure<EventsBusOptions>(Configuration.GetSection("EventsBusOptions"));
            services.Configure<PaymentBusOptions>(Configuration.GetSection("PaymentBusOptions"));

            services.AddScoped<EventsConsumer>();

            // EventsConsumer located into current project
            services.AddMassTransit<IEventsBus>(x =>
            
                x.AddConsumer<EventsConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                
                    var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;

                    cfg.Host(new Uri(_options.HostUri), h =>
                    
                        h.Username(_options.UserName);
                        h.Password(_options.Password);
                    );

                    cfg.ReceiveEndpoint(_options.QueueName, ep =>
                    
                        ep.PrefetchCount = _options.PrefetchCount ?? 15;
                        ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;

                        ep.ConfigureConsumer<EventsConsumer>(context);
                    );

                    cfg.ConfigureEndpoints(context);
                );
            );

            // Consumer for Payment is located in outter project
            // Did I properly describe Bus for outter consumer?
            services.AddMassTransit<IPaymentBus>(x =>
            
                x.UsingRabbitMq((context, cfg) =>
                
                    var _options = context.GetRequiredService<IOptions<PaymentBusOptions>>().Value;

                    cfg.Host(new Uri(_options.HostUri), h =>
                    
                        h.Username(_options.UserName);
                        h.Password(_options.Password);
                    );


                    cfg.ReceiveEndpoint(_options.QueueName, ep =>
                    
                        ep.PrefetchCount = _options.PrefetchCount ?? 15;
                        ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;
                    );

                    cfg.ConfigureEndpoints(context);
                );

            );

            services.AddMassTransitHostedService();
            services.AddGenericRequestClient();

            #endregion
        

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        
            if (env.IsDevelopment())
            
                app.UseDeveloperExceptionPage();
                app.UseSwagger();
                app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Publisher v1"));
            

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            
                endpoints.MapControllers();
            );
        
    



// =================== interface 


using System.Threading.Tasks;

namespace Publisher.Services.Abstract

    public interface IBusPublisher
    
        Task Publish<Tin>(Tin request) where Tin : class;

        Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class;
    


// =================== class

using System;
using MassTransit;
using System.Threading.Tasks;
using Publisher.Services.Abstract;
using Publisher.Contracts;
using Microsoft.Extensions.DependencyInjection;

namespace Publisher.Services.Concrete

    public class BusPublisher : IBusPublisher
    
        readonly IServiceProvider _provider;
        readonly IBus _bus;

        public BusPublisher(IServiceProvider provider, IBus bus)
        
            _provider = provider;
            _bus = bus;
        

        public async Task Publish<Tin>(Tin request) where Tin : class
        
            try
            
                await _bus.Publish(request);
            
            catch (Exception ex)
            
                Console.WriteLine(ex.Message);
                throw;
            
        


        public async Task<Tout> GetResponse<Tin, Tout>(Tin request)
            where Tin : class
            where Tout : class
        
            try
            
                using (var _scope = _provider.CreateScope())
                
                    var client = _scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();
                    var response = await client.GetResponse<Tout>(request);
                    return response.Message;
                

            
            catch (Exception ex)
            
                Console.WriteLine(ex.Message);
                throw;
            
        
    

【问题讨论】:

【参考方案1】:

我已经解决了我的问题。现在我可以向 EventsConsumer 发布消息了。 但是我在第二个总线队列(IPaymentBus)中看不到消息。 也许我犯了一些错误。请帮我在两个队列中发布消息。

//// ==================================== BusPublisher 
namespace Publisher.Services.Concrete

    public class BusPublisher : IBusPublisher
    
        readonly IServiceProvider _provider;
        readonly IPublishEndpoint _bus;
        

        public BusPublisher(IServiceProvider provider, IPublishEndpoint bus)
        
            _provider = provider;
            _bus = bus;
        

        public async Task Publish<Tin>(Tin request) where Tin : class
        
            try
            
                await _bus.Publish(request);
            
            catch (Exception ex)
            
                Console.WriteLine(ex.Message);
                throw;
            
        


        public async Task<Tout> GetResponse<Tin, Tout>(Tin request)
            where Tin : class
            where Tout : class
        
            try
            
                using (var _scope = _provider.CreateScope())
                
                    var client = _scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();

                    var response = await client.GetResponse<Tout>(request);

                    return response.Message;
                

            
            catch (Exception ex)
            
                Console.WriteLine(ex.Message);
                throw;
            
        


    


//// ==================================== Startup
namespace Publisher

    public class Startup
    
        public Startup(IConfiguration configuration)
        
            Configuration = configuration;
        
        public IConfiguration Configuration  get; 

        public interface IPaymentBus : IBus  

        public void ConfigureServices(IServiceCollection services)
        
        ....

            #region MassTransit 

            services.Configure<EventsBusOptions>(Configuration.GetSection("EventsBusOptions"));
            services.Configure<PaymentBusOptions>(Configuration.GetSection("PaymentBusOptions"));

            services.AddScoped<EventsConsumer>();
            services.AddScoped<IBusPublisher, BusPublisher>();

            // EventsConsumer located into current project
            //services.AddMassTransit<IEventsBus>(x =>
            services.AddMassTransit(x =>
            
                x.AddConsumer<EventsConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                
                    var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;

                    cfg.Host(new Uri(_options.HostUri), h =>
                    
                        h.Username(_options.UserName);
                        h.Password(_options.Password);
                    );

                    cfg.ReceiveEndpoint(_options.QueueName, ep =>
                    
                        ep.PrefetchCount = _options.PrefetchCount ?? 15;
                        ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;

                        ep.ConfigureConsumer<EventsConsumer>(context);
                    );

                    cfg.ConfigureEndpoints(context);
                );
            );

            // Consumer for Payment is located in outter project
            // Did I properly describe Bus for outter consumer?
            // I need just publish message to queue with _options.QueueName 
            services.AddMassTransit<IPaymentBus>(x =>
            
                x.UsingRabbitMq((context, cfg) =>
                
                    var _options = context.GetRequiredService<IOptions<PaymentBusOptions>>().Value;

                    cfg.Host(new Uri(_options.HostUri), h =>
                    
                        h.Username(_options.UserName);
                        h.Password(_options.Password);
                    );


                    cfg.ReceiveEndpoint(_options.QueueName, ep =>
                    
                        ep.PrefetchCount = _options.PrefetchCount ?? 15;
                        ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;
                    );

                    cfg.ConfigureEndpoints(context);
                );

            );

            services.AddMassTransitHostedService();
            services.AddGenericRequestClient();

            #endregion
        

    

【讨论】:

不要使用答案类型来提出新问题 - 没有人会在其中寻找您的问题。

以上是关于MassTransit:发布者不再工作。为啥?的主要内容,如果未能解决你的问题,请参考以下文章

MassTransit / RabbitMQ - 为啥跳过这么多消息?

MassTransit 6.2.3中的可疑ConnectHandler扩展方法问题

为啥 StopForumSpam nuget 包不再工作?

如何在实体框架中持久保存 MassTransit 状态数据?

为啥在使用扩展语法复制对象后 getter/setter 不再工作?

为啥 Heroku-accounts 插件不再工作,并且我不再能够在 CLI 中的多个帐户之间切换?