SpringBoot 搭建 Rabbitmq
Posted 技术博客这里开始
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot 搭建 Rabbitmq相关的知识,希望对你有一定的参考价值。
本文使用的 Spring Boot 版本是 2.1.7.RELEASE。
pom.xml
<!-- Spring Boot starter for AMQP (RabbitMQ). No <version> is declared here; presumably it is
     managed by the Spring Boot parent/BOM of the project - confirm against the full pom.xml. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
# RabbitMQ connection and listener settings (bound under spring.rabbitmq.*).
spring:
  rabbitmq:
    virtual-host: /
    host: localhost
    port: 5672
    username: guest
    password: 123456
    listener:
      simple:
        # Read by RabbitmqConfig#multiListenerContainer as
        # spring.rabbitmq.listener.simple.{concurrency,max-concurrency,prefetch}.
        concurrency: 10
        max-concurrency: 20
        prefetch: 5

# Environment prefix interpolated into the queue/exchange/routing-key names below.
mq:
  env: test

# Names for the "cart place order" flow, read in code as cart.place.order.*.
cart:
  place:
    order:
      queue: ${mq.env}.cart.place.order.queue
      exchange: ${mq.env}.cart.place.order.exchange
      routing:
        key: ${mq.env}.cart.place.order.routing.key
QueueConfig.java
/**
 * Declares the RabbitMQ queue, exchange and binding used by the "cart place order" flow.
 * All names are resolved from the Spring {@link Environment} (keys {@code cart.place.order.*}).
 */
@Configuration
public class QueueConfig {

    @Autowired
    private Environment environment;

    /**
     * Declares the place-order queue.
     *
     * <p>The queue is created as durable, so it does not have to be re-created when RabbitMQ
     * restarts.
     *
     * @return the queue named by the {@code cart.place.order.queue} property
     */
    @Bean
    public Queue placeOrderQueue() {
        final String queueName = environment.getProperty("cart.place.order.queue");
        final boolean durable = true;
        return new Queue(queueName, durable);
    }

    /**
     * Declares the place-order exchange as a durable, non-auto-delete direct exchange, so it does
     * not have to be re-created when RabbitMQ restarts.
     *
     * <p>Exchange types, for reference:
     * <ol>
     *   <li>Direct: a message is delivered to a queue when the routing keys match.</li>
     *   <li>Fanout: no routing key is involved; every queue bound to the exchange gets the
     *       message.</li>
     *   <li>Topic: messages are forwarded based on pattern matching of the routing key.</li>
     * </ol>
     *
     * <p>With a direct exchange, a message published with a given key is forwarded to the queue
     * that was bound with that key.
     *
     * @return the exchange named by the {@code cart.place.order.exchange} property
     */
    @Bean
    public DirectExchange placeOrderExchange() {
        final String exchangeName = environment.getProperty("cart.place.order.exchange");
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(exchangeName, durable, autoDelete);
    }

    /**
     * Binds the place-order queue to the place-order exchange.
     *
     * @return the binding keyed by the {@code cart.place.order.routing.key} property
     */
    @Bean
    public Binding placeOrderBinding() {
        final Queue queue = placeOrderQueue();
        final DirectExchange exchange = placeOrderExchange();
        final String routingKey = environment.getProperty("cart.place.order.routing.key");
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(routingKey);
    }
}
RabbitmqConfig.java
/**
 * RabbitMQ infrastructure configuration: a {@link RabbitTemplate} with publisher confirm/return
 * logging, two listener container factories (single-consumer and multi-consumer, both with manual
 * acknowledgement) and a {@link #sendMessage} helper for producers.
 */
@Configuration
public class RabbitmqConfig {

    private static final Logger logger = LogManager.getLogger(RabbitmqConfig.class);

    @Autowired
    private Environment environment;

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Builds a {@link RabbitTemplate} that uses JSON conversion and logs publisher confirms and
     * returned (unroutable) messages.
     *
     * <p>The bean is prototype-scoped because a singleton template cannot have its callbacks set
     * more than once.
     *
     * @return a newly configured template
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        // Confirms and returns must be enabled on the connection factory for the callbacks to fire.
        cachingConnectionFactory.setPublisherConfirms(true);
        cachingConnectionFactory.setPublisherReturns(true);

        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        // Mandatory publishing is enabled so that the return callback below can report
        // messages that could not be routed.
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                logger.info("message send succeed:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            } else {
                logger.info("message send failed:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            }
        });
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            logger.info("message lose:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
        });
        return rabbitTemplate;
    }

    /**
     * Listener container factory with exactly one consumer, prefetch 1, txSize 1 and manual
     * acknowledgement.
     *
     * @return the factory registered as {@code singleListenerContainer}
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory singleListenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /**
     * Listener container factory whose concurrency, max concurrency and prefetch are taken from
     * the {@code spring.rabbitmq.listener.simple.*} properties, with manual acknowledgement.
     *
     * @return the factory registered as {@code multiListenerContainer}
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory, cachingConnectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.concurrency", int.class));
        factory.setMaxConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.max-concurrency", int.class));
        factory.setPrefetchCount(environment.getProperty("spring.rabbitmq.listener.simple.prefetch", int.class));
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /**
     * Publishes {@code content} to the given exchange with the given routing key.
     *
     * <p>Message persistence: a message with deliveryMode 2 survives a broker restart and can
     * still be consumed afterwards. Messages sent through {@code convertAndSend} are persistent by
     * default, because {@code new MessageProperties()} uses
     * {@code DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT}.
     *
     * <p>Failures are logged (with stack trace) and not rethrown.
     *
     * <p>NOTE(review): every call invokes {@link #rabbitTemplate()}, which is prototype-scoped;
     * presumably this yields a fresh template per message - confirm that this is intended.
     *
     * @param exchangeName   exchange to publish to
     * @param routingKeyName routing key to publish with
     * @param content        payload; serialized to JSON
     * @param flag           when {@code true}, {@code messageId} is set as the message's
     *                       correlation id and as the id of the {@link CorrelationData}, so the
     *                       consumer can read it back
     * @param messageId      id used when {@code flag} is {@code true}
     * @param <T>            payload type
     */
    public <T> void sendMessage(String exchangeName, String routingKeyName, T content, boolean flag, String messageId) {
        // Fix: the format string has five placeholders; 'flag' was previously not passed.
        logger.info("message send :messageId({}), exchangeName({}), routingKeyName({}), content({}), flag({})",
                messageId, exchangeName, routingKeyName, content, flag);
        CorrelationData correlationData = new CorrelationData();
        try {
            if (flag) {
                MessageProperties properties = new MessageProperties();
                properties.setCorrelationId(messageId);
                correlationData.setId(messageId);
                // The body is serialized here explicitly so the correlation id can travel in the
                // message properties alongside it.
                this.rabbitTemplate().convertAndSend(exchangeName, routingKeyName,
                        MessageBuilder.withBody(objectMapper.writeValueAsBytes(content)).andProperties(properties).build(), correlationData);
            } else {
                this.rabbitTemplate().convertAndSend(exchangeName, routingKeyName, content, correlationData);
            }
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the stack trace is not lost.
            logger.error("error message :e.getMessage({})", e.getMessage(), e);
        }
    }
}
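生产者只需要调用 RabbitmqConfig 中的 sendMessage() 方法即可。如果消费者需要 correlationId 这个参数,就将 flag 设置为 true,并把 messageId 作为 correlationId 传入,这样在消费端就可以通过 message.getMessageProperties().getCorrelationId() 获取到它。(注意:下面示例中使用的 cart.create.order.result.exchange 和 cart.create.order.result.routing.key 并未出现在上文的 application.yml 中,需要按照 cart.place.order.* 的方式另行配置。)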
// Unique id for this message; it is passed to sendMessage as the messageId argument.
String messageId = UUID.randomUUID().toString();
// Resolve the target exchange and routing key from configuration, then publish.
String resultExchange = environment.getProperty("cart.create.order.result.exchange");
String resultRoutingKey = environment.getProperty("cart.create.order.result.routing.key");
rabbitmqConfig.sendMessage(resultExchange, resultRoutingKey, response, flag, messageId);
消费者
// Injected RabbitmqConfig bean (it exposes sendMessage(...) for publishing).
@Autowired
private RabbitmqConfig rabbitmqConfig;
// Jackson mapper used by the listener to deserialize the raw message body.
@Autowired
private ObjectMapper objectMapper;
/**
 * Consumes messages from the queue named by {@code cart.place.order.queue} using the
 * {@code multiListenerContainer} factory.
 *
 * <p>The body is deserialized into {@code RequestInventoryItems} and logged together with the
 * correlation id, then the delivery is acknowledged. On any failure the delivery is
 * negatively acknowledged and requeued.
 *
 * <p>NOTE(review): because failed deliveries are always requeued, a message that can never be
 * processed (for example an undeserializable body) will presumably be redelivered endlessly -
 * consider a retry limit or dead-lettering.
 *
 * @param message the raw AMQP message
 * @param channel the channel the message was received on, used for ack/nack
 */
@RabbitListener(queues = "${cart.place.order.queue}", containerFactory = "multiListenerContainer")
public void placeOrder(Message message, Channel channel) {
    try {
        // Fix: pass values as log parameters; string concatenation left the "{}" placeholder unfilled.
        logger.info("place order : message ({})", objectMapper.readValue(message.getBody(), RequestInventoryItems.class));
        logger.info("place order : message ({})", message.getMessageProperties().getCorrelationId());
        // Second argument false: acknowledge only this delivery, not all earlier ones.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception es) {
        // Pass the exception as the trailing argument so the stack trace is kept.
        logger.error("error message :es.getMessage({})", es.getMessage(), es);
        try {
            // The third parameter is whether to return the message to the queue.
            // Pay attention to the endless loop this can cause.
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        } catch (Exception e) {
            logger.error("error message :e.getMessage({})", e.getMessage(), e);
        }
    }
}
以上就是集成Rabbitmq的全部步骤了。
以上是关于SpringBoot 搭建 Rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot整合RabbitMQ消息队列基本环境搭建
springboot+elasticsearch + rabbitMQ实现全文检索(项目搭建)
spring cloudgradle父子项目微服务框架搭建---rabbitMQ延时队列