RabbitMQ RPC 以异步方式?
Posted
技术标签:
【中文标题】RabbitMQ RPC 以异步方式?【英文标题】:RabbitMQ RPC in an async way? 【发布时间】:2020-09-04 07:08:38 【问题描述】:我正在为一个客户开发一个使用 RabbitMQ 和 RPC 的项目。我对 RabbitMQ 不太了解,在网上也很难找到像样的示例。我需要实现一些异步操作,下面我详细说明一下。
目前,我有一个生产者发送 RPC 请求并等待消费者的答复,到目前为止一切正常。我的问题是:我仍然需要这个答复,但不想让生产者阻塞在那里等待它。下面贴出我的生产者和消费者代码。
生产者
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitProducer
{
    /// <summary>
    /// Blocking RPC client: publishes a request to the "Ciccio" queue and waits
    /// for the matching reply on a private reply queue.
    /// </summary>
    /// <remarks>
    /// One correlation id is generated per client instance and reused for every
    /// call, so an instance is meant to be used for sequential calls from a
    /// single thread.
    /// </remarks>
    public class RpcClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        /// <summary>
        /// Connects to the broker, declares the reply queue and starts consuming
        /// replies from it.
        /// </summary>
        public RpcClient()
        {
            ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.68.17" };
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            channel.ConfirmSelect();

            // Declared with default arguments; the returned name is used as ReplyTo.
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);

            props = channel.CreateBasicProperties();
            string correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId;
            props.ReplyTo = replyQueueName;

            consumer.Received += (model, ea) =>
            {
                // Ignore anything that is not the reply to our own request.
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    var body = ea.Body.ToArray();
                    respQueue.Add(Encoding.UTF8.GetString(body));
                }
            };

            // Publisher confirms are enabled above; surface negative confirms
            // instead of silently dropping them.
            channel.BasicNacks += (sender, ea) =>
            {
                Console.Error.WriteLine(
                    " [!] Broker nacked delivery tag {0} (multiple: {1})",
                    ea.DeliveryTag,
                    ea.Multiple);
            };

            // Register the reply consumer exactly once. Doing this inside Call
            // would add one more consumer to the reply queue on every request.
            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
        }

        /// <summary>
        /// Sends <paramref name="message"/> as an RPC request and blocks until the
        /// reply carrying this client's correlation id arrives.
        /// </summary>
        /// <param name="message">Request payload; sent UTF-8 encoded.</param>
        /// <returns>The reply payload decoded as UTF-8.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public string Call(string message)
        {
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(
                exchange: "",
                routingKey: "Ciccio",
                basicProperties: props,
                body: messageBytes);

            return respQueue.Take();
        }

        /// <summary>
        /// Closes the broker connection. Safe to call more than once.
        /// </summary>
        public void Close()
        {
            if (connection.IsOpen)
            {
                connection.Close();
            }
        }
    }

    /// <summary>
    /// Console entry point: sends one random number as an RPC request and prints
    /// the reply.
    /// </summary>
    class Program
    {
        public static void Main()
        {
            RpcClient rpcClient = new RpcClient();
            try
            {
                Random random = new Random();
                int a = random.Next(10, 50);
                Console.WriteLine("Ciccio");
                Console.WriteLine(a.ToString());

                string response = rpcClient.Call(a.ToString());
                Console.WriteLine(" [.] Got '{0}'", response);
            }
            finally
            {
                // Release the connection even when the call fails.
                rpcClient.Close();
            }

            Console.ReadLine();
        }
    }
}
消费者
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
namespace RabbitConsumer
{
    /// <summary>
    /// RPC server: consumes integers from the "Ciccio" queue and replies with the
    /// corresponding Fibonacci number on the queue named by the request's ReplyTo.
    /// </summary>
    class Program
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "192.168.68.17" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Ciccio", durable: false,
                    exclusive: false, autoDelete: false, arguments: null);
                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);

                // Attach the handler BEFORE BasicConsume: a delivery that arrives
                // while no handler is subscribed would be neither processed nor
                // acked, and with a prefetch count of 1 nothing else would follow.
                consumer.Received += (model, ea) =>
                {
                    string response = null;

                    // Artificial delay standing in for slow work.
                    System.Threading.Thread.Sleep(5000);

                    var body = ea.Body.ToArray();
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message, System.Globalization.CultureInfo.InvariantCulture);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {
                        // Malformed, negative or overflowing input: answer with an
                        // empty payload so the caller is not left waiting.
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        // Without a ReplyTo there is nowhere to send the answer;
                        // the request is still acked so it is not redelivered.
                        if (!string.IsNullOrEmpty(props.ReplyTo))
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                                basicProperties: replyProps, body: responseBytes);
                        }

                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                            multiple: false);
                    }
                };

                channel.BasicConsume(queue: "Ciccio",
                    autoAck: false, consumer: consumer);
                Console.WriteLine("Ciccio");
                Console.WriteLine(" [x] Awaiting RPC requests");

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Returns the n-th Fibonacci number (fib(0) = 0, fib(1) = 1).
        /// </summary>
        /// <param name="n">Zero-based index; must not be negative.</param>
        /// <returns>The n-th Fibonacci number.</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="n"/> is negative.</exception>
        /// <exception cref="OverflowException">The result does not fit in a <see cref="long"/>.</exception>
        private static long fib(int n)
        {
            if (n < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(n), n, "n must be non-negative.");
            }

            // Iterative form: linear time, where the naive double recursion is
            // exponential. 64-bit and checked, because fib(47) already exceeds
            // int.MaxValue.
            long previous = 0;
            long current = 1;
            for (int i = 0; i < n; i++)
            {
                long next = checked(previous + current);
                previous = current;
                current = next;
            }

            return previous;
        }
    }
}
【问题讨论】:
【参考方案1】:好吧,显然我最近工作太多,以至于忘了如何正确使用 Task……下面就是实现我想要的效果的方法。
异步生产者
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitProducer
{
    /// <summary>
    /// Blocking RPC client: publishes a request to the "Ciccio" queue and waits
    /// for the matching reply on a private reply queue.
    /// </summary>
    /// <remarks>
    /// One correlation id is generated per client instance and reused for every
    /// call, so an instance is meant to be used for sequential calls from a
    /// single thread. Concurrent requests use one instance each.
    /// </remarks>
    public class RpcClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        /// <summary>
        /// Connects to the broker, declares the reply queue and starts consuming
        /// replies from it.
        /// </summary>
        public RpcClient()
        {
            ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.68.17" };
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            channel.ConfirmSelect();

            // Declared with default arguments; the returned name is used as ReplyTo.
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);

            props = channel.CreateBasicProperties();
            string correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId;
            props.ReplyTo = replyQueueName;

            consumer.Received += (model, ea) =>
            {
                // Ignore anything that is not the reply to our own request.
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    var body = ea.Body.ToArray();
                    respQueue.Add(Encoding.UTF8.GetString(body));
                }
            };

            // Publisher confirms are enabled above; surface negative confirms
            // instead of silently dropping them.
            channel.BasicNacks += (sender, ea) =>
            {
                Console.Error.WriteLine(
                    " [!] Broker nacked delivery tag {0} (multiple: {1})",
                    ea.DeliveryTag,
                    ea.Multiple);
            };

            // Register the reply consumer exactly once. Doing this inside Call
            // would add one more consumer to the reply queue on every request.
            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
        }

        /// <summary>
        /// Sends <paramref name="message"/> as an RPC request and blocks until the
        /// reply carrying this client's correlation id arrives.
        /// </summary>
        /// <param name="message">Request payload; sent UTF-8 encoded.</param>
        /// <returns>The reply payload decoded as UTF-8.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public string Call(string message)
        {
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(
                exchange: "",
                routingKey: "Ciccio",
                basicProperties: props,
                body: messageBytes);

            return respQueue.Take();
        }

        /// <summary>
        /// Closes the broker connection. Safe to call more than once.
        /// </summary>
        public void Close()
        {
            if (connection.IsOpen)
            {
                connection.Close();
            }
        }
    }

    /// <summary>
    /// Console entry point: fires three RPC requests without blocking the main
    /// thread; each reply is printed by its own background task.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Starts a background task that sends <paramref name="n"/> as an RPC
        /// request over its own <see cref="RpcClient"/> and prints the reply.
        /// Returns immediately.
        /// </summary>
        /// <param name="n">Number to send to the RPC server.</param>
        private static void Send(int n)
        {
            Console.WriteLine("SEND " + n);

            // The blocking Call runs on a pool thread so the caller never waits.
            Task.Run(() =>
            {
                RpcClient rpcClient = null;
                try
                {
                    rpcClient = new RpcClient();
                    string resp = rpcClient.Call(n.ToString());
                    Console.WriteLine("{0} [.] Got '{1}'", n, resp);
                }
                catch (Exception e)
                {
                    // Nobody observes this task, so report failures here rather
                    // than letting them disappear.
                    Console.Error.WriteLine("{0} [!] {1}", n, e.Message);
                }
                finally
                {
                    if (rpcClient != null)
                    {
                        rpcClient.Close();
                    }
                }
            });
        }

        public static void Main()
        {
            Random random = new Random();

            int a = random.Next(10, 50);
            Send(a);

            int b = random.Next(10, 50);
            Send(b);

            int c = random.Next(10, 50);
            Send(c);

            // Keep the process alive while the background tasks wait for replies.
            Console.ReadLine();
        }
    }
}
【讨论】:
以上是关于RabbitMQ RPC 以异步方式?的主要内容,如果未能解决你的问题,请参考以下文章
使用 EventMachine 和 RabbitMQ 的 RPC