消息持久性
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃,它会忘记队列和消息,除非你不告诉它。需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为持久。
首先,我们需要确保RabbitMQ永远不会失去队列。为了做到这一点,我们需要宣布它是持久的
1、创建一个持久化的信息
//初始化一个连接 生产者 -> (发布者) var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //定义一个队列 它们的队列不能相同 channel.QueueDeclare(queue: "order_durable",
durable: true, //durable 持久化 定义一个队列 并且设置为持久性的 exclusive: false, autoDelete: false, arguments: null); var body = Encoding.UTF8.GetBytes(message); IBasicProperties properties = channel.CreateBasicProperties(); //此时我们确信,即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久的 // - 通过设置IBasicProperties.SetPersistent为true properties.Persistent = true; //生产一条信息 channel.BasicPublish(exchange: "", routingKey: "order_durable",
basicProperties: properties, body: body); Console.WriteLine("开始发送:--{0}", message);
2、接受一个持久化的信息
//初始化一个连接 生产者 -> (消费者) var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //对应的队列 channel.QueueDeclare(queue: "order_durable",
durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); //接受消息 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine("接受到信息: {0} mode:{1}", message, model); }; channel.BasicConsume("order_durable", true, consumer);
Console.ReadLine(); }
注意消息持久性
将邮件标记为“永久”并不能完全保证邮件不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接收到消息并且还没有保存消息时,仍然有一个很短的时间窗口。此外,RabbitMQ不会为每个消息执行fsync(2) - 它可能只是保存到缓存中,而不是写入磁盘。持久性保证不强,但对于我们简单的任务队列已经足够了。如果您需要更强大的保证,那么您可以使用
发布确定
工作队列
(使用.NET客户端)
先决条件
本教程假定RabbitMQ已安装并在标准端口(5672)上的本地主机上运行。如果您使用不同的主机,端口或凭据,连接设置将需要调整。
在哪里得到帮助
如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系。
在第一个教程中,我们编写了用于从命名队列发送和接收消息的程序。在这一个中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待完成。相反,我们安排稍后完成任务。我们把一个任务封装 成一个消息并发送给一个队列。在后台运行的工作进程将弹出任务并最终执行作业。当你运行许多工人时,任务将在他们之间共享。
这个概念在Web应用程序中特别有用,在短的HTTP请求窗口中不可能处理复杂的任务。
制备
在本教程的前一部分,我们发送了一个包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如图像被调整大小或者PDF文件被渲染,所以让我们假装我们很忙,通过使用Thread.Sleep()函数来伪装它(你需要添加 使用System.Threading;靠近文件顶部以访问线程API)。我们将把字符串中的点数作为它的复杂度。每一个点将占到“工作”的一秒钟。例如,Hello ...描述的假任务 将需要三秒钟的时间。
我们稍微修改前面例子中的发送程序,以允许从命令行发送任意消息。这个程序将安排任务到我们的工作队列,所以让我们把它命名为 NewTask:
像教程一样,我们需要生成两个项目。
dotnet new console --name NewTask mv NewTask / Program.cs NewTask / NewTask.cs dotnet new console --name Worker mv Worker / Program.cs Worker / Worker.cs cd NewTask dotnet add package RabbitMQ.Client dotnet restore cd ../工人 dotnet添加包RabbitMQ.Client dotnet恢复
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties();
properties.Persistent = true ;
channel.BasicPublish(exchange:“”,
routingKey:“task_queue”,
basicProperties:properties,
body:body);
一些帮助从命令行参数获取消息:
private static string GetMessage(string [] args)
{
return((args.Length> 0)? string .Join( “”,args): “Hello World!”);
}
我们旧的Receive.cs脚本也需要做一些改变:它需要伪造邮件正文中每个点的第二个工作。它将处理由RabbitMQ提供的消息并执行任务,所以让我们将其复制到Worker项目并修改:
var consumer = new EventingBasicConsumer(channel);
consumer.Received + =(model,ea)=>
{ var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(“[x] Received {0}”,message); int dots = message.Split(‘。‘).Length - 1 ;
Thread.Sleep(dots * 1000);
Console.WriteLine(“[x] Done”);
};
channel.BasicConsume(队列:“task_queue”,autoAck:true,consumer:consumer);
我们假冒的任务来模拟执行时间:
int dots = message.Split(‘。‘).Length - 1 ;
Thread.Sleep(dots * 1000);
循环调度
使用任务队列的优点之一是能够轻松地平行工作。如果我们积压工作,我们可以增加更多的工人,这样可以轻松扩展。
首先,我们试着同时运行两个Worker实例。他们都会从队列中得到消息,但究竟是如何?让我们来看看。
您需要打开三个控制台。两个将运行工人程序。这些控制台将是我们的两个消费者 - C1和C2。
#shell 1 cd Worker dotnet run #=> [*]等待消息。要退出,请按CTRL + C
#shell 2 cd Worker dotnet run #=> [*]等待消息。要退出,请按CTRL + C
在第三个,我们将发布新的任务。一旦你开始了消费者,你可以发布一些消息:
#shell 3 cd NewTask
dotnet运行“第一条消息”。
dotnet运行“第二个消息..”
dotnet运行“第三个消息...”
dotnet运行“第四个消息....”
dotnet运行“第五个消息.....”
让我们看看交付给我们的员工:
#shell 1
#=> [*]等待消息。要退出,请按CTRL + C
#=> [x]接收到“第一条消息”。
#=> [x]收到‘第三条消息...‘
#=> [x]收到‘第五条消息.....‘
#shell 2
#=> [*]等待消息。退出按CTRL + C
#=> [x]收到‘第二条消息..‘
#=> [x]收到‘第四条消息....‘
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环法(round-robin)。试试三个或更多的工人。
消息确认
做任务可能需要几秒钟的时间。你可能会想知道如果其中一个消费者开始一个长期的任务,并且只是部分完成而死亡会发生什么。使用我们当前的代码,一旦RabbitMQ向客户发送消息,立即将其标记为删除。在这种情况下,如果你杀了一个工人,我们将失去刚刚处理的信息。我们也将失去所有派发给这个特定工作人员但尚未处理的消息。
但我们不想失去任何任务。如果一名工人死亡,我们希望将任务交付给另一名工人。
为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发回确认(告知)告诉RabbitMQ已经收到,处理了一个特定的消息,并且RabbitMQ可以自由删除它。
如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失),RabbitMQ将理解消息未被完全处理,并将重新排队。如果有其他消费者同时上网,则会迅速重新发送给其他消费者。这样,即使工人偶尔死亡,也可以确保没有任何信息丢失。
没有任何消息超时; 当消费者死亡时,RabbitMQ将重新传递消息。即使处理消息需要非常很长的时间也没关系。
手动消息确认默认打开。在前面的示例中,我们通过将autoAck(“自动确认模式”)参数设置为true来明确地关闭它们。一旦我们完成了任务,现在是时候删除这个标志并且手动发送一个合适的确认信息给工作人员。
var consumer = new EventingBasicConsumer(channel);
consumer.Received + =(model,ea)=>
{ var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(“[x] Received {0}”,message); int dots = message.Split(‘。‘).Length - 1 ;
Thread.Sleep(dots * 1000);
Console.WriteLine(“[x] Done”);
channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);
};
channel.BasicConsume(队列:“task_queue”,autoAck:
假,消费者:消费者);
使用这段代码,我们可以确定,即使在处理消息的时候使用CTRL + C来杀死一个工作者,也不会有任何东西丢失。工人死后不久,所有未确认的消息将被重新发送。
忘记确认
错过BasicAck是一个常见的错误。这是一个容易的错误,但后果是严重的。当你的客户退出时,消息会被重新传递(这可能看起来像是随机的重新传递),但是RabbitMQ会占用越来越多的内存,因为它不能释放任何未被消息的消息。
为了调试这种错误,你可以使用rabbitmqctl 打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues名称messages_ready messages_unacknowledged
在Windows上,删除sudo:
rabbitmqctl.bat list_queues名称messages_ready messages_unacknowledged
消息持久性
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃,它会忘记队列和消息,除非你不告诉它。需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为持久。
首先,我们需要确保RabbitMQ永远不会失去队列。为了做到这一点,我们需要宣布它是持久的