rocketmq的消费者集群消费消息,实现负载均衡

Posted zhangzhiqin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq的消费者集群消费消息,实现负载均衡相关的知识,希望对你有一定的参考价值。

package com.bfxy.rocketmq.model;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.bfxy.rocketmq.constants.Const;

public class Consumer1 {

public Consumer1() {
try {
String group_name = "test_model_consumer_name";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.subscribe("test_model_topic2", "TagA");
consumer.setMessageModel(MessageModel.CLUSTERING);
//consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}


class Listener implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
//if(tags.equals("TagA")) {
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
//}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

}

public static void main(String[] args) {
Consumer1 c1 = new Consumer1();
System.out.println("c1 start..");

}
}

//==========================================================

package com.bfxy.rocketmq.model;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.bfxy.rocketmq.constants.Const;

public class Consumer2 {

public Consumer2() {
try {
String group_name = "test_model_consumer_name";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.subscribe("test_model_topic2", "TagA");
consumer.setMessageModel(MessageModel.CLUSTERING);
//consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}


class Listener implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
//if(tags.equals("TagA")) {
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
//}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

}

public static void main(String[] args) {
Consumer2 c2 = new Consumer2();
System.out.println("c2 start..");

}
}





























































































以上是关于rocketmq的消费者集群消费消息,实现负载均衡的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ消费者消息队列负载均衡

RocketMQ源码(16)—消费者负载均衡服务RebalanceService入口源码

RocketMQ源码(16)—消费者负载均衡服务RebalanceService入口源码

8 SpringBoot整合RocketMQ实现消费者广播模式和负载均衡模式

rocketmq总结以及自动化部署策略

rocketMQ安装配置+与java交互API操作+集群搭建+高级特性