RabbitMQ - not all messages in the queue are received


I'm experimenting with RabbitMQ using this tutorial (https://spring.io/guides/gs/messaging-rabbitmq/), and I can't work out why I don't receive all of the messages at once.

Code snippets:

Runner.java

package hello;

import java.util.concurrent.TimeUnit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;

@Component
public class Runner implements CommandLineRunner {

private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
private final ConfigurableApplicationContext context;

public Runner(Receiver receiver, RabbitTemplate rabbitTemplate,
        ConfigurableApplicationContext context) {
    this.receiver = receiver;
    this.rabbitTemplate = rabbitTemplate;
    this.context = context;
}

@Override
public void run(String... args) throws Exception {
    System.out.println("Sending message...");
    rabbitTemplate.convertAndSend(Application.queueName, "Hello from 1");
    rabbitTemplate.convertAndSend(Application.queueName, "Hello from 2");
    rabbitTemplate.convertAndSend(Application.queueName, "Hello from 3");
    rabbitTemplate.convertAndSend(Application.queueName, "Hello from 4");
   // receiver.getLatch().await(1, TimeUnit.MILLISECONDS);
    context.close();
}

}

Receiver.java

package hello;

import java.util.concurrent.CountDownLatch;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver implements MessageListener{

private CountDownLatch latch = new CountDownLatch(1);


@Override
public void onMessage(Message message) {
    System.out.println("Received <" + message + ">");
    latch.countDown();
}

public CountDownLatch getLatch() {
    return latch;
}

}

Application.java

package hello;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class Application {

final static String queueName = "spring-boot";

@Bean
@Qualifier("pubsubQueue")
Queue queue() {
    return new Queue(queueName, true, false, false);
}

@Bean
@Qualifier("eventExchange")
FanoutExchange exchange() {
    return new FanoutExchange("spring-boot-exchange",false,false);
}

@Bean
@Qualifier("pubsubQueueBinding")
Binding binding(@Qualifier("pubsubQueue") Queue queue,@Qualifier("eventExchange") FanoutExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange);
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        Receiver receiver,@Qualifier("pubsubQueue") Queue queue) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueues(queue);
    //container.setQueueNames(queueName);
    container.setMessageListener(receiver);
    return container;
}

public static void main(String[] args) throws InterruptedException {
    SpringApplication.run(Application.class, args);
}

}

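My output sometimes shows 2 messages, sometimes 3, and sometimes 4. How can I make sure I receive all 4 messages every time? Thanks, I appreciate your help.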

Sample output:

Test Run 1 : 
  Hello from 1 
  Hello from 2 
Test Run 2 : 
  Hello from 3 
  Hello from 4 
  Hello from 1 
  Hello from 2 
  Hello from 3 
Test Run 3: 
  Hello from 4 
  Hello from 1 
  Hello from 2 
  Hello from 3 

Expected output (every time):
  Hello from 1 
  Hello from 2
  Hello from 3 
  Hello from 4 
Answer
context.close();

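You are closing the application context immediately after sending the 4 messages; closing the context stops the listener container.

You need to wait until all of them have been received. You already have a latch, but the await() call is commented out, and the latch is initialized to only 1; it needs to be 4.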
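The latch fix can be tried out without a broker. The following is a minimal sketch using only the JDK: an in-memory BlockingQueue stands in for the RabbitMQ queue, and a background thread stands in for the listener container. The class name LatchDemo, the method sendAndWait, and the 10-second timeout are assumptions made for illustration; they are not part of the original code or of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LatchDemo {

    // Stand-in for Receiver: the latch is sized to the number of expected messages.
    static class Receiver {
        private final CountDownLatch latch;
        private final List<String> received = new CopyOnWriteArrayList<>();

        Receiver(int expectedMessages) {
            this.latch = new CountDownLatch(expectedMessages); // 4 rather than 1
        }

        void onMessage(String message) {
            received.add(message);
            System.out.println("Received <" + message + ">");
            latch.countDown(); // one count per message
        }

        CountDownLatch getLatch() {
            return latch;
        }

        List<String> getReceived() {
            return new ArrayList<>(received);
        }
    }

    // Hypothetical helper: sends `count` messages, then waits for all of them
    // before stopping the listener (the analogue of context.close()).
    public static List<String> sendAndWait(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stand-in for the broker queue
        Receiver receiver = new Receiver(count);

        // Stand-in for the listener container: delivers messages on its own thread.
        Thread listener = new Thread(() -> {
            try {
                while (true) {
                    receiver.onMessage(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // listener stopped
            }
        });
        listener.setDaemon(true);
        listener.start();

        try {
            for (int i = 1; i <= count; i++) {
                queue.add("Hello from " + i);
            }
            // Block until every message has been counted, or give up after the timeout.
            if (!receiver.getLatch().await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for messages");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            listener.interrupt(); // only now shut the listener down
        }
        return receiver.getReceived();
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait(4));
    }
}
```

In the code from the question, the equivalent change would be to create the latch in Receiver with new CountDownLatch(4) and to restore the receiver.getLatch().await(...) call in Runner before context.close(), with a timeout considerably longer than the 1 millisecond in the commented-out line. Messages that were not consumed before the context closed remain in the queue, which would explain why runs 2 and 3 in the sample output start with leftovers from the previous run.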
