rocketmq 以广播方式实现消费者消费消息
Posted zhangzhiqin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq 以广播方式实现消费者消费消息相关的知识,希望对你有一定的参考价值。
package com.bfxy.rocketmq.model;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import com.bfxy.rocketmq.constants.Const;
public class Consumer1 {
public Consumer1() {
try {
String group_name = "test_model_consumer_name1";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.subscribe("test_model_topic2", "TagA");
//consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
class Listener implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
//if(tags.equals("TagA")) {
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
//}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
public static void main(String[] args) {
Consumer1 c1 = new Consumer1();
System.out.println("c1 start..");
}
}
//=========================
package com.bfxy.rocketmq.model;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import com.bfxy.rocketmq.constants.Const;
public class Consumer2 {
public Consumer2() {
try {
String group_name = "test_model_consumer_name2";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.subscribe("test_model_topic2", "TagB");
//consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
class Listener implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
//if(tags.equals("TagB")) {
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
//}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
public static void main(String[] args) {
Consumer2 c2 = new Consumer2();
System.out.println("c2 start..");
}
}
以上是关于rocketmq 以广播方式实现消费者消费消息的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ专题2:三种常用生产消费方式(顺序广播定时)以及顺序消费源码探究