注册IConsumer 使用Autofac在Masstransit中依赖
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了注册IConsumer 使用Autofac在Masstransit中依赖相关的知识,希望对你有一定的参考价值。
我有2个系统,一个用于发布消息,另一个用于消费它们。两者都使用Masstransit(使用RabbitMQ)并使用ASP.Net web api 2和OWIN(以及Autofac作为IoC容器)实现。如果我的消费者没有依赖关系,一切正常,但是当我将依赖注入到我的使用者中时,从不执行Consume方法(在初始化期间没有抛出错误)。
这是相关的发布商代码:
//Startup.cs
public class Startup
{
public void Configuration(IAppBuilder app)
{
HttpConfiguration config = new HttpConfiguration();
IContainer container = null;
var builder = new ContainerBuilder();
builder.Register(context =>
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost rabbitMqHost = cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings =>
{
settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]);
settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]);
});
});
return busControl;
})
.As<IBusControl>()
.As<IBus>()
.SingleInstance();
// Register Web API controllers
builder.RegisterApiControllers(Assembly.GetExecutingAssembly());
// Resolve dependencies
container = builder.Build();
config.DependencyResolver = AutofacWebApiDependencyResolver(container);
WebApiConfig.Register(config);
SwaggerConfig.Register(config);
app.UseCors(CorsOptions.AllowAll);
// Register the Autofac middleware FIRST.
app.UseAutofacMiddleware(container);
app.UseWebApi(config);
// Starts MassTransit Service bus, and registers stopping of bus on app dispose
var bus = container.Resolve<IBusControl>();
var busHandle = bus.StartAsync();
var properties = new AppProperties(app.Properties);
if (properties.OnAppDisposing != CancellationToken.None)
{
properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30)));
}
}
}
// Controller
public IHttpActionResult Post()
{
_bus.Publish<IFooMessage>(new
{
Foo = "Foo"
});
return Ok();
}
这是相关的消费者代码:
// Startup.cs
public class Startup
{
public void Configuration(IAppBuilder app)
{
HttpConfiguration config = new HttpConfiguration();
IContainer container = null;
var builder = new ContainerBuilder();
builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest();
builder.RegisterModule<BusModule>();
builder.RegisterModule<ConsumersModule>();
// Register Web API controllers
builder.RegisterApiControllers(Assembly.GetExecutingAssembly());
// Resolve dependencies
container = builder.Build();
config.DependencyResolver = new AutofacWebApiDependencyResolver(container);
WebApiConfig.Register(config);
SwaggerConfig.Register(config);
app.UseCors(CorsOptions.AllowAll);
// Register the Autofac middleware FIRST.
app.UseAutofacMiddleware(container);
app.UseWebApi(config);
// Starts MassTransit Service bus, and registers stopping of bus on app dispose
var bus = container.Resolve<IBusControl>();
var busHandle = bus.StartAsync();
var properties = new AppProperties(app.Properties);
if (properties.OnAppDisposing != CancellationToken.None)
{
properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30)));
}
}
}
// BusModule.cs
public class BusModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.Register(context =>
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost rabbitMqHost = cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings =>
{
settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]);
settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]);
});
cfg.ReceiveEndpoint(rabbitMqHost, "IP.AgilePoint.queue", ec =>
{
ec.LoadFrom(context);
});
});
return busControl;
})
.SingleInstance()
.As<IBusControl>()
.As<IBus>();
}
}
// ConsumerModule.cs
public class ConsumersModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.RegisterType<FooConsumer>();
}
}
// FooConsumer.cs
public class FooConsumer : IConsumer<IFooMessage>
{
private IFooService _service;
public FooConsumer(IFooService service)
{
_service = service;
}
public Task Consume(ConsumeContext<IFooMessage> context)
{
IFooMessage @event = context.Message;
_service.DoStuff(@event.Foo);
return Task.FromResult(context.Message);
}
}
注意我的FooConsumer在IFooService上有一个依赖项(构造函数)。我跟着Masstransit documentation,但我不能让它工作。我究竟做错了什么?
框架版本:
- .Net Framework 4.6.1
- Autofac:3.5.2
- Masstransit:3.5.7
更新:
我建议查看特定于Autofac的文档,它是通过扩展库完全支持的容器。
http://masstransit-project.com/MassTransit/usage/containers/autofac.html
包裹:https://www.nuget.org/packages/masstransit.autofac
最后我发现了我做错了什么。出于某种原因,我无法在RabbitMQ Management中看到我的队列。当我能够看到错误队列时,我注意到de follow错误:
我以这种方式注册我的IFooService:
builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest();
InstancePerRequest()
导致错误。如果我用builder.RegisterType<FooService>().As<IFooService>()
注册服务一切正常。我认为这是因为我的总线实例是单身(注册为SingleIsntace()
)。使用web api项目来托管我的公共汽车/消费者这一事实让我感到困惑。
无论如何,感谢@Chris Patterson和@Alexey Zimarev指出我正确的实施方向(使用MT扩展来注册消费者)。
以上是关于注册IConsumer 使用Autofac在Masstransit中依赖的主要内容,如果未能解决你的问题,请参考以下文章