java - RabbitMQ 消费者不接收消息

Posted

技术标签:

【中文标题】java - RabbitMQ 消费者不接收消息【英文标题】:java - RabbitMQ consumer doesn't receive messages 【发布时间】:2016-05-13 14:31:39 【问题描述】:

我正在使用 RabbitMQ 使用 Java。 我有两台 RabbitMQ 服务器,配置相同,一台是开发环境,一台是生产环境。 这是消费者声明:

    /*
     * Connection and channel declaration
     */
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri(prop.getProperty("ConnectionURI"));
    connection = factory.newConnection();
    channel = connection.createChannel();

    /*
     * Queue declaration and exchange binding
     */
    channel.exchangeDeclare(prop.getProperty("printExchange"), "topic", false, false, false, new HashMap<>());
    queueName = prop.getProperty("printQueue");
    routing_key = "print." + codCliente + "." + idCassa;
    channel.queueDeclare(queueName, false, false, false, null);
    channel.queueBind(queueName, prop.getProperty("printExchange"), routing_key);

这里开始监听队列:

JAyronPOS.LOGGER.info("Waiting for a message on the queue -> " + queueName + " with routingkey -> " + routing_key);
Consumer consumer = new DefaultConsumer(channel) 
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
        JAyronPOS.LOGGER.info("This is the received message -> " + queueName + ": " + new String(body, "UTF-8"));
        Map<String, Object> headers = properties.getHeaders();
        if (envelope.getRoutingKey().equals(routing_key)) 
            JAyronPOS.LOGGER.info("Message is for me, because it has my routing key");
            channel.basicAck(envelope.getDeliveryTag(), false);
            if (headers != null) 
                if (headers.containsKey("command")) 
                    JAyronPOS.LOGGER.info("It's a command!");
                    JAyronPOS.LOGGER.info(headers.get("command").toString());
                    if ("requestClose".equals(headers.get("command").toString())) 
                        ChiusuraFiscaleConfirm confirm = gson.fromJson(new String(body, "UTF-8"), ChiusuraFiscaleConfirm.class);
                        if (confirm.isCanClose()) 
                            eseguiChiusuraFiscale();
                         else 
                            JOptionPane.showMessageDialog(null, "Can't close", "Error", JOptionPane.ERROR_MESSAGE);
                        
                     else 
                        JAyronPOS.LOGGER.info("Can't handle the message");
                    
                
             else 
                System.out.println("It's a ticket");
                TicketWrapper ticket = gson.fromJson(new String(body, "UTF-8"), TicketWrapper.class);
                printTicket(ticket);
            
        else
            JAyronPOS.LOGGER.info("The message isn't for me, because it has the routingkey: "+envelope.getRoutingKey());
        
    
;
channel.basicConsume(queueName, false, consumer);

在开发环境中,我最多有 5 个队列,而在生产环境中,我有 150-200 个队列。 消息由交换机发送,带有个人 routing_key。发送消息的数量不高(压力时不超过 10 msg/s)。 当我在开发环境中测试消费者时,一切正常: - 我发送一个 RPC 调用,服务器对其进行处理并回复。消费者阅读回复并调用正确的方法。大约 1-2 秒。 当我在生产环境中使用该软件时(我仅通过注释/取消注释 config.properties 文件中的一行来更改环境),它不起作用: - 我发送 RPC 调用,服务器对其进行处理并在队列中发送回复。消费者永远不会收到消息(但我可以看到 Web 管理面板在队列中生成的消息)。

可能是什么问题?

编辑:我注意到,如果我发送 RPC 调用,在 RabbitMQ Web 面板中,回复队列中的“Deliver”(浅蓝色)下会有一条消息,而如果我发送 3 -4 RPC 调用(与上一个相同),经过一些调用,回复队列上有一条消息在 Publish(黄色)下,消费者收到回复。

【问题讨论】:

【参考方案1】:

您没有提供发布代码或拓扑,因此很难猜测您的拓扑是如何工作的。

但是在消费者上匹配路由密钥是个坏主意,交易所应该为你做。消费者可以为所需的路由键创建和绑定队列。 在您的代码 else 分支中,消费者不确认或拒绝消息,这可能导致它挂在 delivered 状态并且永远不会被其他消费者接收。

Rabbitmq 教程有有用的 RPC 部分https://www.rabbitmq.com/tutorials/tutorial-six-java.html

【讨论】:

以上是关于java - RabbitMQ 消费者不接收消息的主要内容,如果未能解决你的问题,请参考以下文章

rabbitMQ 学习

rabbitMQ学习

rabbit mq 手动重试机制

RabbitMQ/JAVA (发布/订阅模式)

python使用rabbitMQ介绍三(发布订阅模式)

rabbitmq