RocketMQ的使用
Posted domi22
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ的使用相关的知识,希望对你有一定的参考价值。
1 在resources目录下创建config目录,新建文件rocketmq.properties文件
# 指定namesrv地址 suning.rocketmq.namesrvAddr=localhost:9876 #生产者group名称 suning.rocketmq.producerGroupName=user_group #事务生产者group名称 suning.rocketmq.transactionProducerGroupName=order_transaction #消费者group名称 suning.rocketmq.consumerGroupName=user_consumer_group #生产者实例名称 suning.rocketmq.producerInstanceName=user_producer_instance #消费者实例名称 suning.rocketmq.consumerInstanceName=user_consumer_instance #事务生产者实例名称 suning.rocketmq.producerTranInstanceName=user_producer_transacition #一次最大消费多少数量消息 suning.rocketmq.consumerBatchMaxSize=1 #广播消费 suning.rocketmq.consumerBroadcasting=false #消费的topic:tag suning.rocketmq.subscribe[0]=user-topic:white #启动的时候是否消费历史记录 suning.rocketmq.enableHistoryConsumer=false #启动顺序消费 suning.rocketmq.enableOrderConsumer=false
2 新建properties文件读取类
package com.test.domi.config; import java.util.ArrayList; import java.util.List; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; /** * @Author 18011618 * @Date 19:31 2018/7/18 * @Function 读取配置文件信息 */ @PropertySource("classpath:config/rocketmq.properties") @ConfigurationProperties(prefix = "suning.rocketmq") @Configuration public class RocketMQProperties { private String namesrvAddr; private String producerGroupName; private String transactionProducerGroupName; private String consumerGroupName; private String producerInstanceName; private String consumerInstanceName; private String producerTranInstanceName; private int consumerBatchMaxSize; private boolean consumerBroadcasting; private boolean enableHistoryConsumer; private boolean enableOrderConsumer; private List<String> subscribe = new ArrayList<String>(); public String getNamesrvAddr() { return namesrvAddr; } public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } public String getProducerGroupName() { return producerGroupName; } public void setProducerGroupName(String producerGroupName) { this.producerGroupName = producerGroupName; } public String getTransactionProducerGroupName() { return transactionProducerGroupName; } public void setTransactionProducerGroupName(String transactionProducerGroupName) { this.transactionProducerGroupName = transactionProducerGroupName; } public String getConsumerGroupName() { return consumerGroupName; } public void setConsumerGroupName(String consumerGroupName) { this.consumerGroupName = consumerGroupName; } public String getProducerInstanceName() { return producerInstanceName; } public void setProducerInstanceName(String producerInstanceName) { this.producerInstanceName = producerInstanceName; } public String getConsumerInstanceName() { return consumerInstanceName; } public void setConsumerInstanceName(String consumerInstanceName) { this.consumerInstanceName = consumerInstanceName; } public String getProducerTranInstanceName() { return producerTranInstanceName; } public void setProducerTranInstanceName(String producerTranInstanceName) { this.producerTranInstanceName = producerTranInstanceName; } public int getConsumerBatchMaxSize() { return consumerBatchMaxSize; } public void setConsumerBatchMaxSize(int consumerBatchMaxSize) { this.consumerBatchMaxSize = consumerBatchMaxSize; } public boolean isConsumerBroadcasting() { return consumerBroadcasting; } public void setConsumerBroadcasting(boolean consumerBroadcasting) { this.consumerBroadcasting = consumerBroadcasting; } public boolean isEnableHistoryConsumer() { return enableHistoryConsumer; } public void setEnableHistoryConsumer(boolean enableHistoryConsumer) { this.enableHistoryConsumer = enableHistoryConsumer; } public boolean isEnableOrderConsumer() { return enableOrderConsumer; } public void setEnableOrderConsumer(boolean enableOrderConsumer) { this.enableOrderConsumer = enableOrderConsumer; } public List<String> getSubscribe() { return subscribe; } public void setSubscribe(List<String> subscribe) { this.subscribe = subscribe; } @Override public String toString() { return "RocketMQProperties{" + "namesrvAddr=‘" + namesrvAddr + ‘‘‘ + ", producerGroupName=‘" + producerGroupName + ‘‘‘ + ", transactionProducerGroupName=‘" + transactionProducerGroupName + ‘‘‘ + ", consumerGroupName=‘" + consumerGroupName + ‘‘‘ + ", producerInstanceName=‘" + producerInstanceName + ‘‘‘ + ", consumerInstanceName=‘" + consumerInstanceName + ‘‘‘ + ", producerTranInstanceName=‘" + producerTranInstanceName + ‘‘‘ + ", consumerBatchMaxSize=" + consumerBatchMaxSize + ", consumerBroadcasting=" + consumerBroadcasting + ", enableHistoryConsumer=" + enableHistoryConsumer + ", enableOrderConsumer=" + enableOrderConsumer + ", subscribe=" + subscribe + ‘}‘; } }
3.加载properties文件
package com.test.domi.config; import javax.annotation.PostConstruct; import groovy.util.logging.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.List; import java.util.stream.Collectors; /** * @Author 18011618 * @Date 19:36 2018/7/18 * @Function 通过使用指定的文件读取类 来加载配置文件到字段中 */ @Configuration @EnableConfigurationProperties(RocketMQProperties.class) @Slf4j public class RocketMQConfiguration { @Autowired private RocketMQProperties rocketMQProperties; //事件监听 @Autowired private ApplicationEventPublisher publisher = null; private static boolean isFirstSub = true; private static long startTime = System.currentTimeMillis(); private static Logger log = LoggerFactory.getLogger(RocketMQConfiguration.class); /** * 容器初始化的时候 打印参数 */ @PostConstruct public void init() { System.err.println(rocketMQProperties.getNamesrvAddr()); System.err.println(rocketMQProperties.getProducerGroupName()); System.err.println(rocketMQProperties.getConsumerBatchMaxSize()); System.err.println(rocketMQProperties.getConsumerGroupName()); System.err.println(rocketMQProperties.getConsumerInstanceName()); System.err.println(rocketMQProperties.getProducerInstanceName()); System.err.println(rocketMQProperties.getProducerTranInstanceName()); System.err.println(rocketMQProperties.getTransactionProducerGroupName()); System.err.println(rocketMQProperties.isConsumerBroadcasting()); System.err.println(rocketMQProperties.isEnableHistoryConsumer()); System.err.println(rocketMQProperties.isEnableOrderConsumer()); System.out.println(rocketMQProperties.getSubscribe().get(0)); } /** * 创建普通消息发送者实例 * @return * @throws MQClientException */ @Bean public DefaultMQProducer defaultProducer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer( rocketMQProperties.getProducerGroupName()); producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr()); producer.setInstanceName(rocketMQProperties.getProducerInstanceName()); producer.setVipChannelEnabled(false); producer.setRetryTimesWhenSendAsyncFailed(10); producer.start(); log.info("rocketmq producer server is starting...."); return producer; } /** * 创建支持消息事务发送的实例 * @return * @throws MQClientException */ @Bean public TransactionMQProducer transactionProducer() throws MQClientException { TransactionMQProducer producer = new TransactionMQProducer( rocketMQProperties.getTransactionProducerGroupName()); producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr()); producer.setInstanceName(rocketMQProperties .getProducerTranInstanceName()); producer.setRetryTimesWhenSendAsyncFailed(10); // 事务回查最小并发数 producer.setCheckThreadPoolMinSize(2); // 事务回查最大并发数 producer.setCheckThreadPoolMaxSize(2); // 队列数 producer.setCheckRequestHoldMax(2000); producer.start(); log.info("rocketmq transaction producer server is starting...."); return producer; } /** * 创建消息消费的实例 * @return * @throws MQClientException */ @Bean public DefaultMQPushConsumer pushConsumer() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( rocketMQProperties.getConsumerGroupName()); consumer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr()); consumer.setInstanceName(rocketMQProperties.getConsumerInstanceName()); //判断是否是广播模式 if (rocketMQProperties.isConsumerBroadcasting()) { consumer.setMessageModel(MessageModel.BROADCASTING); } //设置批量消费 consumer.setConsumeMessageBatchMaxSize(rocketMQProperties .getConsumerBatchMaxSize() == 0 ? 1 : rocketMQProperties .getConsumerBatchMaxSize()); //获取topic和tag List<String> subscribeList = rocketMQProperties.getSubscribe(); for (String sunscribe : subscribeList) { consumer.subscribe(sunscribe.split(":")[0], sunscribe.split(":")[1]); } // 顺序消费 if (rocketMQProperties.isEnableOrderConsumer()) { consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage( List<MessageExt> msgs, ConsumeOrderlyContext context) { try { context.setAutoCommit(true); msgs = filterMessage(msgs); if (msgs.size() == 0) return ConsumeOrderlyStatus.SUCCESS; publisher.publishEvent(new MessageEvent(msgs, consumer)); } catch (Exception e) { e.printStackTrace(); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); } // 并发消费 else { consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { //过滤消息 msgs = filterMessage(msgs); if (msgs.size() == 0) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; publisher.publishEvent(new MessageEvent(msgs, consumer)); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); } new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5000); try { consumer.start(); } catch (Exception e) { e.printStackTrace(); } log.info("rocketmq consumer server is starting...."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); return consumer; } /** * 消息过滤 * @param msgs * @return */ private List<MessageExt> filterMessage(List<MessageExt> msgs) { if (isFirstSub && !rocketMQProperties.isEnableHistoryConsumer()) { msgs = msgs.stream() .filter(item -> startTime - item.getBornTimestamp() < 0) .collect(Collectors.toList()); } if (isFirstSub && msgs.size() > 0) { isFirstSub = false; } return msgs; } }
4 创建生产者
package com.test.domi.controller; import com.test.domi.dto.User; import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import com.alibaba.fastjson.JSON; import java.util.List; @RestController public class ProducerController { @Autowired private DefaultMQProducer defaultProducer; @Autowired private TransactionMQProducer transactionProducer; /** * 发送普通消息 */ @GetMapping("/sendMessage") public void sendMsg() { for(int i=0;i<10;i++){ User user = new User(); user.setId(String.valueOf(i)); user.setUsername("jhp"+i); String json = JSON.toJSONString(user); Message msg = new Message("user-topic","white",json.getBytes()); try { SendResult result = defaultProducer.send(msg); System.out.println("消息id:"+result.getMsgId()+":"+","+"发送状态:"+result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); System.out.println("消息发送失败"); } } } /** * 发送事务消息 * @return */ @GetMapping("/sendTransactionMess") public String sendTransactionMsg() { SendResult sendResult = null; try { // a,b,c三个值对应三个不同的状态 String ms = "c"; Message msg = new Message("user-topic","white",ms.getBytes()); // 发送事务消息 sendResult = transactionProducer.sendMessageInTransaction(msg, (Message msg1, Object arg) -> { String value = ""; if (arg instanceof String) { value = (String) arg; } if (value == "") { throw new RuntimeException("发送消息不能为空..."); } else if (value =="a") { return LocalTransactionState.ROLLBACK_MESSAGE; } else if (value =="b") { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; }, 4); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); } return sendResult.toString(); } /** * 支持顺序发送消息 */ @GetMapping("/sendMessOrder") public void sendMsgOrder() { for(int i=0;i<100;i++) { User user = new User(); user.setId(String.valueOf(i)); user.setUsername("jhp" + i); String json = JSON.toJSONString(user); Message msg = new Message("user-topic", "white", json.getBytes()); try{ defaultProducer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int index = ((Integer) arg) % mqs.size(); return mqs.get(index); } },i); } catch (Exception e){ e.printStackTrace(); } } } }
5.创建监听对象
package com.test.domi.config; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.context.ApplicationEvent; import java.util.List; /** * 监听对象 * @author 18011618 * */ public class MessageEvent extends ApplicationEvent { private static final long serialVersionUID = -4468405250074063206L; private DefaultMQPushConsumer consumer; private List<MessageExt> msgs; public MessageEvent(List<MessageExt> msgs, DefaultMQPushConsumer consumer) throws Exception { super(msgs); this.consumer = consumer; this.setMsgs(msgs); } public DefaultMQPushConsumer getConsumer() { return consumer; } public void setConsumer(DefaultMQPushConsumer consumer) { this.consumer = consumer; } public List<MessageExt> getMsgs() { return msgs; } public void setMsgs(List<MessageExt> msgs) { this.msgs = msgs; } }
6.监听消息进行消费
package com.test.domi.config; import java.util.List; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; /** * 监听消息进行消费 */ @Component public class ConsumerService { @EventListener(condition = "#event.msgs[0].topic==‘user-topic‘ && #event.msgs[0].tags==‘white‘") public void rocketmqMsgListener(MessageEvent event) { try { List<MessageExt> msgs = event.getMsgs(); for (MessageExt msg : msgs) { System.err.println("消费消息:"+new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); } } }
访问/sendMessage的url生产消息,控制台打印如下:
消息id:C0A801652B5C18B4AAC20F5FF9C60032:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFE20033:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFE50034:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFE80035:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFEB0036:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFEF0037:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFF30038:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFF90039:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFFC003A:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFFF003B:,发送状态:SEND_OK 消费消息:{"id":"9","username":"jhp9"} 消费消息:{"id":"6","username":"jhp6"} 消费消息:{"id":"0","username":"jhp0"} 消费消息:{"id":"5","username":"jhp5"} 消费消息:{"id":"1","username":"jhp1"} 消费消息:{"id":"2","username":"jhp2"} 消费消息:{"id":"7","username":"jhp7"} 消费消息:{"id":"3","username":"jhp3"} 消费消息:{"id":"4","username":"jhp4"} 消费消息:{"id":"8","username":"jhp8"}
以上是关于RocketMQ的使用的主要内容,如果未能解决你的问题,请参考以下文章