RocketMQ-quickstart

Posted 寻找风口的猪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ-quickstart相关的知识,希望对你有一定的参考价值。

基本概念:

  Producer:消息生产者,负责生产消息,一般由业务系统负责生产消息。

  Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。

  Push Consumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Linsener接口方法

  Pull Consumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制

  Consumer Group:一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。

  Broker:消息中转角色,负责存储消息,转发消息,一般也称为Server,在JMS规范中成为Provider

代码示例:

 生产者:

package org.hope.lee.producer;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
//        producer.setNamesrvAddr("192.168.31.176:9876");
        producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        try {
            // 设置实例名称
            producer.setInstanceName("quick_start_producer");
            // 设置重试次数
            producer.setRetryTimesWhenSendFailed(3);
            // 开启生产者
            producer.start();
            // 创建一条消息
            Message msg = new Message("PushTopic_tt1", "TagB", "OrderID0034", "uniform_just_for_test".getBytes());
            //目前发现3.2.6版本没有这个方法,3.5.3版本有这个方法,并且必须要设置为false否则会报错
//            producer.setVipChannelEnabled(false);
            SendResult send = producer.send(msg);
            System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
            //发送,并触发回调函数
            /*producer.send(msg, new SendCallback(){
                //成功回调函数
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult.getSendStatus());
                    System.out.println("成功");
                }
                //异常回调函数
                @Override
                public void onException(Throwable e) {
                    System.out.println("失败了" +  e.getMessage());
                }
            });*/
            
            //获取某个主题的消息队列  
            /*List<MessageQueue> messageQueues = producer  
                    .fetchPublishMessageQueues("PushTopic");  
            System.out.println(messageQueues.size());  */
            
            
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } 
        producer.shutdown();
    }
}

 

 消费者:

package org.hope.lee.consumer;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.sun.org.apache.xpath.internal.SourceTree;

public class Consumer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer");
        consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        //批量消费,每次拉取10条
        consumer.setConsumeMessageBatchMaxSize(10);
        try {
//            consumer.setInstanceName("quick_start_consumer");
            //3.2.6这个版本没有这个方法,3.5.3版本要设置这个方法为false,否则取不到topic
//            consumer.setVipChannelEnabled(false);

            //程序第一次启动从消息队列头取数据
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("PushTopic_tt1", "*");
//            consumer.subscribe("PushTopic_tt1", "TagA || Tag B || Tage C");
            consumer.registerMessageListener(new MessageListenerConcurrently() {


                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for(MessageExt msg : msgs) {
                        System.out.println("-------->" + msg.getKeys());
                        System.out.println("-------->" + msg.getMsgId());
                        System.out.println("-------->" + msg.getQueueId());
                        System.out.println("-------->" + msg.getQueueOffset());
                        System.out.println("-------->" + msg.getBody().toString());
                        System.out.println("-------->" + msg.toString());
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started."); 
//            consumer.suspend();
            
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        
        
    }
}

 

 https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

问题:

[1] producer生产消息报 “No route info of this topic” 异常。

解决方案:

阿里的github issues : https://github.com/alibaba/RocketMQ/issues/44

网上参考,把rocketmq的配置文件broker-a.properties中的autoCreateTopicEnable值改成true

以上是关于RocketMQ-quickstart的主要内容,如果未能解决你的问题,请参考以下文章

微信小程序代码片段

VSCode自定义代码片段——CSS选择器

谷歌浏览器调试jsp 引入代码片段,如何调试代码片段中的js

片段和活动之间的核心区别是啥?哪些代码可以写成片段?

VSCode自定义代码片段——.vue文件的模板

VSCode自定义代码片段6——CSS选择器