在 ASP.Net Core 3 中使用 Rabbitmq 的 RPC
Posted
技术标签:
【中文标题】在 ASP.Net Core 3 中使用 Rabbitmq 的 RPC【英文标题】:RPC using Rabbitmq in ASP.Net Core 3 【发布时间】:2021-06-07 12:19:19 【问题描述】:我想使用单个请求实现多个队列
RPC 服务器.cs
如何从服务器发送多条消息 例如。我创建了实例: `
RPCServer rpcServer1 = new RPCServer();
rpcServer1.PublishMessage("customerContactPersonsList","customerContactPersons");`
`
RPCServer rpcServer2 = new RPCServer();
rpcServer2.PublishMessage("ProductInfoList", "projects");
public void PublishMessage(string message, string rpcQueueName)
var factory = new ConnectionFactory() HostName = "localhost" ;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare(queue: rpcQueueName, durable: false,
exclusive: true, autoDelete: false, arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: rpcQueueName,
autoAck: false, consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
consumer.Received += (model, ea) =>
string response = null;
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
Console.WriteLine(" [.] Response message is)", message);
response = message;
catch (Exception e)
Console.WriteLine(" [.] " + e.Message);
response = "There was a error";
finally
var responseBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange:"topic", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
;
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
接收器.cs ** 接收端是通过队列检索响应的地方,它还调用客户端,客户端基本上发送请求和队列名称作为参数,以从声明相同队列的服务器端获取 json 数据,并且当两个名称队列匹配响应被发送到接收端。**所以例如。这里的项目作为参数是给定的队列名称,服务器端也提到了相同的名称。
var rpcClient = new RpcClient();
var customerContactPersons = await rpcClient.CallAsync("", "customerContactPersons");
var response = await rpcClient.CallAsync("","projects");
var factory = new ConnectionFactory() HostName = "localhost" ;
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out TaskCompletionSource<string> tcs))
return;
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
;
public Task<string> CallAsync(string message, string rpcQueueName, CancellationToken cancellationToken = default(CancellationToken))
IBasicProperties props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
var tcs = new TaskCompletionSource<string>();
callbackMapper.TryAdd(correlationId, tcs);
channel.BasicPublish(
exchange: "",
routingKey: rpcQueueName,
basicProperties: props,
body: messageBytes);
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
cancellationToken.Register(() => callbackMapper.TryRemove(correlationId, out var tmp));
return tcs.Task;
`
【问题讨论】:
【参考方案1】:我通过为每个队列添加连接块并在最后 console.readline();保持连接打开以供使用的队列。
using (var connection = rpcServer.PublishMessage(customercontact, "customercontact_rpc_queue"))
using (var connectionObject = rpcServer.PublishMessage(result, "project_rpc_queue"))
using (var customersObject = rpcServer.PublishMessage(customersFromByD, "customer_rpc_queue"))
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
【讨论】:
以上是关于在 ASP.Net Core 3 中使用 Rabbitmq 的 RPC的主要内容,如果未能解决你的问题,请参考以下文章
将 OpenID Connect 与 .NET Core 3.0 ASP.NET Core API 服务一起使用时出错
Linq 查询在 ASP.NET-Core 3.0 及更高版本中对数字等字符串进行排序
如何在 ASP.NET Core 3.1 中启用多重身份验证?
属性路由在 asp.net core 3.0 中无法正常工作