MassTransit:发布者不再工作。为啥?
Posted
技术标签:
【中文标题】MassTransit:发布者不再工作。为啥?【英文标题】:MassTransit: publisher no longer work. Why?MassTransit:发布者不再工作。为什么? 【发布时间】:2022-01-24 06:46:17 【问题描述】:在之前的项目中,以下代码正在运行。 但现在我需要使用 MultiBus: - EventsConsumer 位于当前项目中。 - 支付消费者位于外部项目中。 为什么这种方法不再有效?
=================== 错误:
内部异常 1: InvalidOperationException:验证服务描述符“ServiceType:Publisher.Services.Abstract.IBusPublisher Lifetime:Singleton ImplementationType:Publisher.Services.Concrete.BusPublisher”时出错:尝试激活“Publisher”时无法解析“MassTransit.IBus”类型的服务。 Services.Concrete.BusPublisher'。
内部异常 2: InvalidOperationException:尝试激活“Publisher.Services.Concrete.BusPublisher”时无法解析“MassTransit.IBus”类型的服务。
// =================== Startup
using System;
using MassTransit;
using MassTransit.MultiBus;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Microsoft.OpenApi.Models;
using CommonTypes.Options;
using Publisher.Consumers;
using Publisher.Services.Abstract;
using Publisher.Services.Concrete;
namespace Publisher
public class Startup
public Startup(IConfiguration configuration)
Configuration = configuration;
public interface IEventsBus : IBus
public interface IPaymentBus : IBus
public IConfiguration Configuration get;
public void ConfigureServices(IServiceCollection services)
services.AddControllers();
services.AddSwaggerGen(c =>
c.SwaggerDoc("v1", new OpenApiInfo Title = "Publisher", Version = "v1" );
);
#region MassTransit
services.AddSingleton<IBusPublisher, BusPublisher>();
services.Configure<EventsBusOptions>(Configuration.GetSection("EventsBusOptions"));
services.Configure<PaymentBusOptions>(Configuration.GetSection("PaymentBusOptions"));
services.AddScoped<EventsConsumer>();
// EventsConsumer located into current project
services.AddMassTransit<IEventsBus>(x =>
x.AddConsumer<EventsConsumer>();
x.UsingRabbitMq((context, cfg) =>
var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;
cfg.Host(new Uri(_options.HostUri), h =>
h.Username(_options.UserName);
h.Password(_options.Password);
);
cfg.ReceiveEndpoint(_options.QueueName, ep =>
ep.PrefetchCount = _options.PrefetchCount ?? 15;
ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;
ep.ConfigureConsumer<EventsConsumer>(context);
);
cfg.ConfigureEndpoints(context);
);
);
// Consumer for Payment is located in outter project
// Did I properly describe Bus for outter consumer?
services.AddMassTransit<IPaymentBus>(x =>
x.UsingRabbitMq((context, cfg) =>
var _options = context.GetRequiredService<IOptions<PaymentBusOptions>>().Value;
cfg.Host(new Uri(_options.HostUri), h =>
h.Username(_options.UserName);
h.Password(_options.Password);
);
cfg.ReceiveEndpoint(_options.QueueName, ep =>
ep.PrefetchCount = _options.PrefetchCount ?? 15;
ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;
);
cfg.ConfigureEndpoints(context);
);
);
services.AddMassTransitHostedService();
services.AddGenericRequestClient();
#endregion
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
if (env.IsDevelopment())
app.UseDeveloperExceptionPage();
app.UseSwagger();
app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Publisher v1"));
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
endpoints.MapControllers();
);
// =================== interface
using System.Threading.Tasks;
namespace Publisher.Services.Abstract
public interface IBusPublisher
Task Publish<Tin>(Tin request) where Tin : class;
Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class;
// =================== class
using System;
using MassTransit;
using System.Threading.Tasks;
using Publisher.Services.Abstract;
using Publisher.Contracts;
using Microsoft.Extensions.DependencyInjection;
namespace Publisher.Services.Concrete
public class BusPublisher : IBusPublisher
readonly IServiceProvider _provider;
readonly IBus _bus;
public BusPublisher(IServiceProvider provider, IBus bus)
_provider = provider;
_bus = bus;
public async Task Publish<Tin>(Tin request) where Tin : class
try
await _bus.Publish(request);
catch (Exception ex)
Console.WriteLine(ex.Message);
throw;
public async Task<Tout> GetResponse<Tin, Tout>(Tin request)
where Tin : class
where Tout : class
try
using (var _scope = _provider.CreateScope())
var client = _scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();
var response = await client.GetResponse<Tout>(request);
return response.Message;
catch (Exception ex)
Console.WriteLine(ex.Message);
throw;
【问题讨论】:
【参考方案1】:我已经解决了我的问题。现在我可以向 EventsConsumer 发布消息了。 但是我在第二个总线队列(IPaymentBus)中看不到消息。 也许我犯了一些错误。请帮我在两个队列中发布消息。
//// ==================================== BusPublisher
namespace Publisher.Services.Concrete
public class BusPublisher : IBusPublisher
readonly IServiceProvider _provider;
readonly IPublishEndpoint _bus;
public BusPublisher(IServiceProvider provider, IPublishEndpoint bus)
_provider = provider;
_bus = bus;
public async Task Publish<Tin>(Tin request) where Tin : class
try
await _bus.Publish(request);
catch (Exception ex)
Console.WriteLine(ex.Message);
throw;
public async Task<Tout> GetResponse<Tin, Tout>(Tin request)
where Tin : class
where Tout : class
try
using (var _scope = _provider.CreateScope())
var client = _scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();
var response = await client.GetResponse<Tout>(request);
return response.Message;
catch (Exception ex)
Console.WriteLine(ex.Message);
throw;
//// ==================================== Startup
namespace Publisher
public class Startup
public Startup(IConfiguration configuration)
Configuration = configuration;
public IConfiguration Configuration get;
public interface IPaymentBus : IBus
public void ConfigureServices(IServiceCollection services)
....
#region MassTransit
services.Configure<EventsBusOptions>(Configuration.GetSection("EventsBusOptions"));
services.Configure<PaymentBusOptions>(Configuration.GetSection("PaymentBusOptions"));
services.AddScoped<EventsConsumer>();
services.AddScoped<IBusPublisher, BusPublisher>();
// EventsConsumer located into current project
//services.AddMassTransit<IEventsBus>(x =>
services.AddMassTransit(x =>
x.AddConsumer<EventsConsumer>();
x.UsingRabbitMq((context, cfg) =>
var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;
cfg.Host(new Uri(_options.HostUri), h =>
h.Username(_options.UserName);
h.Password(_options.Password);
);
cfg.ReceiveEndpoint(_options.QueueName, ep =>
ep.PrefetchCount = _options.PrefetchCount ?? 15;
ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;
ep.ConfigureConsumer<EventsConsumer>(context);
);
cfg.ConfigureEndpoints(context);
);
);
// Consumer for Payment is located in outter project
// Did I properly describe Bus for outter consumer?
// I need just publish message to queue with _options.QueueName
services.AddMassTransit<IPaymentBus>(x =>
x.UsingRabbitMq((context, cfg) =>
var _options = context.GetRequiredService<IOptions<PaymentBusOptions>>().Value;
cfg.Host(new Uri(_options.HostUri), h =>
h.Username(_options.UserName);
h.Password(_options.Password);
);
cfg.ReceiveEndpoint(_options.QueueName, ep =>
ep.PrefetchCount = _options.PrefetchCount ?? 15;
ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;
);
cfg.ConfigureEndpoints(context);
);
);
services.AddMassTransitHostedService();
services.AddGenericRequestClient();
#endregion
【讨论】:
不要使用答案类型来提出新问题 - 没有人会在其中寻找您的问题。以上是关于MassTransit:发布者不再工作。为啥?的主要内容,如果未能解决你的问题,请参考以下文章
MassTransit / RabbitMQ - 为啥跳过这么多消息?
MassTransit 6.2.3中的可疑ConnectHandler扩展方法问题
如何在实体框架中持久保存 MassTransit 状态数据?