RocketMQ

Posted Mr.years

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ相关的知识,希望对你有一定的参考价值。

1. 消费端集群消费(负载均衡)

 示例代码:

/**
 * Producer,发送消息
 * 
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("message_producer");
        producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                    "Tag1",// tag
                    ("Hello RocketMQ " + i).getBytes()// body
                        );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

/**
 * Consumer,订阅消息
 */
public class Consumer1 {

    public Consumer1() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    class Listener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
                    
                    System.out.println("======暂停=====");
                    Thread.sleep(60000);
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    public static void main(String[] args) throws InterruptedException, MQClientException {
        Consumer1 consumer1 = new Consumer1();
        System.out.println("Consumer1 Started.");
    }
}

/**
 * Consumer,订阅消息
 */
public class Consumer2 {

    public Consumer2() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    class Listener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    public static void main(String[] args) throws InterruptedException, MQClientException {
        Consumer2 consumer2 = new Consumer2();
        System.out.println("Consumer2 Started.");
    }
}
View Code

一个生产者,两个消费者,注意两个消费者的组名要一样。

先启动两个消费者(customer1,customer2),通过控制台查看如下:

 

再启动生产者生成100条消息,消费情况如下:

 

生成的100条消息被customer1和customer2平均的消费了。可以通过consumer.setAllocateMessageQueueStrategy去设置分配策略。

BTW:这是默认的模式,可以通过consumer.setMessageModel设置,MessageModel.CLUSTERING | MessageModel.BROADCASTING,如果是广播消费,则每个客户端都会收到生产端的所有消息

2.消息未响应会重发

 代码示例:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("message_producer");
        producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
        producer.start();

        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                    "Tag1",// tag
                    ("Hello RocketMQ " + i).getBytes()// body
                        );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}


public class Consumer1 {

    public Consumer1() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    class Listener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
                    
                    System.out.println("======暂停=====");
                    Thread.sleep(600000);
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    public static void main(String[] args) throws InterruptedException, MQClientException {
        Consumer1 consumer1 = new Consumer1();
        System.out.println("Consumer1 Started.");
    }
}


public class Consumer2 {

    public Consumer2() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    class Listener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    public static void main(String[] args) throws InterruptedException, MQClientException {
        Consumer2 consumer2 = new Consumer2();
        System.out.println("Consumer2 Started.");
    }
}
View Code

先启动consumer1,再启动consumer2,最后启动producer

consumer1收到了消息,consumer2没有收到消息,这时把consumer1强制停止,也就是说consumer1不会给MQ返回响应,查看结果:

 

consumer2也收到消息了,说明在MQ没收到消费端响应的情况下,会重发消息。

 3. 修改topic的队列数

默认的队列数是4个,可以从执行结果中看出:queueId都是0-3

细节可以看https://www.cnblogs.com/dyfh/p/4113677.html

可以增加设置producer.createTopic("TopicTest", "TopicTest", 8);

 

 

以上是关于RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章

配置 kafka 同步刷盘

3RocketMQ 源码解析之 源代码环境搭建

3RocketMQ 源码解析之 源代码环境搭建

3RocketMQ 源码解析之 源代码环境搭建

RocketMq 修改日志保存目录 无需修改代码

30 RocketMQ事务消息的代码实现细节