RocketMQ 自定义消息与延迟消息
Posted 流楚丶格念
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 自定义消息与延迟消息相关的知识,希望对你有一定的参考价值。
文章目录
自定义消息
自定义消息介绍
前边我们发送的消息内容格式都是字符串 ,在生产开发中消息内容格式是复杂的 ,下面介绍如何对消息格式进行自定义。
JSON是互联网开发中非常常用的数据格式 ,它具有格式标准 ,扩展方便的特点 ,
将消息的格式使用JSON进行定义可以提高消息内容的扩展性 ,RocketMQ支持传递JSON数据格式。
代码示例
在生产端和消费端定义模型类 :
package com.yyl.test.rocketmq.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
import java.util.Date;
/**
* @author Administrator
* @version 1.0
**/
@Data
@NoArgsConstructor
@ToString
public class OrderExt implements Serializable
private String id;
private Date createTime;
private Long money;
private String title;
生产端 :
//同步发送对象消息
public void sendMsgByJson(String topic, OrderExt orderExt)
//将对象转成json串发送
rocketMQTemplate.convertAndSend(topic, orderExt);
编写测试方法:
@Test
public void testSendMsgByJson()
OrderExt orderExt = new OrderExt();
orderExt.setTitle("哭唧唧");
orderExt.setId("56456465");
orderExt.setMoney(893L);
orderExt.setCreateTime(new Date());
producerSimple.sendMsgByJson("my-topic-obj",orderExt);
消费端 :
下面实现了RocketMQ传输JSON消息的过程 ,消费端在接收到JSON手动将JSON转成对象
@Component
@RocketMQMessageListener(topic = "my-topic-obj",consumerGroup = "demo‐consumer‐group")
public class ConsumerSimple implements RocketMQListener<String>
//接手到消息调用此方法
@Override public void onMessage(String s)
//如果是json数据,可以将json转为对象
OrderExt orderExt = JSON.parseObject(s, OrderExt.class);
System.out.println(s);
也可以自动转换成对象 ,代码如下 :
定义新的监听类 ,RocketMQListener泛型指定要转换的对象类型。
@Component
@RocketMQMessageListener(topic = "my-topic-obj", consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener<OrderExt>
@Override
public void onMessage(OrderExt orderExt)
System.out.println(orderExt);
启动消费者服务进行监听,运行测试类方法:
延迟消息
延迟消息介绍
延迟消息也叫做定时消息 ,比如在电商项目的交易系统中 ,当用户下单之后超过一段时间之后仍然没有支付 ,此时就需要将该订单关l闭。
要实现该功能的话 ,可以在用户创建订单时就发送一条包含订单内容的延迟消息 ,该消息在 一段时间之后投递给消息消费者 ,当消息消费者接收到该消息后 ,判断该订单的支付状态 ,如果处于未支付状态 , 则将该订单关闭。
RocketMQ的延迟消息实现非常简单 ,只需要发送消息前设置延迟的时间 ,延迟时间存在十八个等级 (1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h )
调用setDelayTimeLevel()设置 与时间相对应的延迟级别即可。
代码示例
同步消息延迟
生产端 :
//发送延迟消息
public void sendMsgByJsonDelay(String topic, OrderExt orderExt)
//构建消息体
Message<OrderExt> message = MessageBuilder.withPayload(orderExt).build();
//String destination, Message<?> message, long timeout(发送消息超时时间,毫秒), int delayLevel 延迟等级
rocketMQTemplate.syncSend(topic, message, 10000, 3);
System.out.printf("send msg : %s", orderExt);
参数 | 说明 |
---|---|
String destination | topic名 |
Message<?> message | 发送的消息 |
long timeout | 发送消息超时时间,毫秒,超过这个时间就会报错 |
int delayLevel | 延迟等级:十八个等级 (1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h ) |
消费端 :
@Component
@RocketMQMessageListener(topic = "my-topic-obj", consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener<OrderExt>
@Override
public void onMessage(OrderExt orderExt)
System.out.println(orderExt);
测试 :
@Test
public void testSendMsgByJsonDelay()
OrderExt orderExt = new OrderExt();
orderExt.setTitle("啊哭哭哭哭哭");
orderExt.setId("110110110110");
orderExt.setMoney(1111L);
orderExt.setCreateTime(new Date());
producerSimple.sendMsgByJsonDelay("my-topic-obj",orderExt);
启动消费者服务监听消息,运行测试类方法,几秒后才接收到消息:
异步消息延迟
生产端 :
// 发送异步延迟消息
public void sendAsyncMsgByJsonDelay(String topic, OrderExt orderExt)
throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException
//消息内容将orderExt转为json
String json = this.rocketMQTemplate.getObjectMapper().writeValueAsString(orderExt);
org.apache.rocketmq.common.message.Message message =
new org.apache.rocketmq.common.message.Message(topic, json.getBytes(Charset.forName("UTF-8"))); //设置延迟等级
message.setDelayTimeLevel(3);
//发送异步消息
this.rocketMQTemplate.getProducer().send(message, new SendCallback()
@Override
public void onSuccess(SendResult sendResult)
System.out.println(sendResult);
@Override
public void onException(Throwable throwable)
System.out.println(throwable.getMessage());
);
System.out.printf("send msg : %s", orderExt);
消费端 :
@Component
@RocketMQMessageListener(topic = "my-topic-obj", consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener<OrderExt>
@Override
public void onMessage(OrderExt orderExt)
System.out.println(orderExt);
测试
//测试发送异步消息
@Test
public void testSendAsyncMsgByJsonDelay()
throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException, RemotingException, MQClientException, MQBrokerException, JsonProcessingException
OrderExt orderExt = new OrderExt();
orderExt.setId(UUID.randomUUID().toString());
orderExt.setCreateTime(new Date());
orderExt.setMoney(168L);
orderExt.setTitle("订单测试 啊啊啊啊啊啊啊啊啊啊啊");
this.producerSimple.sendAsyncMsgByJsonDelay("my-topic-obj", orderExt);
System.out.println("end...");
Thread.sleep(20000);
以上是关于RocketMQ 自定义消息与延迟消息的主要内容,如果未能解决你的问题,请参考以下文章