springcloud的stream消息组件的使用@StreamListener
Posted 好大的月亮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springcloud的stream消息组件的使用@StreamListener相关的知识,希望对你有一定的参考价值。
常见问题(使用rabbitmq)
消息分组防止多实例重复消费
在一个服务多实例场景下使用默认使用@StreamListener
监听消息消费,yml中没有特殊配置的话是会导致消息重复消费的,原因是此时每个实例都是匿名在rabbitmq上注册的队列,需要给消费者指定一个消费组,让消息在组里只被消费一次;
spring.cloud.stream.bindings.xxx(消费者队列名).group=xxx(组名
)
在springboot下在同一个服务(项目中)使用@input
和@outPut
时指定的队列名是不可以重复的.会在启动编译的时候报bean定义重复。需要在yml给生产者和消费者指定同一个交换机。
spring:
rabbitmq:
host: xxx.xxx.xxx.xx
port: 35672
username: xxx
password: xxx
virtual-host: /xxx
cloud:
stream:
bindings:
in:
#若消息系统是RabbitMQ,目的地(destination)就是指exchange,消息系统是Kafka,那么就是指topic
destination: test
#在多实例的时候需要制定一个消息分组,不然每个实例都是匿名方式把队列注册到rabbitmq上去,导致一个交换机下有多个队列
#并且默认生成的交换机是topic类型的,会导致重复消费
group: myIn
out:
destination: test
先上依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.fchan</groupId>
<artifactId>springcloudstream</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springcloudstream</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<!-- <version>2.0.1.RELEASE</version>-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>Ditmars.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
再上yml配置
spring:
rabbitmq:
host: xxx.xxx.xxx.xx
port: 35672
username: xxx
password: xxx
virtual-host: /xxx
cloud:
stream:
bindings:
in:
#若消息系统是RabbitMQ,目的地(destination)就是指exchange,消息系统是Kafka,那么就是指topic
destination: test
#在多实例的时候需要制定一个消息分组,不然每个实例都是匿名方式把队列注册到rabbitmq上去,导致一个交换机下有多个队列
#并且默认生成的交换机是topic类型的,会导致重复消费
group: myIn
out:
destination: test
消息生产者
package com.fchan.springcloudstream.service;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MyMessageChannel {
String out = "out";
String in = "in";
@Output(out)
MessageChannel out();
@Input(in)
SubscribableChannel in();
}
发送消息
package com.fchan.springcloudstream.controller;
import com.fchan.springcloudstream.service.MyMessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@RestController
public class MessageController {
@Resource
private MyMessageChannel myMessageChannel;
@RequestMapping("test")
public String testMessage(){
Map<String,Object> map = new HashMap<>();
map.put("shopId", "123");
myMessageChannel.out().send(MessageBuilder.withPayload(map).build());
return "success";
}
}
消息消费者
package com.fchan.springcloudstream.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@EnableBinding({MyMessageChannel.class})
public class MyConsumer {
Logger log = LoggerFactory.getLogger(MyConsumer.class);
@StreamListener(MyMessageChannel.in)
public void input(Message<Map<String,Object>> message){
log.info("收到消息:{}", message.getPayload());
}
}
以上是关于springcloud的stream消息组件的使用@StreamListener的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud学习—— SpringCloud Stream 消息驱动