Spring boot 2.x 集成Rocketmq实现事物消息

Posted yx88

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring boot 2.x 集成Rocketmq实现事物消息相关的知识,希望对你有一定的参考价值。

1.引入相关Maven依赖:

技术图片
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xxx</groupId>
    <artifactId>rocket</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator-autoconfigure</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
              <version>2.0.1</version>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
View Code

 

2.配置生产者:

  2.1 application.properties 配置如下:

####producer
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=  my-group
rocketmq.producer.send-message-timeout= 300000
rocketmq.producer.compress-message-body-threshold= 4096
rocketmq.producer.max-message-size= 4194304
rocketmq.producer.retry-times-when-send-async-failed= 0
rocketmq.producer.retry-next-server= true
rocketmq.producer.retry-times-when-send-failed= 2

  2.2 事务监听:

技术图片
package com.xxx.listener;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

@Slf4j
@RocketMQTransactionListener(txProducerGroup = "rocket")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener 

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) 
        System.out.println("本地事务和消息发送:" + JSON.toJSONString(message));

        return RocketMQLocalTransactionState.UNKNOWN;
    

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) 
        System.out.println("回查信息:" + JSON.toJSONString(message));

        return RocketMQLocalTransactionState.COMMIT;
    
View Code

  2.3  发送事物消息:

技术图片
package com.xxx;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Slf4j
@SpringBootApplication
public class SpringBootRocketMqApplication 
    public static void main(String[] args) throws InterruptedException 
        ConfigurableApplicationContext context = SpringApplication.run(SpringBootRocketMqApplication.class, args);

        RocketMQTemplate template = context.getBean(RocketMQTemplate.class);
        while (true) 
            String msg = "demo msg test";
            log.info("开始发送消息:"+msg);

            Message message = MessageBuilder.withPayload(msg).build();
            TransactionSendResult result = template.sendMessageInTransaction("rocket", "ts", message, null);
            log.info("消息发送响应信息:"+result.toString());

            Thread.sleep(10);
        
    
View Code

 

3. 配置消费者:

  3.1 application.properties 配置如下:

    rocketmq.name-server=127.0.0.1:9876

 

  3.2 消费者监听:

技术图片
package com.xxx.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
@RocketMQMessageListener(topic = "ts", consumerGroup = "my-consumer-group")
public class ConsumerLifecycleListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener 
    @Override
    public void onMessage(String s) 
        // 实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
        log.info("实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用");
    

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) 
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                for (MessageExt messageExt : msgs) 
                    System.out.println("重试次数:" + messageExt.getReconsumeTimes());

                    System.out.println("接受到的消息:" + new String(messageExt.getBody()));
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
    
View Code

 

4. 延时消息
  RocketMQ 目前只支持固定精度的定时消息。

  延迟级别(18个等级)

  1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h  Message message = MessageBuilder.withPayload(msg).build();

  rocketMQTemplate.syncSend(topic, message,1000,2);//表示延时5秒

5. 顺序消息
  asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback)

     通过指定hashkey实现顺序消费,同步的hashkey会按顺序消费

  

以上是关于Spring boot 2.x 集成Rocketmq实现事物消息的主要内容,如果未能解决你的问题,请参考以下文章

spring boot 2.X 集成 Elasticsearch 5.x 实战 增删改查

Spring boot 2.x 集成Rocketmq实现事物消息

ELK3.spring boot 2.X集成ES spring-data-ES 进行CRUD操作 完整版

spring boot 集成 redis lettuce(jedis)

Spring Boot 集成 websocket(广播式)

Spring Boot 2.x 实践记:@SpringBootTest