Spring整合RocketMQ实现入门案例

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring整合RocketMQ实现入门案例相关的知识,希望对你有一定的参考价值。

此前我们学习了如何在linux安装RocketMQ,以及使用Java程序公网连接的一系列准备工作。

现在我们来学习Spring整合RocketMQ的第一个案例!

文章目录

1 创建maven项目

创建一个maven项目。引入springboot和rocketmq的pom依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.5.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!--RocketMQ依赖-->
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.3</version>
    </dependency>
</dependencies>

建立如下目录结构:

项目启动类RocketMQApplication:

@SpringBootApplication
public class RocketMQApplication

   public static void main( String[] args )

      SpringApplication.run( RocketMQApplication.class, args );
   


2 配置文件

在配置文件中,我们编写rocketmq的配置信息。

apache:
  rocketmq:
    # 消费者的组名
    consumer: FirstConsumer
    # 生产者的组名
    producer: FirstProducer
    # NameServer地址
    namesrvAddr: xx.xx.xx.xx:9876
    #topic和tag
    topic: FirstTopic
    tag: first

3 生产者

通过创建一个DefaultMQProducer对象作为消息生产者,通过调用start方法启动生产者,并且将其交给spring容器管理。

/**
 * @author liuxin95
 * @date 2022/3/29 17:22
 */
@Service
public class Producer

   private static final Logger logger = LoggerFactory.getLogger( Producer.class );

   @Value( "$apache.rocketmq.producer" )
   private String producerGroup;
   @Value( "$apache.rocketmq.namesrvAddr" )
   private String namesrvAddr;

   @Bean
   public DefaultMQProducer defaultMQProducer()
      //生产者的组名
      DefaultMQProducer producer = new DefaultMQProducer( producerGroup );
      //指定NameServer地址,多个地址以 ; 隔开
      producer.setNamesrvAddr( namesrvAddr );

      try
         //调用start初始化
         producer.start();
      
      catch( Exception e )
         logger.error( "error", e );
         producer.shutdown();
      
      return producer;
   


4 消费者

通过创建一个DefaultMQPushConsumer的对象作为消费者,还需要配置一个消息监听器的实例。当消费者收到消息之后,会自动调用消息监听器MessageListener的consumeMessage方法进行消息消费。最后通过start方法启动消费者。

/**
 * @author liuxin95
 * @date 2022/3/29 17:22
 */
@Service
public class Consumer

   private static final Logger logger = LoggerFactory.getLogger( Consumer.class );

   @Value( "$apache.rocketmq.consumer" )
   private String consumerGroup;
   @Value( "$apache.rocketmq.namesrvAddr" )
   private String namesrvAddr;
   @Value( "$apache.rocketmq.topic" )
   private String topic;
   @Value( "$apache.rocketmq.tag" )
   private String tag;

   @PostConstruct
   public void defaultMQPushConsumer()
      //创建消费者
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( consumerGroup );

      //NameServer地址,多个地址以;隔开
      consumer.setNamesrvAddr( namesrvAddr );
      try
         //订阅名为的firstTopic的topic消息,且tag为first
         consumer.subscribe( topic, tag );

         //如果第一次启动该消费者,那么从头开始消费
         //如果非第一次启动,那么按照上次消费的位置继续消费
         consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET );

         /*
          * 注册一个并行消息监听器
          */
         consumer.registerMessageListener( ( MessageListenerConcurrently )( list, context ) -> 
            try
               System.out.println( "size: " + list.size() );
               for( MessageExt messageExt : list )
                  System.out.println( "messageExt: " + messageExt );
                  String messageBody = new String( messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET );
                  System.out.println( "messageBody: " + messageBody );

                  if( "1".equals( messageBody ) )
                     throw new Exception( "consume error" );
                  
               
            
            catch( Exception e )
               logger.error( "error", e );
               //并行失败&稍后重试
               //实际会放入重试队列,默认最大重试16次,默认从level3的延迟开始,最终还是失败则存入死信队列
               return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            
            //消费成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          );
         //启动消费者
         consumer.start();
      
      catch( Exception e )
         logger.error( "error", e );
         consumer.shutdown();
      
   

5 控制器

由于生产者交给了Spring管理,因此我们可以直接引入producer的实例,然后我们在接口中发送消息,如果消息为“1”,则表示错误消息,将会出席拿消息重试的现象。其他值则会消费成功。

/**
 * @author liuxin95
 * @date 2022/3/31 17:23
 */
@RestController
public class MqController

   @Resource
   private DefaultMQProducer defaultMQProducer;
   @Value( "$apache.rocketmq.topic" )
   private String topic;
   @Value( "$apache.rocketmq.tag" )
   private String tag;

   @GetMapping( "/testMq/flag" )
   public String testMq( @PathVariable @NotNull String flag )
         throws UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException, MQClientException

      //创建一个消息实例,包含 topic、tag 和 消息体
      Message message = new Message( topic, tag, flag.getBytes( RemotingHelper.DEFAULT_CHARSET ) );
      //发送该消息,获取发送结果
      SendResult result = defaultMQProducer.send( message );
      return result.getMsgId() + "----" + result.getSendStatus();
   

6 测试

启动项目。访问http://localhost:8080/testMq/xxx发送消息,可以观察到消息消费成功,访问http://localhost:8080/testMq/1发送错误消息,可以观察到消息消费失败,并且重试消费。关于消息重试:RocketMQ的消息重试(消息重投)

以上是关于Spring整合RocketMQ实现入门案例的主要内容,如果未能解决你的问题,请参考以下文章

八.RocketMQ极简入门-在SpringBoot中使用RocketMQ

Spring 从入门到精通系列 08——使用纯注解的方式实现 IOC 案例与 Junit 整合

RocketMQ(二十四)整合SpringBoot

ActiveMQ入门案例以及整合Spring的简单实用

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息