ActiveMQ:单个生产者,多个消费者
Posted
技术标签:
【中文标题】ActiveMQ:单个生产者,多个消费者【英文标题】:ActiveMQ: Single producer, multiple consumers 【发布时间】:2020-06-17 12:06:46 【问题描述】:我有一个使用 ActiveMQ 的消息队列。 Web 请求将消息放入队列中,persistency=True。现在,我有 2 个消费者,它们都作为单独的会话连接到该队列。消费者 1 总是确认消息,但消费者 2 永远不会。
现在,我读到了这个http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html,其中指出:
JMS 队列实现负载平衡器语义。一条消息将 仅由一位消费者接收。如果没有消费者 在发送消息时可用,它将一直保存到 可以处理消息的消费者可用。如果一个消费者 收到一条消息并且在关闭之前没有确认它然后 消息将被重新传递给另一个消费者。一个队列可以有很多 在可用消费者之间实现消息负载均衡的消费者。
我从中了解到的是,我希望所有消息最终都由消费者 1 处理,因为它总是确认。由于消费者 2 没有确认,因此消息应该被发送给消费者 1。
但我注意到以下内容: 1.当我提交一个请求时,我只看到每个第二个请求到达消费者 1。另一个请求没有出现,它存储在 ActiveMQ 中。我想它去了不承认的消费者 2。那么接下来应该是消费者 1 吗?
我只需要确保消息仅由一位消费者处理。在我的例子中,这个消费者是 X 国家(站点)的一台机器。每条消息只需要在一个国家(机器)处理。但是所有国家(机器)都应该收到该消息。如果消息中的国家/地区 ID 匹配,它将确认。所以只会发送 1 条确认/消息。
我接收/处理消息的代码如下所示:
# --------------------------------------------- MODULE IMPORT ---------------------------------------------------------#
import argparse
import json
import logging
import multiprocessing as mp
import sys
import stomp
from tvpv_portal.services.msgbkr import MsgBkr
from utils import util
# --------------------------------------------- DEVELOPMENT CODE ------------------------------------------------------#
log = logging.getLogger(__name__)
class MessageProcessingListener(stomp.ConnectionListener):
"""This class is responsible for processing (consuming) the messages from ActiveMQ."""
def __init__(self, conn, cb):
"""Initialization.
Args:
conn -- Connection object
cb -- Callback function
"""
self._conn = conn
self._cb = cb
def on_error(self, headers, body):
"""When we get an error.
Args:
headers -- Message header
body -- Message body
"""
log.error('Received error=%s', body)
def on_message(self, headers, body):
"""When we receive a message.
Args:
headers -- Message header
body -- Message body
"""
log.info('Received message')
# Deserialize the message.
item = json.loads(body)
import pprint
pprint.pprint(item)
# TODO: check if msg is to be handled by this SITE. If so, acknowledge and queue it. Otherwise, ignore.
# Put message into queue via callback (queue.put) function.
#self._cb(item)
# TODO: we only send acknowledge if we are supposed to process this message.
# Send acknowledgement to ActiveMQ indicating message is consumed.
self._conn.ack(headers['message-id'], headers['subscription'])
def worker(q):
"""Worker to retrieve item from queue and process it.
Args:
q -- Queue
"""
# Run in an infinite loop. Get an item from the queue to process it. We MUST call q.task_done() to indicate
# that item is processed to prevent deadlock.
while True:
try:
item = q.get()
# TODO: We will call external script from here to run on Netbatch in the future.
log.info('Processed message')
finally:
q.task_done()
def flash_mq_rst_handler_main():
"""Main entry to the request handler."""
# Define arguments.
parser = argparse.ArgumentParser(description='Flash message queue request handler script',
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
add_help=False)
opts = parser.add_argument_group('Options')
opts.add_argument('-h', '--help', action='help',
help='Show this help message and exit')
opts.add_argument('--workers', metavar='val', type=int, default=4,
help='Number of worker processes')
opts.add_argument('--log', metavar='file', type=util.get_resolved_abspath, default='flash_mq_rst_handler.log',
help='Log file')
# Parse arguments.
args = parser.parse_args()
# Setup logger.
util.configure_logger(args.log)
log.info('Command line %s', ' '.join(map(str, sys.argv)))
# Create a managed queue to store messages retrieved from message queue.
queue = mp.Manager().JoinableQueue()
# Instantiate consumer message broker + ensure connection.
consumer = MsgBkr(producer=False)
if not consumer.is_connected():
log.critical('Unable to connect to message queue; please debug')
sys.exit(1)
# Register listener with consumer + queue.put as the callback function to trigger when a message is received.
consumer.set_listener('message_processing_listener', MessageProcessingListener, cb=queue.put)
# Run in an infinite loop to wait form messages.
try:
log.info('Create pool with worker=%d to process messages', args.workers)
with mp.Pool(processes=args.workers) as pool:
p = pool.apply_async(worker, (queue,))
p.get()
except KeyboardInterrupt:
pass
# See MsgBkr. It will close the connection during exit() so we don't have to do it.
sys.exit(0)
if __name__ == '__main__':
flash_mq_rst_handler_main()
【问题讨论】:
***.com/questions/32864644/…。看起来我需要研究 ActiveMQ 中的 Fanout 交换等价物。 好的,我需要一个 Queue -> N Queue bridge 来做路由:见activemq.apache.org/jms-to-jms-bridge 如何在 Flask 应用程序中将其实现为一个独立的进程,持续监控队列并消费消息 【参考方案1】:这已通过 JMS 网桥解决:https://activemq.apache.org/components/artemis/documentation/1.1.0/jms-bridge.html
能够让 IT 配置创建 N+1 队列。源(传入)队列是放置所有消息的地方。根据消息中的某些选择器(例如标头中的 'some_key': 'some_value'),可以将消息路由到 N 个目标(传出)队列之一。然后,每个站点都可以侦听特定队列的消息。同一目标队列上的多个消费者将以循环方式获取消息。
【讨论】:
以上是关于ActiveMQ:单个生产者,多个消费者的主要内容,如果未能解决你的问题,请参考以下文章