源文来自 RabbitMQ 英文官网的教程(2.Work Queues),其示例代码采用了 .NET C# 语言。
In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we‘ll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.
在第一篇教程中,我们编写了程序从一个具名(已明确命名的)队列中发送和接收消息。在这一篇中,我们会在多个工作单元之间创建一个工作队列来分配耗时的任务。
The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
工作队列(又称:任务队列)背后的主旨是为了避免立即执行一项资源密集型任务,从而导致不得不等待它的完成。相反,我们安排任务稍后再完成。我们将任务封装成一个消息并发送给队列,由一个后台工作进程负责弹出该任务并最终执行这项工作。如果有多项工作单元在同时运行,这些任务会在它们之间平均分配。
This concept is especially useful in web applications where it‘s impossible to handle a complex task during a short HTTP request window.
上述这一概念在 Web 应用程序中尤其有用,因为在一个简短的 HTTP 请求视窗中几乎不可能处理一项复杂任务。
Preparation
准备事宜
In the previous part of this tutorial we sent a message containing "Hello World!". Now we‘ll be sending strings that stand for complex tasks. We don‘t have a real-world task, like images to be resized or pdf files to be rendered, so let‘s fake it by just pretending we‘re busy - by using the Thread.Sleep() function (you will need to add using System.Threading; near the top of the file to get access to the threading APIs). We‘ll take the number of dots in the string as its complexity; every dot will account for one second of "work". For example, a fake task described by Hello... will take three seconds.
在本教程的之前部分,我们发送了一个包含"Hello World!"的消息。现在,我们将要发送一个代表复杂任务的字符串。我们并没有一个现实世界的任务,诸如图片尺寸修整,或者 pdf 文件的渲染,所以让我们通过伪造忙碌来模拟它,使用 Thread.Sleep() 函数即可(你需要在文件的顶部追加 using System.Threading 命名空间)。我们会采取点的字符数量来表达复杂性,每一个点将表明一秒钟的“工作”,比如模拟任务被描述为“Hello...”时就表示运行了 3 秒钟。
We will slightly modify the Send program from our previous example, to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue, so let‘s name it NewTask:
我们会在之前的示例中轻微地修改发送程序,以允许任意的消息可以从命令行中被发送。该程序会安排任务至工作队列,所以让我们给它命名为 NewTask 吧:
Like tutorial one we need to generate two projects.
如同第一篇教程,我们需要生成两个工程项目。
dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
dotnet restore
cd ../Worker
dotnet add package RabbitMQ.Client
dotnet restore
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Some help to get the message from the command line argument:
一些从命令行参数获取消息的帮助类:
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
Our old Receive.cs script also requires some changes: it needs to fake a second of work for every dot in the message body. It will handle messages delivered by RabbitMQ and perform the task, so let‘s copy it to the Worker project and modify:
我们在旧的 Receive.cs 代码中也需要做一些改变:它需要在消息体中针对每一个点模拟一秒钟刻度的工作,同时会处理经由 RabbitMQ 递送过来的消息以及运行任务,所以让我们先把代码复制到工程项目,并做一些修改:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
Our fake task to simulate execution time:
我们开始模拟一下仿真执行时间:
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Round-robin dispatching
循环分发
One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.
使用任务队列的其中一项优势就是可以很容易地开展并行工作。如果(队列中)出现大量地工作积压,我们可以通过该途径增添更多的工作单元,所以扩展规模很方便。
First, let‘s try to run two Worker instances at the same time. They will both get messages from the queue, but how exactly? Let‘s see.
首先,让我们尝试同时运行两个工作实例。他们将同时从队列中获取消息,不过结果将会如何呢?一起拭目以待吧。
You need three consoles open. Two will run the Worker program. These consoles will be our two consumers - C1 and C2.
你需要开启 3 个控件台,其中两个将用来运行工作程序,它们将成为消费者 - C1 和 C2。
# shell 1
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
In the third one we‘ll publish new tasks. Once you‘ve started the consumers you can publish a few messages:
在第 3 个控制台,我们将发布一个新任务。你一旦启动消费者程序,新任务就可以发布一些信息了:
# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."
Let‘s see what is delivered to our workers:
让我们一起看看到底递送了什么消息给工作单元:
# shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
默认情况下,RabbitMQ 会依次给下一个消费者逐个发送消息。平均算下来,每一个消费者将获得相同数量的消息,这种分发消息的方式就被称作循环。好了,试一试开启 3 个或更多的工作单元吧。
Message acknowledgment
消息确认
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the customer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We‘ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
执行一项任务会用去一些时间,如果其中一个消费者启动了一个长任务并且执行到一部分就中止,你可能想知道究竟发生了什么。鉴于我们当前的代码,一旦 RabbitMQ 递送消息给消费者,它会立即将消息标记为删除。这样的话,如果你中止一个工作单元你将会失去正在运行中的消息。同时,我们也会失去所有已分发到当前指定工作单元中而未处理的消息。
But we don‘t want to lose any tasks. If a worker dies, we‘d like the task to be delivered to another worker.
但是,我们并不希望失去任何任务,如果一个工作单元中止了,我们希望这个任务会被递送给另一个工作单元。
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.
为了确保一个消息永远不会丢失,RabbitMQ 提供了消息确认。消费者将回发一个 ack 标识来告知 RabbitMQ 指定的消息已被接收和处理,然后就可以放心的删除它。
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn‘t processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
如果一个消费者中止了(信道被关闭、连接被关闭,或者是 TCP 连接丢失),导致没有发送 ack 标识,RabbitMQ 将领会到该消息没有被完全处理,随后将对其重新分配。如果有其他的消息者同时在线,RabbitMQ 会迅速的重新递送该任务给另一个消息者。通过该方式,你就可以确信没有消息会被遗漏,即使这些工作单元偶然地中止。
There aren‘t any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It‘s fine even if processing a message takes a very, very long time.
没有任何消息会出现超时,RabbitMQ 会在消费者中止时重新递送消息。即使运行一个消息会花去非常非常长的时间,它仍然可以运行良好。
Manual message acknowledgments are turned on by default. In previous examples we explicitly turned them off by setting the autoAck ("automatic acknowledgement mode") parameter to true. It‘s time to remove this flag and manually send a proper acknowledgment from the worker, once we‘re done with a task.
默认情况下,手动的消息确认是打开的。在之前的例子里,我们通过设置“自动确认”为 true 值来显式的关闭了手动机制。现在是时候删除这个(自动)标记了,一旦我们的工作单元完成一个任务的时候,就手动地从工作单元发送一个恰当的确认。
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.
使用这段代码后,我们可以确信即使你使用 CTRL + C 命令中止一个正在处理消息的工作单元也不会丢失什么。这样,在工作单元中止不久,所有未被确认的消息将会被重新递送。
Forgotten acknowledgment
被遗忘的确认
It‘s a common mistake to miss the BasicAck. It‘s an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won‘t be able to release any unacked messages.
一个较为常见的错误是忽视了 BasicAck,尽管这种错误很低级,但后果相当严重。当客户端退出时,消息会被重新递送(可能看起来像是随机地重新递送),但是 RabbitMQ 会因为无法释放那些未经确认的消息而吃掉越来越多的内存。
In order to debug this kind of mistake you can use rabbitmqctl to print the messages_unacknowledged field:
为了调试这种类型的错误,你可以使用 rabbitmqctl 命令来打印 messages_unacknowledged 字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
On Windows, drop the sudo:
在 Windows 平台上,释放(执行)该 sudo 命令:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Message durability
消息持久性
We have learned how to make sure that even if the consumer dies, the task isn‘t lost. But our tasks will still be lost if RabbitMQ server stops.
我们已经学习了如何确保即使消费者(意外)中止时,也可以让任务不会丢失。但是在 RabbitMQ 服务端停止时,我们的任务仍然会丢失。
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren‘t lost: we need to mark both the queue and messages as durable.
当 RabbitMQ 退出或者崩溃时它将遗失队列和消息,除非你明确告知它不要这么做。做好两件必要的事情,也就是将队列和消息标记为可持久的,这样就可以确保消息不会遗失。
First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
首先,我们需要确保 RabbitMQ 不会丢失队列,为了做到这一点,我们需要声明它是可持久的:
channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
Although this command is correct by itself, it won‘t work in our present setup. That‘s because we‘ve already defined a queue called hello which is not durable. RabbitMQ doesn‘t allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let‘s declare a queue with different name, for example task_queue:
尽管这个命令的本身是正确的,但在我们当前的设置中仍不能正常工作,那是因为我们已经定义过一个未被持久化的名叫“hello”的队列(参照第一章提到的幂等性)。
RabbitMQ不允许采用不同参数重新定义一个已存在的队列,任何程序试图这么做的话将被返回一个错误。不过倒是有一个变通方案,让我们来声明一个不同名称的队列就好了,比如叫“task_queue”:
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
This queueDeclare change needs to be applied to both the producer and consumer code.
该队列声明的变更需要同时作用于生产者和消费者两处的代码(参考第一章中 Receiving 这一节提到的“尝试从队列中消费消息时,确保队列总是已存在的”,因为无法保障会先打开哪一个终端,所以该队列声明的代码要写两处)。
At this point we‘re sure that the task_queue queue won‘t be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by setting IBasicProperties.SetPersistent to true.
此时此刻,我们可以确定即使 RabbitMQ 重启了,名为“task_queue”的队列也不再丢失。现在我们需要通过设置 IBasicProperties.SetPersistent 的值为 true,从而标识消息为可持久的。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
Note on message persistence
注意消息的持久
Marking messages as persistent doesn‘t fully guarantee that a message won‘t be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn‘t saved it yet. Also, RabbitMQ doesn‘t do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren‘t strong, but it‘s more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.
将消息标识为持久并不能完全保证一个消息也不会被丢失。尽管该标识告诉 RabbitMQ 要将消息保存到磁盘,但是当 RabbitMQ 已经接到一个消息却尚未保存它之际,将仍然有一个很小的时间窗口。另外,很可能这还只是保存到了缓存而未实实在在地写入到磁盘。尽管该持久保障措施还不是很强,但对于我们简单的任务队列已经是绰绰有余。如果你需要一个更强大的保障,可以使用发布者确认机制。
Fair dispatch
公平分发
You might have noticed that the dispatching still doesn‘t work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn‘t know anything about that and will still dispatch messages evenly.
你可能也注意到了,这种分发模式也不如我们所期望。比如在有两个工作单元的情景下,(并有多条消息相继而来),假设奇数项的消息比较冗繁,而偶数项的消息相对轻巧些。这样,其中一个工作单元将会持续地繁忙,而另一个工作单元则几乎不做任何事情。然而,RabbitMQ 并不知情而且还会继续朝奇数方向分发消息。
This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn‘t look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.
之所以发生这样的情形,是因为当消息进入到队列时 RabbitMQ 就开始分发,而忽视了消费者这边未确认消息的数量,它只是盲目地向第 n 个消费者分发每一条消息。
In order to change this behavior we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don‘t dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
为了改变这种行为,我们可以使用 BasicQos 方法的 prefetchCount = 1 设置。它将告诉 RabbitMQ 向工作单元分发消息时一次不要超过一个。或者换一句话来讲,直到一个工作单元已处理完成并确认过上一个消息时,才把消息发送给它。反之,RabbitMQ 会把消息分发给下一个并不繁忙的工作单元。(从而达到公平分发的效果。)
channel.BasicQos(0, 1, false);
Note about queue size
注意队列大小
If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.
如果所有的工作单元都很繁忙,你的队列将会被填满,这时就需要你密切注视它,也许可以添加更多的工作单元,或者采取其他的策略。
Putting it all together
融合一起
Final code of our NewTask.cs class:
NewTask.cs 类文件的最终代码:
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
}
And our Worker.cs:
Worker.cs 类文件:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
Using message acknowledgments and BasicQos you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.
使用消息确认和 BasicQos,你可以建立起一个工作队列。通过持续化选项,即使 RabbitMQ 重启也可以让任务继续存活。