Spring整合RocketMQ实现入门案例
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring整合RocketMQ实现入门案例相关的知识,希望对你有一定的参考价值。
此前我们学习了如何在linux安装RocketMQ,以及使用Java程序公网连接的一系列准备工作。
现在我们来学习Spring整合RocketMQ的第一个案例!
文章目录
1 创建maven项目
创建一个maven项目。引入springboot和rocketmq的pom依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--RocketMQ依赖-->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
</dependencies>
建立如下目录结构:
项目启动类RocketMQApplication:
@SpringBootApplication
public class RocketMQApplication
public static void main( String[] args )
SpringApplication.run( RocketMQApplication.class, args );
2 配置文件
在配置文件中,我们编写rocketmq的配置信息。
apache:
rocketmq:
# 消费者的组名
consumer: FirstConsumer
# 生产者的组名
producer: FirstProducer
# NameServer地址
namesrvAddr: xx.xx.xx.xx:9876
#topic和tag
topic: FirstTopic
tag: first
3 生产者
通过创建一个DefaultMQProducer对象作为消息生产者,通过调用start方法启动生产者,并且将其交给spring容器管理。
/**
* @author liuxin95
* @date 2022/3/29 17:22
*/
@Service
public class Producer
private static final Logger logger = LoggerFactory.getLogger( Producer.class );
@Value( "$apache.rocketmq.producer" )
private String producerGroup;
@Value( "$apache.rocketmq.namesrvAddr" )
private String namesrvAddr;
@Bean
public DefaultMQProducer defaultMQProducer()
//生产者的组名
DefaultMQProducer producer = new DefaultMQProducer( producerGroup );
//指定NameServer地址,多个地址以 ; 隔开
producer.setNamesrvAddr( namesrvAddr );
try
//调用start初始化
producer.start();
catch( Exception e )
logger.error( "error", e );
producer.shutdown();
return producer;
4 消费者
通过创建一个DefaultMQPushConsumer的对象作为消费者,还需要配置一个消息监听器的实例。当消费者收到消息之后,会自动调用消息监听器MessageListener的consumeMessage方法进行消息消费。最后通过start方法启动消费者。
/**
* @author liuxin95
* @date 2022/3/29 17:22
*/
@Service
public class Consumer
private static final Logger logger = LoggerFactory.getLogger( Consumer.class );
@Value( "$apache.rocketmq.consumer" )
private String consumerGroup;
@Value( "$apache.rocketmq.namesrvAddr" )
private String namesrvAddr;
@Value( "$apache.rocketmq.topic" )
private String topic;
@Value( "$apache.rocketmq.tag" )
private String tag;
@PostConstruct
public void defaultMQPushConsumer()
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( consumerGroup );
//NameServer地址,多个地址以;隔开
consumer.setNamesrvAddr( namesrvAddr );
try
//订阅名为的firstTopic的topic消息,且tag为first
consumer.subscribe( topic, tag );
//如果第一次启动该消费者,那么从头开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET );
/*
* 注册一个并行消息监听器
*/
consumer.registerMessageListener( ( MessageListenerConcurrently )( list, context ) ->
try
System.out.println( "size: " + list.size() );
for( MessageExt messageExt : list )
System.out.println( "messageExt: " + messageExt );
String messageBody = new String( messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET );
System.out.println( "messageBody: " + messageBody );
if( "1".equals( messageBody ) )
throw new Exception( "consume error" );
catch( Exception e )
logger.error( "error", e );
//并行失败&稍后重试
//实际会放入重试队列,默认最大重试16次,默认从level3的延迟开始,最终还是失败则存入死信队列
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
//启动消费者
consumer.start();
catch( Exception e )
logger.error( "error", e );
consumer.shutdown();
5 控制器
由于生产者交给了Spring管理,因此我们可以直接引入producer的实例,然后我们在接口中发送消息,如果消息为“1”,则表示错误消息,将会出席拿消息重试的现象。其他值则会消费成功。
/**
* @author liuxin95
* @date 2022/3/31 17:23
*/
@RestController
public class MqController
@Resource
private DefaultMQProducer defaultMQProducer;
@Value( "$apache.rocketmq.topic" )
private String topic;
@Value( "$apache.rocketmq.tag" )
private String tag;
@GetMapping( "/testMq/flag" )
public String testMq( @PathVariable @NotNull String flag )
throws UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException, MQClientException
//创建一个消息实例,包含 topic、tag 和 消息体
Message message = new Message( topic, tag, flag.getBytes( RemotingHelper.DEFAULT_CHARSET ) );
//发送该消息,获取发送结果
SendResult result = defaultMQProducer.send( message );
return result.getMsgId() + "----" + result.getSendStatus();
6 测试
启动项目。访问http://localhost:8080/testMq/xxx发送消息,可以观察到消息消费成功,访问http://localhost:8080/testMq/1发送错误消息,可以观察到消息消费失败,并且重试消费。关于消息重试:RocketMQ的消息重试(消息重投)。
以上是关于Spring整合RocketMQ实现入门案例的主要内容,如果未能解决你的问题,请参考以下文章
八.RocketMQ极简入门-在SpringBoot中使用RocketMQ
Spring 从入门到精通系列 08——使用纯注解的方式实现 IOC 案例与 Junit 整合