RocketMQ消费者demo

Posted by lizhen1412

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ消费者demo相关的知识,希望对你有一定的参考价值。

简约版本RocketMQ消费者:

pom引入依赖:

技术图片
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the "rocketmqconsumer" Spring Boot demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherit plugin and dependency management from the Spring Boot 2.1.6 parent;
         dependencies below that omit <version> rely on that management. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <!-- Coordinates of this project. -->
    <groupId>com.hellobike</groupId>
    <artifactId>rocketmqconsumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmqconsumer</name>
    <description>RocketMqConsumer</description>

    <properties>
        <!-- Java source/target level used by the Spring Boot parent configuration. -->
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot web starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Developer tooling; runtime-scoped and optional, so it is not passed on to dependents. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <!-- RocketMQ client library (org.apache.rocketmq.client.* classes used by the consumer);
             version is pinned explicitly here. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <!-- Apache Commons IO, version pinned explicitly.
             NOTE(review): not referenced by the Java code shown alongside this POM; confirm it is needed. -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>

        <!-- Lombok annotation processor (e.g. @Data); optional, so not passed on to dependents. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Test-only Spring Boot testing starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Annotation processor that generates metadata for @ConfigurationProperties classes. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin (packaging/running the application). -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
pom.xml

消费者方法类:

技术图片
package com.xxx.rocketmqconsumer.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;


@Configuration
public class RocketConfiguration implements ApplicationListener<ContextRefreshedEvent> 
    protected static final Logger logger = LoggerFactory.getLogger(RocketConfiguration.class);
    private static final String consumerGroup = "basic-test";
    private static final String namesrvAddr = "test1-rocketmq.ttbike.com.cn:9876;test2-rocketmq.ttbike.com.cn:9876";
    private static final String topic = "TopicTest";

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        try 
            if (contextRefreshedEvent.getApplicationContext().getParent() == null) 
                consumer.subscribe(topic, "*");
                consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> 
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                );
                consumer.start();
            
         catch (MQClientException mQClientException) 
            mQClientException.printStackTrace();
        
    
RocketConfiguration

 

RocketMQ1.0消费者配置文件抽出:

pom.xml引入依赖:

技术图片
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the "rocketmqconsumer" Spring Boot demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherit plugin and dependency management from the Spring Boot 2.1.6 parent;
         dependencies below that omit <version> rely on that management. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <!-- Coordinates of this project. -->
    <groupId>com.hellobike</groupId>
    <artifactId>rocketmqconsumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmqconsumer</name>
    <description>RocketMqConsumer</description>

    <properties>
        <!-- Java source/target level used by the Spring Boot parent configuration. -->
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot web starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Developer tooling; runtime-scoped and optional, so it is not passed on to dependents. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <!-- RocketMQ client library (org.apache.rocketmq.client.* classes used by the consumer);
             version is pinned explicitly here. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <!-- Apache Commons IO, version pinned explicitly.
             NOTE(review): not referenced by the Java code shown alongside this POM; confirm it is needed. -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>

        <!-- Lombok annotation processor (e.g. @Data); optional, so not passed on to dependents. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Test-only Spring Boot testing starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Annotation processor that generates metadata for @ConfigurationProperties classes. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin (packaging/running the application). -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
pom.xml

配置属性类:

技术图片
package com.xxx.rocketmqconsumer.consumer;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "rocketmq.comsumer")
@Data
public class RocketMQComsumerProperties 
    // nameServer 定义实体
    private String nameServer = "localhost:9876";
    // Topic 实体定义
    private String topic;
    // 消费者Group定义
    private String comsumerGroup;
RocketMQComsumerProperties

增加logback-spring.xml 配置文件

技术图片
<?xml version="1.0" encoding="utf-8"?>
<!-- Logback configuration: console output plus two daily-rolling files,
     one for non-ERROR events and one for ERROR events. -->
<configuration>
    <!-- Console output -->
    <appender name="consoleLog" class="ch.qos.logback.core.ConsoleAppender">
        <!-- Layout: timestamp, message, newline -->
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%d - %msg%n</pattern>
        </layout>
    </appender>
    <!-- File appender for everything except ERROR -->
    <appender name="fileInfoLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- LevelFilter matches the exact level: ERROR events are denied, all other levels accepted -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>DENY</onMatch>
            <onMismatch>ACCEPT</onMismatch>
        </filter>
        <!-- Only the raw message is written (no timestamp or level) -->
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
        <!-- Rolling policy: time based; %d without a format rolls over daily -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Output path -->
            <fileNamePattern>/workspace/carkey/RocketMqConsumer/info.%d.log</fileNamePattern>
        </rollingPolicy>
    </appender>
    <!-- File appender for ERROR events -->
    <appender name="fileErrorLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- ThresholdFilter denies events below the ERROR level -->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>ERROR</level>
        </filter>
        <!-- Only the raw message is written (no timestamp or level) -->
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
        <!-- Rolling policy: time based; %d without a format rolls over daily -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Output path -->
            <fileNamePattern>/workspace/carkey/RocketMqConsumer/error.%d.log</fileNamePattern>
        </rollingPolicy>
    </appender>
    <!-- Root logger at INFO, attached to all three appenders -->
    <root level="info">
        <appender-ref ref="consoleLog"/>
        <appender-ref ref="fileInfoLog"/>
        <appender-ref ref="fileErrorLog"/>
    </root>
</configuration>
logback-spring.xml

配置文件配置:

技术图片
# RocketMQ consumer settings; the "rocketmq.comsumer" prefix (spelling included) must match
# the prefix declared on RocketMQComsumerProperties.
# Name server addresses, two entries separated by ';'.
rocketmq.comsumer.nameServer=test1-rocketmq.ttbike.com.cn:9876;test2-rocketmq.ttbike.com.cn:9876
# Topic to subscribe to.
rocketmq.comsumer.topic=TopicTest
# Consumer group name.
rocketmq.comsumer.comsumerGroup=basic-test
# Log file name used by Spring Boot's logging support.
logging.file=app.log
# Log level for the consumer package.
# NOTE(review): the Java sources shown declare package com.xxx.rocketmqconsumer.consumer;
# confirm this logger name matches the real package.
logging.level.com.hellobike.rocketmqconsumer.consumer=error
application.properties

 

以上是关于RocketMQ消费者demo的主要内容,如果未能解决你的问题,请参考以下文章:

RocketMQ快速入门:消息发送延迟消息消费重试

springboot rabbitMQ demo

Apache RocketMQ:使用官方demo测试rocketmq

rocketmq消费问题总结

RocketMQ 顺序消费

RocketMQ-消息消费模式 顺序消费