RocketMQ 自定义消息与延迟消息

Posted 流楚丶格念

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 自定义消息与延迟消息相关的知识,希望对你有一定的参考价值。

文章目录

自定义消息

自定义消息介绍

前边我们发送的消息内容格式都是字符串 ,在生产开发中消息内容格式是复杂的 ,下面介绍如何对消息格式进行自定义。

JSON是互联网开发中非常常用的数据格式 ,它具有格式标准 ,扩展方便的特点 ,

将消息的格式使用JSON进行定义可以提高消息内容的扩展性 ,RocketMQ支持传递JSON数据格式。

代码示例

在生产端和消费端定义模型类 :

package com.yyl.test.rocketmq.model;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.io.Serializable;
import java.util.Date;

/**
 * @author Administrator
 * @version 1.0
 **/
@Data
@NoArgsConstructor
@ToString
public class OrderExt implements Serializable 

    private String id;

    private Date createTime;

    private Long money;

    private String title;



生产端 :

//同步发送对象消息
public void sendMsgByJson(String topic, OrderExt orderExt) 
    //将对象转成json串发送
    rocketMQTemplate.convertAndSend(topic, orderExt);

编写测试方法:

@Test
public void testSendMsgByJson()
    OrderExt orderExt = new OrderExt();
    orderExt.setTitle("哭唧唧");
    orderExt.setId("56456465");
    orderExt.setMoney(893L);
    orderExt.setCreateTime(new Date());
    producerSimple.sendMsgByJson("my-topic-obj",orderExt);

消费端 :

下面实现了RocketMQ传输JSON消息的过程 ,消费端在接收到JSON手动将JSON转成对象

@Component 
@RocketMQMessageListener(topic = "my-topic-obj",consumerGroup = "demo‐consumer‐group") 
public class ConsumerSimple implements RocketMQListener<String> 
    //接手到消息调用此方法 
    @Override public void onMessage(String s) 
        //如果是json数据,可以将json转为对象 
        OrderExt orderExt = JSON.parseObject(s, OrderExt.class);
        System.out.println(s);
    

也可以自动转换成对象 ,代码如下 :

定义新的监听类 ,RocketMQListener泛型指定要转换的对象类型。

@Component
@RocketMQMessageListener(topic = "my-topic-obj", consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener<OrderExt> 

    @Override
    public void onMessage(OrderExt orderExt) 
        System.out.println(orderExt);
    

启动消费者服务进行监听,运行测试类方法:

延迟消息

延迟消息介绍

延迟消息也叫做定时消息 ,比如在电商项目的交易系统中 ,当用户下单之后超过一段时间之后仍然没有支付 ,此时就需要将该订单关l闭。

要实现该功能的话 ,可以在用户创建订单时就发送一条包含订单内容的延迟消息 ,该消息在 一段时间之后投递给消息消费者 ,当消息消费者接收到该消息后 ,判断该订单的支付状态 ,如果处于未支付状态 , 则将该订单关闭。

RocketMQ的延迟消息实现非常简单 ,只需要发送消息前设置延迟的时间 ,延迟时间存在十八个等级 (1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h )
调用setDelayTimeLevel()设置 与时间相对应的延迟级别即可。

代码示例

同步消息延迟

生产端 :

//发送延迟消息
public void sendMsgByJsonDelay(String topic, OrderExt orderExt) 
    //构建消息体
    Message<OrderExt> message = MessageBuilder.withPayload(orderExt).build();
	//String destination, Message<?> message, long timeout(发送消息超时时间,毫秒), int delayLevel 延迟等级
    rocketMQTemplate.syncSend(topic, message, 10000, 3);
    System.out.printf("send msg : %s", orderExt);

参数说明
String destinationtopic名
Message<?> message发送的消息
long timeout发送消息超时时间,毫秒,超过这个时间就会报错
int delayLevel延迟等级:十八个等级 (1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h )

消费端 :

@Component
@RocketMQMessageListener(topic = "my-topic-obj", consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener<OrderExt> 

    @Override
    public void onMessage(OrderExt orderExt) 
        System.out.println(orderExt);
    

测试 :

@Test
public void testSendMsgByJsonDelay()
    OrderExt orderExt = new OrderExt();
    orderExt.setTitle("啊哭哭哭哭哭");
    orderExt.setId("110110110110");
    orderExt.setMoney(1111L);
    orderExt.setCreateTime(new Date());
    producerSimple.sendMsgByJsonDelay("my-topic-obj",orderExt);

启动消费者服务监听消息,运行测试类方法,几秒后才接收到消息:

异步消息延迟

生产端 :

// 发送异步延迟消息
public void sendAsyncMsgByJsonDelay(String topic, OrderExt orderExt)
        throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException 
    //消息内容将orderExt转为json
    String json = this.rocketMQTemplate.getObjectMapper().writeValueAsString(orderExt);
    org.apache.rocketmq.common.message.Message message =
            new org.apache.rocketmq.common.message.Message(topic, json.getBytes(Charset.forName("UTF-8"))); //设置延迟等级
    message.setDelayTimeLevel(3);
    //发送异步消息
    this.rocketMQTemplate.getProducer().send(message, new SendCallback() 
        @Override
        public void onSuccess(SendResult sendResult) 
            System.out.println(sendResult);
        
        @Override
        public void onException(Throwable throwable) 
            System.out.println(throwable.getMessage());
        
    );
    System.out.printf("send msg : %s", orderExt);

消费端 :

@Component
@RocketMQMessageListener(topic = "my-topic-obj", consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener<OrderExt> 

    @Override
    public void onMessage(OrderExt orderExt) 
        System.out.println(orderExt);
    

测试

//测试发送异步消息
@Test
public void testSendAsyncMsgByJsonDelay() 
throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException, RemotingException, MQClientException, MQBrokerException, JsonProcessingException 
    OrderExt orderExt = new OrderExt();
    orderExt.setId(UUID.randomUUID().toString());
    orderExt.setCreateTime(new Date());
    orderExt.setMoney(168L);
    orderExt.setTitle("订单测试   啊啊啊啊啊啊啊啊啊啊啊");
    this.producerSimple.sendAsyncMsgByJsonDelay("my-topic-obj", orderExt);
    System.out.println("end...");
    Thread.sleep(20000);


以上是关于RocketMQ 自定义消息与延迟消息的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ系列广播与延迟消息

RocketMQ消息堆积与消息延迟

RocketMQ使用延迟消息

RocketMQ使用延迟消息

rocketmq延时消息自定义配置;topic下tag使用

RocketMQ源码(24)—DefaultMQPushConsumer延迟消息源码