关于RocketMQ的基础API操作——这一篇就够了

Posted 默辨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于RocketMQ的基础API操作——这一篇就够了相关的知识,希望对你有一定的参考价值。

关于RocketMQ的基础操作



写在前面:本文中出现的示例及代码,均来源RocketMQ的官网以及B站尚硅谷的RocketMQ视频教程。结合整理出了一个相对简单的笔记。
本文仅为自己学习RocketMQ的一个知识笔记,也希望这个笔记能帮助到你。

一、基础API操作

下面的案例需要引入对应的maven依赖

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.8.0</version>
    </dependency>
</dependencies>

1、 普通消息

普通消息的消息生产者分别通过三种方式发送消息:同步发送、异步发送以及单向发送。以DefaultMQProducer为起点。

1.1、消息生产端

1)Producer端发送同步消息——send

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

public class SyncProducer 
	public static void main(String[] args) throws Exception 
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 设置NameServer的地址
    	producer.setNamesrvAddr("127.0.0.1:9876");
    	// 启动Producer实例
        producer.start();
    	for (int i = 0; i < 100; i++) 
    	    // 创建消息,并指定Topic,Tag和消息体
    	    Message msg = new Message("TopicTest" /* Topic */,
        	"TagA" /* Tag */,
        	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
    	
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    


2)发送异步消息——send+SendCallback

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

引入了一个countDownLatch来保证所有消息回调方法都执行完了再关闭Producer。 所以从这里可以看出,RocketMQ的Producer也是一个服务端,在往Broker发送消息的时候也要作为服务端提供服务。

public class AsyncProducer 
	public static void main(String[] args) throws Exception 
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 设置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
    	// 启动Producer实例
        producer.start();
        // 设置异步重试次数为0次,表示失败后不重试。默认2次
        producer.setRetryTimesWhenSendAsyncFailed(0);
	
	int messageCount = 100;
        // 根据消息数量实例化倒计时计算器
	final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
    	for (int i = 0; i < messageCount; i++) 
                final int index = i;
            	// 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收异步返回结果的回调
                producer.send(msg, new SendCallback() 
                    @Override
                    public void onSuccess(SendResult sendResult) 
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                    
                    @Override
                    public void onException(Throwable e) 
                        countDownLatch.countDown();
      	                System.out.printf("%-10d Exception %s %n", index, e);
      	                e.printStackTrace();
                    
            	);
    	
	// 等待5s
	countDownLatch.await(5, TimeUnit.SECONDS);
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    


3)单向发送消息的样例——sendOneway

单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返 回ACK。该方式的消息发送效率最高,但消息可靠性较差。

这种方式主要用在不特别关心发送结果的场景,例如日志发送

public class OnewayProducer 
	public static void main(String[] args) throws Exception
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 设置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
    	// 启动Producer实例
        producer.start();
    	for (int i = 0; i < 100; i++) 
        	// 创建消息,并指定Topic,Tag和消息体
        	Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 发送单向消息,没有任何返回结果
        	producer.sendOneway(msg);

    	
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    



1.2、消息消费端

消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。

通常情况下,用**推模式(DefaultMQPushConsumer)**比较简单,实际上RocketMQ的推模式也是由拉模式(DefaultMQPullConsumerImpl)封装出来的

4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl

1)push模式——DefaultMQPushConsumer

public class Consumer 

	public static void main(String[] args) throws InterruptedException, MQClientException 

    	// 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

    	// 设置NameServer的地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

    	// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("TopicTest", "*");
    	// 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
	


2)pull模式——DefaultMQPullConsumer

public class PullConsumer 
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException 
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) 
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) 
                try 
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) 
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    
                 catch (Exception e) 
                    e.printStackTrace();
                
            
        
        consumer.shutdown();
    

    private static long getMessageQueueOffset(MessageQueue mq) 
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;
        return 0;
    

    private static void putMessageQueueOffset(MessageQueue mq, long offset) 
        OFFSE_TABLE.put(mq, offset);
    



2、顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

2.1、消息生产端

public class Producer 

   public static void main(String[] args) throws Exception 
       DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
       producer.setNamesrvAddr("127.0.0.1:9876");
       producer.start();

       String[] tags = new String[]"TagA", "TagC", "TagD";

       // 订单列表
       List<OrderStep> orderList = new Producer().buildOrders();

       Date date = new Date();
       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
       String dateStr = sdf.format(date);
       for (int i = 0; i < 10; i++) 
           // 加个时间前缀
           String body = dateStr + " Hello RocketMQ " + orderList.get(i);
           Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

           SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
               @Override
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 
                   Long id = (Long) arg;  //根据订单id选择发送queue
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               
           , orderList.get(i).getOrderId());//订单id

           System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
               sendResult.getSendStatus(),
               sendResult.getMessageQueue().getQueueId(),
               body));
       

       producer.shutdown();
   

   /**
    * 订单的步骤
    */
   private static class OrderStep 
       private long orderId;
       private String desc;

       public long getOrderId() 
           return orderId;
       

       public void setOrderId(long orderId) 
           this.orderId = orderId;
       

       public String getDesc() 
           return desc;
       

       public void setDesc(String desc) 
           this.desc = desc;
       

       @Override
       public String toString() 
           return "OrderStep" +
               "orderId=" + orderId +
               ", desc='" + desc + '\\'' +
               '';
       
   

   /**
    * 生成模拟订单数据
    */
   private List<OrderStep> buildOrders() 
       List<OrderStep> orderList = new ArrayList<OrderStep>();

       OrderStep orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("推送");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("完成");
       orderList.add关于el-upload看这一篇就够了

关于el-upload看这一篇就够了

关于el-upload看这一篇就够了

关于el-upload看这一篇就够了

python从入门到提高,这一篇就够了

关于Java基本语法这一篇就够了