Competing Consumers Pattern (竞争消费者模式)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Competing Consumers Pattern (竞争消费者模式)相关的知识,希望对你有一定的参考价值。

Enable multiple concurrent consumers to process messages received on the same messaging channel. This pattern enables a system to process multiple messages concurrently to optimize throughput, to improve scalability and availability, and to balance the workload.

使多个并发的消费者来处理同一消息收发信道接收到的消息。这种模式使系统能够同时处理多个消息以优化吞吐量,提高扩展性和可用性,并且以平衡的工作量。

Context and Problem 背景与问题

An application running in the cloud may be expected to handle a large number of requests. Rather than process each request synchronously, a common technique is for the application to pass them through a messaging system to another service (a consumer service) that handles them asynchronously. This strategy helps to ensure that the business logic in the application is not blocked while the requests are being processed.

在云中运行的应用程序可以预期处理大量的请求。而不是过程中的每个请求的同步,常用的技术是应用程序通过消息系统来处理这些异步其它服务(消费者服务)来传递他们。这种策略有助于确保当请求正在处理中的应用程序的业务逻辑没有被阻塞。

The number of requests could vary significantly over time for many reasons. A sudden burst in user activity or aggregated requests coming from multiple tenants may cause unpredictable workload. At peak hours a system might need to process many hundreds of requests per second, while at other times the number could be very small. Additionally, the nature of the work performed to handle these requests might be highly variable. Using a single instance of the consumer service might cause that instance to become flooded with requests or the messaging system may be overloaded by an influx of messages coming from the application. To handle this fluctuating workload, the system can run multiple instances of the consumer service. However these consumers must be coordinated to ensure that each message is only delivered to a single consumer. The workload also needs to be load balanced across consumers to prevent an instance from becoming a bottleneck.

请求的数目可能会随着时间的推移等原因变化比较显著。在用户活动或自多个租户的请求突然爆发可能会导致不可预知的工作负荷。在高峰时间系统可能需要每秒处理数以百计的请求,而在其他时间的可能是非常小的。此外,工作性质执行以处理这些请求可能是高度可变的。利用单一消费者服务可能导致该实例充斥着请求或消息系统过载当从一个涌入而来的消息应用。为了处理这种波动的工作负荷,系统可以运行多个消费者服务实例。然而,这些消费者必须协调,以确保每个消息只传送到单一的消费者。工作负荷也需要负载跨越消费者平衡,以防止一个实例成为瓶颈。

Solution 解决方案

Use a message queue to implement the communication channel between the application and the instances of the consumer service. The application posts requests in the form of messages to the queue, and the consumer service instances receive messages from the queue and process them. This approach enables the same pool of consumer service instances to handle messages from any instance of the application. Figure 1 illustrates this architecture.

使用消息队列来实现应用和消费者服务实例之间的通信信道。应用程序提交标段的消息请求到消息队列,以及消费者服务实例从队列接收消息并对其进行处理。这种方法使同一池的消费者服务实例处理任何应用程序实例的消息。图1示出了该架构。

技术分享

Figure 1 - Using a message queue to distribute work to instances of a service

This solution offers the following benefits:该解决方案具有以下优点:

  • It enables an inherently load-leveled system that can handle wide variations in the volume of requests sent by application instances. The queue acts as a buffer between the application instances and the consumer service instances, which can help to minimize the impact on availability and responsiveness for both the application and the service instances (as described by the Queue-based Load Leveling pattern). Handling a message that requires some long-running processing to be performed does not prevent other messages from being handled concurrently by other instances of the consumer service.
  • 它是一种固有的负荷调平系统,可以处理应用程序发送批量请求的变化比较大的情况。队列在应用程序实例和客户服务的之间充当一个缓冲,它可以最大限度减少应用程序和服务实例在可用性和响应的影响。处理一个消息需要长时间运行只到被处理完毕但不会阻止消费者服务实例的其他消息并发。
  • It improves reliability. If a producer communicates directly with a consumer instead of using this pattern, but does not monitor the consumer, there is a high probability that messages could be lost or fail to be processed if the consumer fails. In this pattern messages are not sent to a specific service instance, a failed service instance will not block a producer, and messages can be processed by any working service instance.
  • 它提高了可靠性。当然生产者与消费者直接通信而取代使用这种模式,但不监视消费者,消息会可能丢失或失败的可能性比较高,如果消费者无法进行处理。在这种模式消息不发送到一个特定的服务实例,失败的服务实例将不会阻止一个生产者,且消息可以通过任何工作服务实例进行处理。
  • It does not require complex coordination between the consumers, or between the producer and the consumer instances. The message queue ensures that each message is delivered at least once.
  • 它不需要在消费者之间,或生产者和消费者的实例之间进行复杂的协调。消息队列确保每个消息被传递至少一次。
  • It is scalable. The system can dynamically increase or decrease the number of instances of the consumer service as the volume of messages fluctuates.
  • 它是可伸缩的。该系统可以动态增加或减少消费者服务的实例数目满足消息的体积波动。
  • It can improve resiliency if the message queue provides transactional read operations. If a consumer service instance reads and processes the message as part of a transactional operation, and if this consumer service instance subsequently fails, this pattern can ensure that the message will be returned to the queue to be picked up and handled by another instance of the consumer service.
  • 它可以提高应变能力如果消息队列的提供事务读取操作时。如果消费者服务实例读取和处理该消息作为事务操作的一部分,并且如果这种消费服务实例随后发生故障时,该模式可以确保消息将被返回到队列并被另一个消费者服务实例拾起和处理。

Issues and Considerations 问题和注意事项

Consider the following points when deciding how to implement this pattern:

在决定如何实施这一模式时,请考虑以下几点:

Message Ordering. The order in which consumer service instances receive messages is not guaranteed, and does not necessarily reflect the order in which the messages were created. Design the system to ensure that message processing is idempotent because this will help to eliminate any dependency on the order in which messages are handled. For more information about idempotency, see Idempotency Patterns on Jonathon Oliver’s blog.

?消息订购。在消费者服务实例接收消息的顺序不能保证,并不一定反映在其中创建的消息的顺序。设计系统,确保信息处理等幂的,因为这将有助于消除在消息的处理顺序上的任何依赖。有关幂等的详细信息,请参阅乔纳森·奥利弗的博客幂等模式。

Note:

Microsoft Azure Service Bus Queues can implement guaranteed first-in-first-out ordering of messages by using message sessions. For more information, see Messaging Patterns Using Sessions on MSDN.

Designing Services for Resiliency. If the system is designed to detect and restart failed service instances, it may be necessary to implement the processing performed by the service instances as idempotent operations to minimize the effects of a single message being retrieved and processed more than once.

?设计服务弹性。如系统被设计为检测和重新启动当服务初始化失败,可能有必要实施处理并由服务实例作为幂运算执行,以尽量减少检索和处理一次以上的单个消息的影响的处理。

Detecting Poison Messages. A malformed message, or a task that requires access to resources that are not available, may cause a service instance to fail. The system should prevent such messages being returned to the queue, and instead capture and store the details of these messages elsewhere so that they can be analyzed if necessary.

?检测有害消息。格式不正确的消息或需要访问不可用的资源的任务,可能会造成服务实例失败。该系统应防止这样的消息被返回到队列,而是捕获和在别处存储这些消息的细节,以便它们可以在需要进行分析。

Handling Results. The service instance handling a message is fully decoupled from the application logic that generates the message, and they may not be able to communicate directly. If the service instance generates results that must be passed back to the application logic, this information must be stored in a location that is accessible to both and the system must provide some indication of when processing has completed to prevent the application logic from retrieving incomplete data.

?处理结果。服务实例处理的消息是从生成该消息的应用程序逻辑完全解耦,并且它们未必能够直接进行通信。如果服务实例生成的结果必须传递回应用程序逻辑,此信息必须被存储在是两者可访问的位置,并且系统必须提供当处理已经完成,以防止从检索不完全数据的应用程序逻辑的一些指示。

Note:

If you are using Azure, a worker process may be able to pass results back to the application logic by using a dedicated message reply queue. The application logic must be able to correlate these results with the original message. This scenario is described in more detail in the Asynchronous Messaging Primer.

Scaling the Messaging System. In a large-scale solution, a single message queue could be overwhelmed by the number of messages and become a bottleneck in the system. In this situation, consider partitioning the messaging system to direct messages from specific producers to a particular queue, or use load balancing to distribute messages across multiple message queues.
?伸缩的消息系统。在一个大型可伸缩性的解决方案中,一个消息队列由消息的数量未知而不知所措,成为在系统中的瓶颈。在这种情况下,考虑分割该消息系统以引导从特定生产者的信息到一个特定的队列,或使用负载平衡通过多个消息队列分发消息。

Ensuring Reliability of the Messaging System. A reliable messaging system is needed to guarantee that, once the application enqueues a message, it will not be lost. This is essential for ensuring that all messages are delivered at least once.
?消息系统的可靠性保障。需要一个可靠的消息传递系统来保证,一旦应用程序排队的消息,它也不会丢失。这是确保所有消息都至少有一次交付至关重要。

When to Use this Pattern 什么时候用这个模式

Use this pattern when:使用此模式时:

  • The workload for an application is divided into tasks that can run asynchronously.
  • 一个应用程序的工作负荷被分成可以异步运行任务。
  • Tasks are independent and can run in parallel.
  • 任务是独立的,并且可以并行地运行。
  • The volume of work is highly variable, requiring a scalable solution.
  • 工作容积是高度可变的,因此需要一种可扩展的解决方案。
  • The solution must provide high availability, and must be resilient if the processing for a task fails.
  • 该解决方案必须提供高可用性,并且如果一个任务的处理失败,必须是有弹性的。

This pattern may not be suitable when:这种模式可能不适合时:

  • It is not easy to separate the application workload into discrete tasks, or there is a high degree of dependence between tasks.
  • 对应用程序的工作负荷分割成离散的任务是不容易的,或者任务之间有高度的依赖。
  • Tasks must be performed synchronously, and the application logic must wait for a task to complete before continuing.
  • 任务必须同步进行,并且应用程序逻辑必须等待任务完成后再继续。
  • Tasks must be performed in a specific sequence.
  • 任务必须以特定的顺序来执行。

Note:注意:

Some messaging systems support sessions that enable a producer to group messages together and ensure that they are all handled by the same consumer. This mechanism can be used with prioritized messages (if they are supported) to implement a form of message ordering that delivers messages in sequence from a producer to a single consumer.

一些消息系统支持会话,使生产者对消息进行分组在一起,并确保它们都被同一个接收者处理。该机制可以与优先消息中使用(如果它们支持)来实现消息排序的形式,从生产者传送消息中的序列到单个消费者。

Example 例子

Azure provides storage queues and Service Bus queues that can act as a suitable mechanism for implementing this pattern. The application logic can post messages to a queue, and consumers implemented as tasks in one or more roles can retrieve messages from this queue and process them. For resiliency, a Service Bus queue enables a consumer to use PeekLock mode when it retrieves a message from the queue. This mode does not actually remove the message, but simply hides it from other consumers. The original consumer can delete the message when it has finished processing it. If the consumer should fail, the peek lock will time out and the message will become visible again, allowing another consumer to retrieve it.

Azure提供存储队列和服务总线队列,可以作为适当的机制实施此模式。应用逻辑可以发布消息到队列,而消费者实现为在一个或多个角色的任务可以从这个队列中检索消息并进行处理。对于弹性,服务总线队列使消费者使用PeekLock模式时,它就从队列中的消息。这种模式实际上并没有删除该消息,而只是隐藏它从其他消费者。当处理完它原来的消费者可以删除邮件。如果消费者失败,PeekLock将超时,消息将再次变得可见,让另一个消费者进行取回。

Note:

For detailed information on using Azure Service Bus queues, see Service Bus Queues, Topics, and Subscriptions on MSDN. For information on using Azure storage queues, see How to use the Queue Storage Service on MSDN.

The following code shows from the QueueManager class in CompetingConsumers solution of the examples available for download for this guidance shows how you can create a queue by using a QueueClient instance in the Start event handler in a web or worker role.

下面的代码从竞争的可供下载的例子中的CompetingConsumers解决方案QueueManager类展示了这个指导说明了如何可以通过在网络或辅助角色Start的事件处理程序使用QueueClient实例创建队列。

private string queueName = ...;
private string connectionString = ...;
...

public async Task Start()
{
  // Check if the queue already exists.
  var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
  if (!manager.QueueExists(this.queueName))
  {
    var queueDescription = new QueueDescription(this.queueName);

    // Set the maximum delivery count for messages in the queue. A message 
    // is automatically dead-lettered after this number of deliveries. The
    // default value for dead letter count is 10.
    queueDescription.MaxDeliveryCount = 3;

    await manager.CreateQueueAsync(queueDescription);
  }
  ...

  // Create the queue client. By default the PeekLock method is used.
  this.client = QueueClient.CreateFromConnectionString(
    this.connectionString, this.queueName);
}

The next code snippet shows how an application can create and send a batch of messages to the queue.

下面的代码片段展示了一个应用程序如何创建和发送一批消息队列。

public async Task SendMessagesAsync()
{
  // Simulate sending a batch of messages to the queue.
  var messages = new List<BrokeredMessage>();

  for (int i = 0; i < 10; i++)
  {
    var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
    messages.Add(message);
  }
  await this.client.SendBatchAsync(messages);
}

The following code shows how a consumer service instance can receive messages from the queue by following an event-driven approach. The processMessageTask parameter to the ReceiveMessages method is a delegate that references the code to run when a message is received. This code is run asynchronously.

下面的代码演示了如何消费服务实例可以从队列遵循事件驱动方式接收消息。该processMessageTask参数来ReceiveMessages方法是引用代码收到消息时运行的委托。此代码异步运行。

private ManualResetEvent pauseProcessingEvent;
...

public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask)
{
  // Set up the options for the message pump.
  var options = new OnMessageOptions();

  // When AutoComplete is disabled it is necessary to manually
  // complete or abandon the messages and handle any errors.
  options.AutoComplete = false;
  options.MaxConcurrentCalls = 10;
  options.ExceptionReceived += this.OptionsOnExceptionReceived;

  // Use of the Service Bus OnMessage message pump. 
  // The OnMessage method must be called once, otherwise an exception will occur.
  this.client.OnMessageAsync(
    async (msg) =>
    {
      // Will block the current thread if Stop is called.
      this.pauseProcessingEvent.WaitOne();

      // Execute processing task here.
      await processMessageTask(msg);
    },
    options);
}
...

private void OptionsOnExceptionReceived(object sender, 
  ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
  ...
}

Note that autoscaling features, such as those available in Azure, can be used to start and stop role instances as the queue length fluctuates. For more information, see Autoscaling Guidance. In addition, it is not necessary to maintain a one-to-one correspondence between role instances and worker processes—a single role instance can implement multiple worker processes. For more information, see Compute Resource Consolidation pattern.

注意,自动缩放的功能,如在Azure中那些可用,可作为队列长度波动来启动和停止角色实例。欲了解更多信息,请参阅自动缩放指导。此外,这是没有必要维持角色实例和工人之间的一对一的对应关系的过程,一个单一的角色实例可以实现多个工作进程。欲了解更多信息,请参阅计算资源整合模式。

Related Patterns and Guidance 相关模式和指导

The following patterns and guidance may be relevant when implementing this pattern:

实施这一模式时,以下模式和指导可能是相关的:

  • Asynchronous Messaging Primer. Message queues are an inherently asynchronous communications mechanism. If a consumer service needs to send a reply to an application, it may be necessary to implement some form of response messaging. The Asynchronous Messaging Primer provides information on how to implement request/reply messaging by using message queues.
  • 异步消息传递机制。消息队列是一种固有的异步通信机制。如果消费者服务需要一个答复发送给一个应用程序,可能有必要实现某种形式的响应消息的。异步消息传递机制提供有关如何实施请求/使用消息队列的回复消息的信息。
  • Autoscaling Guidance. It may be possible to start and stop instances of a consumer service as the length of the queue to which applications post messages varies. Autoscaling can help to maintain throughput during times of peak processing.
  • 自动缩放指引。它可能会启动和停止一消费者服务作为队列长度的实例的应用程序发布消息而变化。自动配置功能可以帮助在高峰处理的时间,以保持吞吐量。
  • Compute Resource Consolidation Pattern. It may be possible to consolidate multiple instances of a consumer service into a single process to reduce costs and management overhead. The Compute Resource Consolidation pattern describes the benefits and tradeoffs of following this approach.
  • 计算资源整合模式。它可能对消费者的服务的多个实例合并成一个单一的过程,以降低成本和管理开销。计算资源整合模式描述的好处,下面这种方法的权衡。
  • Queue-based Load Leveling Pattern. Introducing a message queue can add resiliency to the system, enabling service instances to handle widely varying volumes of requests from application instances. The message queue effectively acts as a buffer which levels the load. The Queue-based Load Leveling pattern describes this scenario in more detail.
  • 计算资源整合模式。它可能对消费者的服务的多个实例合并成一个单一的过程,以降低成本和管理开销。计算资源整合模式描述的好处,下面这种方法的权衡。

More Information更多信息

This pattern has a sample application associated with it. You can download the "Cloud Design Patterns – Sample Code" from the Microsoft Download Center at http://aka.ms/cloud-design-patterns-sample.



以上是关于Competing Consumers Pattern (竞争消费者模式)的主要内容,如果未能解决你的问题,请参考以下文章

JZOI5245 Competing Souls

R语言使用cmprsk包的crr函数进行生存资料的多因素竞争风险分析(Competing Risks Regression)

什么是竞争风险模型(Competing Risk Model)为什么选用竞争风险模型?为什么会高估风险?cmprsk包进行竞争风险分析regplot包的regplot函数可视化回归模型的列线图

来自 Consumers.py 外部的 Django 通道消息

dubbo——consumers

Kafka Eagle Consumers不显示