前言
场景描述
当客户端向服务端请求,服务端返回出现了异常,对于客户端1返回为NULL,而对于客户端2返回的是正常数据。而服务端并不知道返回给客户端们的数据对不对,只能通过用户反馈来证实返回的错误性,显然是不正确的。
Stream简介
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。
主要议题
- Kafka
- Spring Kafka
- Spring Boot Kafka
- Spring Cloud Stream
- Spring Cloud Stream Kafka Binder
- 问题总结
主体内容
一、Kafka
主要用途
- 消息中间件
- 流式计算处理
- 日志
执行脚本目录bin
E:\\JavaEE\\kafka-2.5.0-src\\kafka-2.5.0-src\\bin\\windows
同类产品比较
- ActiveMQ:IMS(Java Message Service)规范实现
- RabbitMQ:AMQP(Advanved Message Queue Protocol)规范实现
- Kafka:并非某种规范的实现,它灵活和性能相对是优势的
快速上手步骤
1.下载并解压kafka压缩包。
2.下载并解压zookeeper压缩包,这里官方它的quickstart就是以zookeeper保证强一致性。zookeeper官方地址:https://zookeeper.apache.org/
3.以windows为例,我们先到zookeeper的conf目录下,把zoo_sample.cfg文件复制一份重命名为zoo.cfg。现在目录如下所示:
然后打开cmd,进入bin目录,启动服务。
zkServer.cmd
4.启动kafka。进入到kafka的window文件夹,执行启动命令。
kafka-server-start.bat ../../config/server.properties
5.创建kafka主题。再次打开一个cmd窗口,进入到windows文件夹
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gupao
6.生产者发送消息/生产消息
kafka-console-producer.bat --broker-list localhost:9092 --topic gupao
然后输入要发送的消息:
7.消费者接收消息/消费消息
重新打开一个cmd,输入接收命令。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao
当我在生产者端输入消息,消费者端马上就接收到了消息。
如果消费命令后面加上--from beginning参数,那么他会接收到从开始就生产的消息。
那么被消费后的消息能否被其他消费者消费?我们再开一个cmd,利用新的消费者消费。答案是可以的。
使用Kafka标准API
1.从start.spring.io构建项目。
2.新建包raw.api,创建类KafkaProducerDemo,这里就是让生产者通过java api形式进行发送消息。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* @ClassName
* @Describe Kafka Producer Demo使用Kafka原始API
* @Author 66477
* @Date 2020/6/1417:15
* @Version 1.0
*/
public class KafkaProducerDemo {
public static void main(String[] args) throws Exception {
//初始化配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","localhost:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer",StringSerializer.class.getName());//注意引包
//创建Kafka Producer
KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);
//创建 Kafka消息
String topic = "gupao";
Integer partition=0;
Long timestamp= System.currentTimeMillis();
String key="message-key";
String value = "gupao.com";
ProducerRecord<String,String> record = new ProducerRecord<String, String>(topic,partition,timestamp,key,value);
//发送Kafka消息
Future<RecordMetadata> metadataFuture = kafkaProducer.send(record);
//强制执行
metadataFuture.get();
}
}
3.运行以上代码,然后你会发现,cmd窗口的消费者会接收消息。
二、Spring Kafka
那么接下来我们使用Spring整合的kafka。
官方文档
设计模式
Spring社区对data数据操作,有一个基本的模式,Template模式:
- JDBC:jdbcTemplate
- Redis:RedisTemplate
- Kafka:KafkaTemplate
- JMS:JmsTemplate
- Rest:RestTemplate
XXXTemplate一定实现XXXOpeations
Maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
三、Spring Boot Kafka
Maven依赖
自动装配器
KafkaAutoConfiguration
其中KafkaTemplate会被自动装配:
@Bean
@ConditionalOnMissingBean({KafkaTemplate.class})
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
关闭Spring Security
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.WebSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
public void configure(WebSecurity web) throws Exception {
web.ignoring().antMatchers("/**");
}
}
创建生产者
1.我们继续在上面的项目动刀子,我们先在application.properties文件转移之前demo类中配置。
#定义应用名称
spring.application.name=spring-cloud-stream-kafka
#配置端口
server.port=8080
#Spring Kafka配置信息
spring.kafka.bootstrap-servers=localhost:9092
#配置需要的kafka主题
kafka.topic = gupao
#生产者配置
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
2.然后编写一个controller类。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName
* @Describe Kafka生产者Controller
* @Author 66477
* @Date 2020/6/1418:07
* @Version 1.0
*/
@RestController
public class KafkaProducerController {
private final KafkaTemplate<String,String> kafkaTemplate;
private final String topic;
@Autowired
public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
@Value("${kafka.topic}") String topic) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
}
@PostMapping("/message/send")
public Boolean sendMessage(@RequestParam String message){
kafkaTemplate.send(topic,message);
return true;
}
}
3.通过postman访问http://localhost:8080/message/send。
4.打开cmd消费端窗口,发现消息成功接收。
创建消费者
5.同样地,我们开始配置消费者,先去application.properties文件增加消费者配置。
#消费者配置
spring.kafka.consumer.group-id=gupao-1
spring.kafka.consumer.key-Derializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-Derializer=org.apache.kafka.common.serialization.StringDeserializer
6.因为消费者它是以监听的形式监听消息的,所以我们创建一个KafkaConsumerListener监听类。通过@KafkaListener来监听改主题的消息。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe Kafka消费者监听器
* @Author 66477
* @Date 2020/6/1418:25
* @Version 1.0
*/
@Component
public class KafkaConsumerListener {
@KafkaListener(topics ="${kafka.topic}" )
public void onMessage(String message){
System.out.println("Kafka消费者监听器接收到消息:"+message);
}
}
7.随后postman访问http://localhost:8080/message/send。控制台则会打印出:
2020-06-14 18:32:30.970 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-06-14 18:32:30.970 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-06-14 18:32:30.971 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1592130750970
2020-06-14 18:32:30.976 INFO 451856 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: i1-NXUmvQRyaT-E27LPozQ
Kafka消费者监听器接收到消息:hello world
四、Spring Cloud Stream
加上本章中的stream,上一篇中的架构图又丰富了些东西。
其中
- RabbitMQ:AMQP、jms规范。
- kafka:相对松散的消息队列协议
基本概念
Source:来源,近义词:Producer,Publisher
Sink:接收器,近义词:Consumer,Subcriber
Processor:对于上流而言是Sink,对于下流而言是Source
Reactive Streams
- Publisher
- Subscriber
- Processor
代码示例
1.我们拷贝上面的spring cloud stream kafka项目,导入IDEA。
2.启动zookeeper,参考以上。
3.启动kafka,参考以上。
4.我们需要引入spring cloud stream依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
5.创建一个stream包,包下再创建producer包,创建一个类MessageProducerBean
消息大致分为两个部分,消息头和消息体。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1421:58
* @Version 1.0
*/
@Component
@EnableBinding(Source.class)
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel messageChannel;
@Autowired
private Source source;
/**
* 发送消息
* @param message 消息内容
*/
public void send(String message){
//通过消息管道发送消息
source.output().send(MessageBuilder.withPayload(message).build());
}
}
改写一下我们之前写的controller,增加另一种方式的接口。
import com.example.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName
* @Describe Kafka生产者Controller
* @Author 66477
* @Date 2020/6/1418:07
* @Version 1.0
*/
@RestController
public class KafkaProducerController {
private final KafkaTemplate<String,String> kafkaTemplate;
private final String topic;
private final MessageProducerBean messageProducerBean;
@Autowired
public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
@Value("${kafka.topic}") String topic, MessageProducerBean messageProducerBean) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
this.messageProducerBean = messageProducerBean;
}
/**
* 通过KafkaTemplate发送{@link KafkaTemplate}
* @param message
* @return
*/
@PostMapping("/message/send")
public Boolean sendMessage(@RequestParam String message){
kafkaTemplate.send(topic,message);
return true;
}
/**
* 通过消息生产者Bean发送{@link com.example.stream.producer.MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/message/send")
public Boolean send(@RequestParam String message){
messageProducerBean.send(message);
return true;
}
}
6.我们需要给引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
之前没有加上spring cloud 版本,现在要加上:
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.BUILD-SNAPSHOT</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
7.这时我们再启动cmd中的consumer消费者。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao
8.注释掉之前配置的生产者序列化。
#生产者配置
#spring.kafka.producer.bootstrap-servers=localhost:9092
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
9.Postman分别以GET,POST方式访问http://localhost:8080/send/message,发现消费者正常收到消息。
拓展:如果想要多主题怎么办,那么我能不能仿造Source接口,搭建一个属于自己的管道呢?我们也在stream包下创建一个message包,message包下创建一个接口(仿Source)MyMessageSource.
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1423:27
* @Version 1.0
*/
public interface MyMessagesSource {
/**
* 消息来源的管道名称
*/
String NAME="gupao";
@Output(NAME)
MessageChannel gupao();
}
然后我们仿造之前写的MessageProducerBean,再整一套自己的,也就是自定义消息发送源。
import com.example.stream.message.MyMessagesSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1421:58
* @Version 1.0
*/
@Component
@EnableBinding({Source.class, MyMessagesSource.class})
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel messageChannel;
@Autowired
private Source source;
@Autowired
@Qualifier(MyMessagesSource.NAME)//Bean名称
private MessageChannel gupaoMessageChannel;
@Autowired
private MyMessagesSource myMessagesSource;
/**
* 发送消息
* @param message 消息内容
*/
public void send(String message){
//通过消息管道发送消息
source.output().send(MessageBuilder.withPayload(message).build());
}
/**
* 发送消息
* @param message 消息内容
*/
public void sendToGupao(String message){
//通过消息管道发送消息
myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build());
}
}
在application.properties文件增加一行属于自己的主题配置
spring.cloud.stream.bindings.gupao.destination=mygupao
这时去消费者监听类里面增加监听主题。
@KafkaListener(topics ="mygupao" )
public void onGupaoMessage(String message){
System.out.println("Kafka消费者监听器接收到主题mygupao消息:"+message);
}
我们去cmd黑窗口,把刚刚主题为gupao的停掉,改成mygupao主题监听。
E:\\JavaEE\\kafka\\bin\\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mygupao
最后去controller类增加一个接口,用于发送消息至我们新创建的管道。
/**
* 通过消息生产者Bean发送{@link com.example.stream.producer.MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/message/sendToGupao")
public Boolean sendToGupao(@RequestParam String message){
messageProducerBean.sendToGupao(message);
return true;
}
由于我之前杀死过8080端口,导致zookeeper进程被杀了(它也是8080端口),所以我将stream项目改成8081,重新启动了zookeeper,postman测试一下我们http://localhost:8081/message/sendToGupao。控制台信息如下:
2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1592150424170
2020-06-15 00:00:24.174 INFO 171780 --- [ad | producer-3] org.apache.kafka.clients.Metadata : [Producer clientId=producer-3] Cluster ID: i1-NXUmvQRyaT-E27LPozQ
Kafka消费者监听器接收到主题mygupao消息:mygupaoaaa
cmd窗口如下:
同样地,我们也可以创建一个消息消费Bean用于接收消息。
在stream包下继续创建一个consumer包,包下创建名为MessageConsumerBean的Bean。用来实现标准Sink监听,
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @ClassName
* @Describe 消息消费Bean
* @Author 66477
* @Date 2020/6/1520:25
* @Version 1.0
*/
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
@Autowired
@Qualifier(Sink.INPUT)//Bean名称
private SubscribableChannel subscribableChannel;
@Autowired
private Sink sink;
//那么订阅消息有多种方式
//方式一:通过SubscribableChannel订阅消息
//当字段注入完成后的回调
@PostConstruct
public void init(){
subscribableChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
});
}
//方式二:通过@ServiceActivator方式订阅消息
@ServiceActivator(inputChannel = Sink.INPUT)
public void onMessage(Object message){
System.out.println("onMessage :"+message);
}
//方式三:通过@StreamListener实现
@StreamListener(Sink.INPUT)
public void onMessage(String message){
System.out.println("StreamListener:"+message);
}
}
application.properties下也要增加对应的input主题项了。
spring.cloud.stream.bindings.input.destination=${kafka.topic}
五、Spring Cloud Stream Kafka Binder(RabbitMQ)
我们复制一下上面的项目,准备为stream rabbitmq做准备。重命名为spring-cloud-stream-rabbitmq重新导入IDEA,里面pom文件的artifactId也要修改。清除掉有关kafka的代码,application.properties清除关于kafka的生产者,消费者配置。
现在项目结构如下:
修改依赖为
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
其中MessageConsumerBean
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @ClassName
* @Describe 消息消费Bean
* @Author 66477
* @Date 2020/6/1520:25
* @Version 1.0
*/
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
@Autowired
@Qualifier(Sink.INPUT)//Bean名称
private SubscribableChannel subscribableChannel;
@Autowired
private Sink sink;
//方式一:通过Subcribe订阅消息
//当字段注入完成后的回调
@PostConstruct
public void init(){
//实现异步回调
subscribableChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("subscribe:"+message.getPayload());
}
});
}
//方式二:通过@ServiceActivator
@ServiceActivator(inputChannel = Sink.INPUT)
public void onMessage(Object message){
System.out.println("onMessage :"+message);
}
//方式三:通过@StreamListener实现
@StreamListener(Sink.INPUT)
public void onMessage(String message){
System.out.println("StreamListener:"+message);
}
}
MyMessagesSource
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1423:27
* @Version 1.0
*/
public interface MyMessagesSource {
/**
* 消息来源的管道名称
*/
String NAME="gupao";
@Output(NAME)
MessageChannel gupao();
}
MessageProducerBean
import com.example.rabbitmq.stream.message.MyMessagesSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1421:58
* @Version 1.0
*/
@Component
@EnableBinding({Source.class, MyMessagesSource.class})
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel messageChannel;
@Autowired
private Source source;
@Autowired
@Qualifier(MyMessagesSource.NAME)//Bean名称
private MessageChannel gupaoMessageChannel;
@Autowired
private MyMessagesSource myMessagesSource;
/**
* 发送消息
* @param message 消息内容
*/
public void send(String message){
//通过消息管道发送消息
source.output().send(MessageBuilder.withPayload(message).build());
}
/**
* 发送消息
* @param message 消息内容
*/
public void sendToGupao(String message){
//通过消息管道发送消息
myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build());
}
}
MessageProducerController
import com.example.rabbitmq.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName
* @Describe Rabbitmq生产者Controller
* @Author 66477
* @Date 2020/6/1418:07
* @Version 1.0
*/
@RestController
class MessageProducerController {
private final MessageProducerBean messageProducerBean;
private final String topic;
MessageProducerController(MessageProducerBean messageProducerBean, @Value("${kafka.topic}") String topic) {
this.messageProducerBean = messageProducerBean;
this.topic = topic;
}
/**
* 通过消息生产者Bean发送{@link MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/messageProducer/send")
public Boolean send(@RequestParam String message){
messageProducerBean.send(message);
return true;
}
/**
* 通过消息生产者Bean发送{@link MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/message/sendToGupao")
public Boolean sendToGupao(@RequestParam String message){
messageProducerBean.sendToGupao(message);
return true;
}
}
application.properties
#定义应用名称
spring.application.name=spring-cloud-stream-rabbitmq
#配置端口
server.port=8081
#Spring Kafka配置信息
spring.kafka.bootstrap-servers=localhost:9092
#配置需要的kafka主题
kafka.topic = gupao
#定义Spring Cloud Stream Source消息去向
#针对kafka而言,基本模式如下
#spring.cloud.stream.bindings.${channel-name}.destination=${kafka.topic}
spring.cloud.stream.bindings.output.destination=${kafka.topic}
spring.cloud.stream.bindings.gupao.destination=mygupao
spring.cloud.stream.bindings.input.destination=${kafka.topic}
六、问题总结
1.当时用Future时,异步调用都可以使用get()方式强制执行吗?
解答:是的,get等待当前线程执行完毕,并且获取返回接口。
2.@KafkaListener和kafka consumer有啥区别?
解答:没有实质区别,主要是编程模式。
@KafkaListener采用注解驱动
kafka consumer API 采用接口编程。
3.消费者接收消息的地方在哪?
解答:订阅并且处理后就消失了。
4.生产环境配置多个生产者和消费者只需要定义不同的group就可以了吗?
解答:group是一种,要看是不是相同topic。
5.为了不丢失数据,消息队列的容错,和排错后的处理,如何实现的?
解答这个依赖于zookeeper。
6.异步接收除了打印还有什么办法处理消息吗?
解答:可以处理其他逻辑,比如存储数据库。
7.kafka适合什么场景下使用?
解答:高性能的Stream处理。
8.Kafka消息一直都在,内存占用会很多吧,消息量不停产生消息咋办?
解答:kafka还是会删除的,并不是一直存在。
9.怎么没看到broker配置?
解答:broker不需要设置,它是单独启动。
10.consumer为什么要分组?
解答:consumer需要定义不同逻辑分组,相同主题里面不同分组,便于管理。
11.@EnableBinding有什么用?
解答:@EnableBinding将Source、Sink以及Processor提升成相应的代理.
12.@Autowired Source source 这种写法是默认用官方的实现?
解答:是官方的实现。
13.这么多消息框架,各自有点是什么,怎么选取?
解答:RabbitMQ:AMQP,JMS规范
kafka:相对松散的消息队列协议
ActiveMQ:AMQP,JMS规范
14.如果中间件有问题怎么办,我们只管用,不用维护吗?现在遇到的很多问题不是使用,而是俄日胡,中间件一有问题,消息堵塞或者丢失,只有重启?
解答:消息中间件无法保证不丢消息,多数高一致性的消息背会还是有持久化的。
15.@EnableBinder,@EnableZuulProxy,@EnableDiscoverClient这些注解都是通过特定BeanPostProcessor实现的吗?
解答:不完全对,主要处理接口在@Import:
- ImportSelector实现类
- ImportBeanDefinitionsRegistrar实现类
- @Configuration标注类
- BeanPostProcessor实现类
16.我对流式处理还是懵懵的,到底啥事流式处理,怎样才能称为流式处理,一般应用在什么场景?
解答:Stream处理简单的说,异步处理,消息是一种处理方式。
提交申请,机器生产,对于高密度提交任务,多数场景采用异步处理,Stream,Evnet-Driven。举例说明:审核流程,鉴别黄图。
17.如果是大量消息,怎么快速消费,用多线程吗?
解答:确实是使用多线程,不过不一定奏效,依赖于处理的具体内容,比如:一个线程使用了25%的CPU,四个线程就将cpu耗尽,因此,并发100个处理,实际上还是4个线程在处理。I/O密集型,CPU密集型。大多数是多线程,其实也单线程,流式非阻塞。
18.购物车的价格计算可以使用流式计算来处理吗?能说下思路吗?有没有什么高性能的方式推荐?
解答:当商品添加到购物车的时候,就可以开始计算了。