Spring Boot 2.x: Integrating RocketMQ to Implement Transactional Messages
Posted yx88
1. Add the Maven dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xxx</groupId>
    <artifactId>rocket</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Configure the producer:
2.1 Configure application.properties as follows:
#### producer
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.send-message-timeout=300000
rocketmq.producer.compress-message-body-threshold=4096
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-times-when-send-async-failed=0
rocketmq.producer.retry-next-server=true
rocketmq.producer.retry-times-when-send-failed=2
2.2 Transaction listener:
package com.xxx.listener;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

@Slf4j
@RocketMQTransactionListener(txProducerGroup = "rocket")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    // Runs the local transaction once the transactional message has been sent.
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("Local transaction and message send: " + JSON.toJSONString(message));
        // UNKNOWN leaves the message pending, so the broker checks back later.
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    // Answers the broker's check-back for a transaction that is still pending.
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("Check-back: " + JSON.toJSONString(message));
        return RocketMQLocalTransactionState.COMMIT;
    }
}
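The listener above always answers UNKNOWN and then COMMIT on check-back, which is enough for a demo. The following is a minimal sketch of how the two callbacks could instead share a recorded outcome, so that the check-back reports what actually happened locally. It is written without the RocketMQ classes so it runs on its own: TxState is a stand-in for RocketMQLocalTransactionState, and LocalTxTracker, the transaction ID key, and the in-memory map are all assumptions made for illustration, not part of the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-in for RocketMQLocalTransactionState so the sketch compiles without RocketMQ. */
enum TxState { COMMIT, ROLLBACK, UNKNOWN }

/** Hypothetical in-memory record of local transaction outcomes, keyed by a transaction ID. */
final class LocalTxTracker {
    private final Map<String, TxState> outcomes = new ConcurrentHashMap<>();

    /** Runs the local work and records the outcome; plays the role of executeLocalTransaction. */
    TxState execute(String txId, Runnable localWork) {
        TxState state;
        try {
            localWork.run();
            state = TxState.COMMIT;
        } catch (RuntimeException e) {
            // The local work failed, so the message should not be delivered.
            state = TxState.ROLLBACK;
        }
        outcomes.put(txId, state);
        return state;
    }

    /** Answers a check-back; plays the role of checkLocalTransaction. */
    TxState check(String txId) {
        // No record yet: stay UNKNOWN so the check can be repeated later.
        return outcomes.getOrDefault(txId, TxState.UNKNOWN);
    }
}
```

In a real service the outcome would more likely be stored in the same database transaction as the business data, since an in-memory map is lost on restart.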
2.3 Sending a transactional message:
package com.xxx;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Slf4j
@SpringBootApplication
public class SpringBootRocketMqApplication {

    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext context = SpringApplication.run(SpringBootRocketMqApplication.class, args);
        RocketMQTemplate template = context.getBean(RocketMQTemplate.class);
        // Demo loop: sends one transactional message every 10 ms until the process is stopped.
        while (true) {
            String msg = "demo msg test";
            log.info("Sending message: " + msg);
            Message<String> message = MessageBuilder.withPayload(msg).build();
            // Arguments: transaction producer group, topic, message, extra argument passed to the listener.
            TransactionSendResult result = template.sendMessageInTransaction("rocket", "ts", message, null);
            log.info("Send result: " + result.toString());
            Thread.sleep(10);
        }
    }
}
3. Configure the consumer:
3.1 Configure application.properties as follows:
rocketmq.name-server=127.0.0.1:9876
3.2 Consumer listener:
package com.xxx.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.List;

@Slf4j
@Component
@RocketMQMessageListener(topic = "ts", consumerGroup = "my-consumer-group")
public class ConsumerLifecycleListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

    @Override
    public void onMessage(String s) {
        // Not called here: prepareStart below registers its own message listener.
        log.info("onMessage is not called once RocketMQPushConsumerLifecycleListener is implemented");
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println("Retry count: " + messageExt.getReconsumeTimes());
                    System.out.println("Received message: " + new String(messageExt.getBody(), StandardCharsets.UTF_8));
                }
                // Acknowledge the whole batch once every message has been handled.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }
}
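4. Delayed messages
At the time of writing, RocketMQ supports delayed messages only at fixed delay levels, not arbitrary delays.
Delay levels (18 in total)
Levels 1 to 18 correspond to: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Message message = MessageBuilder.withPayload(msg).build();
// Send timeout of 1000 ms, delay level 2, i.e. a 5-second delay
rocketMQTemplate.syncSend(topic, message, 1000, 2);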
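Because the delay is given as a level rather than a duration, it can help to make the mapping explicit. The following is a small sketch of that lookup using the 18 levels listed above. DelayLevels, toDuration, and levelFor are hypothetical helper names, not part of RocketMQ, and the table assumes the broker keeps these default levels.

```java
import java.time.Duration;

/** Hypothetical helper (not part of RocketMQ) that maps a delay level to its duration. */
final class DelayLevels {
    // The 18 levels listed above, in order; index 0 is level 1.
    private static final String[] LEVELS = {
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
        "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"
    };

    private DelayLevels() {}

    /** Returns the delay for a level in the range 1..18. */
    static Duration toDuration(int level) {
        if (level < 1 || level > LEVELS.length) {
            throw new IllegalArgumentException("delay level must be 1.." + LEVELS.length + ": " + level);
        }
        String spec = LEVELS[level - 1];
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalStateException("unknown unit in " + spec);
        }
    }

    /** Returns the smallest level whose delay is at least the requested one. */
    static int levelFor(Duration wanted) {
        for (int level = 1; level <= LEVELS.length; level++) {
            if (toDuration(level).compareTo(wanted) >= 0) {
                return level;
            }
        }
        throw new IllegalArgumentException("no delay level covers " + wanted);
    }
}
```

With such a helper, the level passed to syncSend could be written as DelayLevels.levelFor(Duration.ofSeconds(5)) instead of a bare 2. Note that levelFor rounds up, so a request for 45 seconds lands on the 1-minute level.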
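5. Ordered messages
asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback)
Ordered consumption is achieved by specifying a hashKey: messages sent with the same hashKey are routed to the same queue and are therefore consumed in the order they were sent.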
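The idea behind the hashKey can be illustrated with a few lines of plain Java. This is a simplified sketch of hash-based queue selection, not RocketMQ's actual selector code. HashKeyRouting and queueIndex are hypothetical names, and the queue count of 4 and the order IDs are assumed values chosen for the example.

```java
/** Simplified illustration of hash-based queue selection; not RocketMQ's actual selector. */
final class HashKeyRouting {
    private HashKeyRouting() {}

    /** Maps a hashKey to a queue index in the range [0, queueCount). */
    static int queueIndex(String hashKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        String[] orderIds = {"order-1", "order-2", "order-1", "order-3", "order-1"};
        for (String orderId : orderIds) {
            // Every message for the same order ID lands on the same queue.
            System.out.println(orderId + " -> queue " + queueIndex(orderId, queueCount));
        }
    }
}
```

Ordering therefore holds only among messages that share a hashKey. Messages with different keys may land on different queues and be consumed in any relative order.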