ActiveMQ:单个生产者,多个消费者

Posted

技术标签:

【中文标题】ActiveMQ:单个生产者,多个消费者【英文标题】:ActiveMQ: Single producer, multiple consumers 【发布时间】:2020-06-17 12:06:46 【问题描述】:

我有一个使用 ActiveMQ 的消息队列。 Web 请求将消息放入队列中,persistency=True。现在,我有 2 个消费者,它们都作为单独的会话连接到该队列。消费者 1 总是确认消息,但消费者 2 永远不会。

现在,我读到了这个http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html,其中指出:

JMS 队列实现负载平衡器语义。一条消息将 仅由一位消费者接收。如果没有消费者 在发送消息时可用,它将一直保存到 可以处理消息的消费者可用。如果一个消费者 收到一条消息并且在关闭之前没有确认它然后 消息将被重新传递给另一个消费者。一个队列可以有很多 在可用消费者之间实现消息负载均衡的消费者。

我从中了解到的是,我希望所有消息最终都由消费者 1 处理,因为它总是确认。由于消费者 2 没有确认,因此消息应该被发送给消费者 1。

但我注意到以下内容: 1.当我提交一个请求时,我只看到每个第二个请求到达消费者 1。另一个请求没有出现,它存储在 ActiveMQ 中。我想它去了不承认的消费者 2。那么接下来应该是消费者 1 吗?

我只需要确保消息仅由一位消费者处理。在我的例子中,这个消费者是 X 国家(站点)的一台机器。每条消息只需要在一个国家(机器)处理。但是所有国家(机器)都应该收到该消息。如果消息中的国家/地区 ID 匹配,它将确认。所以只会发送 1 条确认/消息。

我接收/处理消息的代码如下所示:

# --------------------------------------------- MODULE IMPORT ---------------------------------------------------------#
import argparse
import json
import logging
import multiprocessing as mp
import sys

import stomp
from tvpv_portal.services.msgbkr import MsgBkr
from utils import util


# --------------------------------------------- DEVELOPMENT CODE ------------------------------------------------------#
log = logging.getLogger(__name__)


class MessageProcessingListener(stomp.ConnectionListener):
    """This class is responsible for processing (consuming) the messages from ActiveMQ."""

    def __init__(self, conn, cb):
        """Initialization.

        Args:
            conn -- Connection object
            cb   -- Callback function
        """

        self._conn = conn
        self._cb = cb

    def on_error(self, headers, body):
        """When we get an error.

        Args:
            headers -- Message header
            body    -- Message body
        """

        log.error('Received error=%s', body)

    def on_message(self, headers, body):
        """When we receive a message.

        Args:
            headers -- Message header
            body    -- Message body
        """

        log.info('Received message')

        # Deserialize the message.
        item = json.loads(body)

        import pprint
        pprint.pprint(item)

        # TODO: check if msg is to be handled by this SITE. If so, acknowledge and queue it. Otherwise, ignore.

        # Put message into queue via callback (queue.put) function.
        #self._cb(item)

        # TODO: we only send acknowledge if we are supposed to process this message.
        # Send acknowledgement to ActiveMQ indicating message is consumed.
        self._conn.ack(headers['message-id'], headers['subscription'])


def worker(q):
    """Worker to retrieve item from queue and process it.

    Args:
        q -- Queue
    """

    # Run in an infinite loop. Get an item from the queue to process it. We MUST call q.task_done() to indicate
    # that item is processed to prevent deadlock.
    while True:
        try:
            item = q.get()

            # TODO: We will call external script from here to run on Netbatch in the future.
            log.info('Processed message')

        finally:
            q.task_done()


def flash_mq_rst_handler_main():
    """Main entry to the request handler."""

    # Define arguments.
    parser = argparse.ArgumentParser(description='Flash message queue request handler script',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                     add_help=False)

    opts = parser.add_argument_group('Options')
    opts.add_argument('-h', '--help', action='help',
                      help='Show this help message and exit')
    opts.add_argument('--workers', metavar='val', type=int, default=4,
                      help='Number of worker processes')
    opts.add_argument('--log', metavar='file', type=util.get_resolved_abspath, default='flash_mq_rst_handler.log',
                      help='Log file')

    # Parse arguments.
    args = parser.parse_args()

    # Setup logger.
    util.configure_logger(args.log)
    log.info('Command line %s', ' '.join(map(str, sys.argv)))

    # Create a managed queue to store messages retrieved from message queue.
    queue = mp.Manager().JoinableQueue()

    # Instantiate consumer message broker + ensure connection.
    consumer = MsgBkr(producer=False)
    if not consumer.is_connected():
        log.critical('Unable to connect to message queue; please debug')
        sys.exit(1)

    # Register listener with consumer + queue.put as the callback function to trigger when a message is received.
    consumer.set_listener('message_processing_listener', MessageProcessingListener, cb=queue.put)

    # Run in an infinite loop to wait form messages.
    try:
        log.info('Create pool with worker=%d to process messages', args.workers)
        with mp.Pool(processes=args.workers) as pool:
            p = pool.apply_async(worker, (queue,))
            p.get()
    except KeyboardInterrupt:
        pass

    # See MsgBkr. It will close the connection during exit() so we don't have to do it.
    sys.exit(0)


if __name__ == '__main__':
    flash_mq_rst_handler_main()

【问题讨论】:

***.com/questions/32864644/…。看起来我需要研究 ActiveMQ 中的 Fanout 交换等价物。 好的,我需要一个 Queue -> N Queue bridge 来做路由:见activemq.apache.org/jms-to-jms-bridge 如何在 Flask 应用程序中将其实现为一个独立的进程,持续监控队列并消费消息 【参考方案1】:

这已通过 JMS 网桥解决:https://activemq.apache.org/components/artemis/documentation/1.1.0/jms-bridge.html

能够让 IT 配置创建 N+1 队列。源(传入)队列是放置所有消息的地方。根据消息中的某些选择器(例如标头中的 'some_key': 'some_value'),可以将消息路由到 N 个目标(传出)队列之一。然后,每个站点都可以侦听特定队列的消息。同一目标队列上的多个消费者将以循环方式获取消息。

【讨论】:

以上是关于ActiveMQ:单个生产者,多个消费者的主要内容,如果未能解决你的问题,请参考以下文章

消息队列 ActiveMQ

activeMq初步学习

架构设计:系统间通信(23)——提高ActiveMQ工作性能(中)

activeMq与spring整合

activeMQ中queue 与 topic 区别

跨多个队列的 ActiveMQ 消息组消费者选择?