RocketMQ Consumer Demo
Posted by lizhen1412
Minimal RocketMQ consumer:
Dependencies in pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hellobike</groupId>
    <artifactId>rocketmqconsumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmqconsumer</name>
    <description>RocketMqConsumer</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Consumer class:
package com.xxx.rocketmqconsumer.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;

@Configuration
public class RocketConfiguration implements ApplicationListener<ContextRefreshedEvent> {

    protected static final Logger logger = LoggerFactory.getLogger(RocketConfiguration.class);

    private static final String consumerGroup = "basic-test";
    // Multiple name servers are separated by semicolons.
    private static final String namesrvAddr = "test1-rocketmq.ttbike.com.cn:9876;test2-rocketmq.ttbike.com.cn:9876";
    private static final String topic = "TopicTest";

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            // Only start the consumer when the root application context is refreshed.
            if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
                // "*" subscribes to every tag of the topic.
                consumer.subscribe(topic, "*");
                consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                    // Process msgs here; the demo acknowledges them without doing any work.
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.start();
            }
        } catch (MQClientException mQClientException) {
            logger.error("Failed to start the RocketMQ consumer", mQClientException);
        }
    }
}
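The name server address in the class above packs two servers into one string, separated by a semicolon. As a rough sketch of how that string breaks down, the plain-Java helper below splits it into host:port entries and rejects malformed ones. NameServerAddresses and parse are hypothetical names made up for this illustration; they are not part of the RocketMQ client, which accepts the raw string directly through setNamesrvAddr.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of rocketmq-client.
final class NameServerAddresses {

    // Splits a semicolon-separated list such as "host1:9876;host2:9876"
    // into its entries and checks that each one looks like host:port.
    static List<String> parse(String namesrvAddr) {
        List<String> result = new ArrayList<>();
        for (String entry : namesrvAddr.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing semicolon
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            // NumberFormatException extends IllegalArgumentException,
            // so a non-numeric port is reported the same way.
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in: " + trimmed);
            }
            result.add(trimmed);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("test1-rocketmq.ttbike.com.cn:9876;test2-rocketmq.ttbike.com.cn:9876"));
    }
}
```

A check like this could run before the consumer is created, so that a typo in the address list is reported with a clear message.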
RocketMQ consumer 1.0, with the settings extracted into a configuration file:
The pom.xml dependencies are identical to those of the minimal version above. They already include lombok and spring-boot-configuration-processor, which the properties class below relies on.
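Properties class:
package com.xxx.rocketmqconsumer.consumer;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Registered as a bean so that it can be injected where the consumer is built;
// @EnableConfigurationProperties on a configuration class would work as well.
@Component
@ConfigurationProperties(prefix = "rocketmq.comsumer")
@Data
public class RocketMQComsumerProperties {

    // Name server address; defaults to a local name server
    private String nameServer = "localhost:9876";

    // Topic to subscribe to
    private String topic;

    // Consumer group name
    private String comsumerGroup;
}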
Add a logback-spring.xml configuration file:
<?xml version="1.0" encoding="utf-8"?>
<configuration>
    <!-- Console output -->
    <appender name="consoleLog" class="ch.qos.logback.core.ConsoleAppender">
        <!-- Layout -->
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%d - %msg%n</pattern>
        </layout>
    </appender>
    <appender name="fileInfoLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>DENY</onMatch>
            <onMismatch>ACCEPT</onMismatch>
        </filter>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
        <!-- Rolling policy -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Path -->
            <fileNamePattern>/workspace/carkey/RocketMqConsumer/info.%d.log</fileNamePattern>
        </rollingPolicy>
    </appender>
    <appender name="fileErrorLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>ERROR</level>
        </filter>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
        <!-- Rolling policy -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Path -->
            <fileNamePattern>/workspace/carkey/RocketMqConsumer/error.%d.log</fileNamePattern>
        </rollingPolicy>
    </appender>
    <root level="info">
        <appender-ref ref="consoleLog"/>
        <appender-ref ref="fileInfoLog"/>
        <appender-ref ref="fileErrorLog"/>
    </root>
</configuration>
Configuration file settings:
rocketmq.comsumer.nameServer=test1-rocketmq.ttbike.com.cn:9876;test2-rocketmq.ttbike.com.cn:9876
rocketmq.comsumer.topic=TopicTest
rocketmq.comsumer.comsumerGroup=basic-test
logging.file=app.log
logging.level.com.hellobike.rocketmqconsumer.consumer=error
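To make the link between these keys and the fields of RocketMQComsumerProperties more concrete, here is a plain-Java sketch that reads the same keys with java.util.Properties. In the real application Spring Boot performs this binding itself; ConsumerSettingsSketch, from and parse are hypothetical names used only for this illustration, and the sketch assumes the prefix keeps the spelling used in the post.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in that mimics, by hand, what Spring Boot's
// property binding does for RocketMQComsumerProperties.
final class ConsumerSettingsSketch {

    // Same prefix (and spelling) as in @ConfigurationProperties.
    static final String PREFIX = "rocketmq.comsumer.";

    final String nameServer;
    final String topic;
    final String comsumerGroup;

    private ConsumerSettingsSketch(String nameServer, String topic, String comsumerGroup) {
        this.nameServer = nameServer;
        this.topic = topic;
        this.comsumerGroup = comsumerGroup;
    }

    // Reads the three keys; nameServer falls back to the same default
    // as the field initializer in the properties class.
    static ConsumerSettingsSketch from(Properties props) {
        return new ConsumerSettingsSketch(
                props.getProperty(PREFIX + "nameServer", "localhost:9876"),
                props.getProperty(PREFIX + "topic"),
                props.getProperty(PREFIX + "comsumerGroup"));
    }

    // Parses properties-file text, one key=value pair per line.
    static ConsumerSettingsSketch parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return from(props);
    }

    public static void main(String[] args) {
        ConsumerSettingsSketch settings = parse(String.join("\n",
                "rocketmq.comsumer.nameServer=test1-rocketmq.ttbike.com.cn:9876;test2-rocketmq.ttbike.com.cn:9876",
                "rocketmq.comsumer.topic=TopicTest",
                "rocketmq.comsumer.comsumerGroup=basic-test"));
        System.out.println(settings.nameServer + " / " + settings.topic + " / " + settings.comsumerGroup);
    }
}
```

The key names have to match the prefix and field names character for character, which is why the spelling "comsumer" must stay consistent between the properties class and the configuration file.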