SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)
Posted yangk1996
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)相关的知识,希望对你有一定的参考价值。
一、Stream简介
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka。
二、Stream学习
2.1消息中间件
使用kafka消息中间件来学习:https://www.cnblogs.com/yangk1996/p/10841588.html
2.2、工程改造
- springcloud-api
@Data public class User implements Serializable /** * ID */ private Long id; /** * 用户名称 */ private String name;
- spring-cloud-api-client
- maven
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
- 增加消息输出
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * @Description: 用户信息输出 * @date: 2019/6/23 13:05 */ public interface UserMessage @Output("user-message") MessageChannel output();
- Controller
@Autowired private UserMessage userMessage;
@PostMapping("/user/save/message/stream") public boolean saveUserByRabbitMessage(@RequestBody User user) MessageChannel messageChannel = userMessage.output(); return messageChannel.send(MessageBuilder.withPayload(user).build());
- application.properties
## Kafka 生产者配置 spring.kafka.BOOTSTRAP-SERVERS=192.168.100.129:9092 spring.kafka.consumer.group-id= yangk spring.kafka.consumer.clientId=spring-cloud-api-client ## Spring Cloud Stream Binding 配置 ### user-message 为输出管道名称 destination 指定 Topic spring.cloud.stream.bindings.user-message.destination = springCloud
- 启动入口增加EnableBinding
@EnableBinding(UserMessage.class)
- spring-cloud-api-provider
- Maven
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
- 消息输入
public interface UserMessage @Input("user-message") //管道名称 SubscribableChannel input();
- UserMessageServiceImpl(三种消息监听实现)
@Autowired private UserMessage userMessage; @Autowired private UserService userService; @Autowired private ObjectMapper objectMapper; @ServiceActivator(inputChannel = "user-message") public void listen(String data) throws IOException System.out.println("ServiceActivator实现"+data); saveUser(data); @StreamListener("user-message") public void onMessage(String data) throws IOException System.out.println(" @StreamListeners实现"+data); saveUser(data); private void saveUser(String data) throws IOException User user = objectMapper.readValue(data, User.class); userService.saveUser(user); @PostConstruct public void init() SubscribableChannel subscribableChannel = userMessage.input(); subscribableChannel.subscribe(message -> System.out.println("SubscribableChannel实现"+message); );
- application.properties
#Kafka配置 spring.kafka.BOOTSTRAP-SERVERS=192.168.100.129:9092 spring.kafka.consumer.group-id=yangk spring.kafka.consumer.clientId=spring-cloud-api-client ## Spring Cloud Stream Binding 配置 ### userMessage 为输入管道名称 destination 指定 Topic spring.cloud.stream.bindings.user-message.destination = springCloud
- 启动入口激活Stream Binding
//激活 Stream Binding到UserMessage @EnableBinding(UserMessage.class)
三、验证
项目启动顺序:spring-cloud-eureka-server -> spring-cloud-api-client -> spring-cloud-api-provider
这里乱码的应该是kafka的其他属性没有转换过来,这里我也没有处理这些。
以上是关于SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud学习第八篇:gateway学习(Hoxton.SR4)