阿里云RocketMQ的消费者简单实现
Posted silentdoer
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阿里云RocketMQ的消费者简单实现相关的知识,希望对你有一定的参考价值。
业务场景之类的请看另一篇生产者的实现;
package com.ttt.eee; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.nio.charset.Charset; import java.util.Properties; import java.util.Scanner; public class MQTestConsumer public static void main(String[] args) Properties properties = new Properties(); // 您在控制台创建的 Group ID,其实就是网上说的groupName properties.put(PropertyKeyConst.GROUP_ID, "eee"); // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.AccessKey, "sss"); // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.SecretKey, "bbb"); // 设置 TCP 接入域名,到控制台的实例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://ttt.mq-internet-access.mq-internet.aliyuncs.com:80"); // 集群订阅方式 (默认) // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); // 广播订阅方式 // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING); Consumer consumer = ONSFactory.createConsumer(properties); // TODO tag如果是*表示订阅所有的tag消息,注意在producer里是叫tags,这里却叫subExpresion // *表示订阅所有Tag,TagA||TagB表示订阅 TagA和TagB consumer.subscribe("xxx-change", "*", (message, context) -> // context的用处暂时不知道 System.out.println("Receive: " + message); System.out.println("具体消息为:" + new String(message.getBody(), Charset.forName("UTF-8"))); // 正常消费返回这个,如果消费消息后业务处理出现问题一般返回:Action.ReconsumeLater表示这条消息晚点处理; return Action.CommitMessage; ); //订阅另外一个 Topic // TODO 一个Consumer可以订阅多个topic ??,不过既然是官网的例子应该是可以的 /*consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() //订阅全部 Tag public Action consume(Message message, ConsumeContext context) System.out.println("Receive: " + message); return Action.CommitMessage; );*/ consumer.start(); System.out.println("Consumer Started"); var scanner = new Scanner(System.in); scanner.next(); consumer.shutdown(); System.out.println("closed producer conn.");
集合到Spring里是:
@Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean xxxNotify(XxxNotifyListener xxxNotifyListener) return this.getConsumer(gid, topic, xxxNotifyListener); private ConsumerBean getConsumer(String gid, String topic, MessageListener messageListener) Properties properties = new Properties(); properties.setProperty("addr", addr); properties.setProperty("AccessKey", accessKey); properties.setProperty("SecretKey", secretKey); properties.setProperty("GROUP_ID", gid); Map<Subscription, MessageListener> subscriptionTable = new HashMap<>(); Subscription subscription = new Subscription(); subscription.setTopic(topic); // 这里还可以设置subExpression来描述tag subscriptionTable.put(subscription, messageListener); ConsumerBean consumer = new ConsumerBean(); consumer.setProperties(properties); consumer.setSubscriptionTable(subscriptionTable); return consumer;
MessageListener里是用来实现消费这个消息后的具体业务逻辑的;
以上是关于阿里云RocketMQ的消费者简单实现的主要内容,如果未能解决你的问题,请参考以下文章
Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息
Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息