微服务之路spring cloud stream

Posted mcbbss

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务之路spring cloud stream相关的知识,希望对你有一定的参考价值。

前言

场景描述

当客户端向服务端请求,服务端返回出现了异常,对于客户端1返回为NULL,而对于客户端2返回的是正常数据。而服务端并不知道返回给客户端们的数据对不对,只能通过用户反馈来证实返回的错误性,显然是不正确的。

Stream简介

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。

主要议题

  • Kafka
  • Spring Kafka
  • Spring Boot Kafka
  • Spring Cloud Stream
  • Spring Cloud Stream Kafka Binder
  • 问题总结

主体内容

一、Kafka

官方地址:http://kafka.apache.org

主要用途

  • 消息中间件
  • 流式计算处理
  • 日志

执行脚本目录bin

E:\\JavaEE\\kafka-2.5.0-src\\kafka-2.5.0-src\\bin\\windows

同类产品比较

  • ActiveMQ:IMS(Java Message Service)规范实现
  • RabbitMQ:AMQP(Advanved Message Queue Protocol)规范实现
  • Kafka:并非某种规范的实现,它灵活和性能相对是优势的

快速上手步骤

1.下载并解压kafka压缩包。

2.下载并解压zookeeper压缩包,这里官方它的quickstart就是以zookeeper保证强一致性。zookeeper官方地址:https://zookeeper.apache.org/

3.以windows为例,我们先到zookeeper的conf目录下,把zoo_sample.cfg文件复制一份重命名为zoo.cfg。现在目录如下所示:

然后打开cmd,进入bin目录,启动服务。

zkServer.cmd

4.启动kafka。进入到kafka的window文件夹,执行启动命令。

kafka-server-start.bat ../../config/server.properties

5.创建kafka主题。再次打开一个cmd窗口,进入到windows文件夹

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gupao

6.生产者发送消息/生产消息

kafka-console-producer.bat --broker-list localhost:9092 --topic gupao

然后输入要发送的消息:

7.消费者接收消息/消费消息

重新打开一个cmd,输入接收命令。

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao

当我在生产者端输入消息,消费者端马上就接收到了消息。

如果消费命令后面加上--from beginning参数,那么他会接收到从开始就生产的消息。

那么被消费后的消息能否被其他消费者消费?我们再开一个cmd,利用新的消费者消费。答案是可以的。

使用Kafka标准API

1.从start.spring.io构建项目。

2.新建包raw.api,创建类KafkaProducerDemo,这里就是让生产者通过java api形式进行发送消息。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

/**
 * @ClassName
 * @Describe Kafka Producer Demo使用Kafka原始API
 * @Author 66477
 * @Date 2020/6/1417:15
 * @Version 1.0
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {

        //初始化配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","localhost:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer",StringSerializer.class.getName());//注意引包
        //创建Kafka Producer
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);
        //创建 Kafka消息
        String topic =  "gupao";

        Integer partition=0;
        Long timestamp= System.currentTimeMillis();
        String key="message-key";
        String value = "gupao.com";
        ProducerRecord<String,String> record = new ProducerRecord<String, String>(topic,partition,timestamp,key,value);
        //发送Kafka消息
        Future<RecordMetadata> metadataFuture = kafkaProducer.send(record);
        //强制执行
        metadataFuture.get();
    }
}

3.运行以上代码,然后你会发现,cmd窗口的消费者会接收消息。

二、Spring Kafka

那么接下来我们使用Spring整合的kafka。

官方文档

设计模式

Spring社区对data数据操作,有一个基本的模式,Template模式:

  • JDBC:jdbcTemplate
  • Redis:RedisTemplate
  • Kafka:KafkaTemplate
  • JMS:JmsTemplate
  • Rest:RestTemplate

XXXTemplate一定实现XXXOpeations

Maven依赖

<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
</dependency>

三、Spring Boot Kafka

Maven依赖

自动装配器

KafkaAutoConfiguration

其中KafkaTemplate会被自动装配:

@Bean
@ConditionalOnMissingBean({KafkaTemplate.class})
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
    messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
    kafkaTemplate.setProducerListener(kafkaProducerListener);
    kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
    return kafkaTemplate;
}

关闭Spring Security

依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-security</artifactId>
</dependency>
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.WebSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;

@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
     
        @Override
        public void configure(WebSecurity web) throws Exception {
            web.ignoring().antMatchers("/**");
        }
}

创建生产者

1.我们继续在上面的项目动刀子,我们先在application.properties文件转移之前demo类中配置。

#定义应用名称
spring.application.name=spring-cloud-stream-kafka
#配置端口
server.port=8080
#Spring Kafka配置信息
spring.kafka.bootstrap-servers=localhost:9092
#配置需要的kafka主题
kafka.topic = gupao
#生产者配置
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

2.然后编写一个controller类。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName
 * @Describe Kafka生产者Controller
 * @Author 66477
 * @Date 2020/6/1418:07
 * @Version 1.0
 */
@RestController
public class KafkaProducerController {
    private final KafkaTemplate<String,String> kafkaTemplate;

    private final String topic;

    @Autowired
    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
                                   @Value("${kafka.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @PostMapping("/message/send")
    public Boolean sendMessage(@RequestParam String message){
        kafkaTemplate.send(topic,message);
        return  true;
    }
}

3.通过postman访问http://localhost:8080/message/send。

4.打开cmd消费端窗口,发现消息成功接收。

创建消费者

5.同样地,我们开始配置消费者,先去application.properties文件增加消费者配置。

#消费者配置
spring.kafka.consumer.group-id=gupao-1
spring.kafka.consumer.key-Derializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-Derializer=org.apache.kafka.common.serialization.StringDeserializer

6.因为消费者它是以监听的形式监听消息的,所以我们创建一个KafkaConsumerListener监听类。通过@KafkaListener来监听改主题的消息。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName
 * @Describe Kafka消费者监听器
 * @Author 66477
 * @Date 2020/6/1418:25
 * @Version 1.0
 */
@Component
public class KafkaConsumerListener {
    @KafkaListener(topics ="${kafka.topic}" )
    public void onMessage(String message){
        System.out.println("Kafka消费者监听器接收到消息:"+message);
    }
}

7.随后postman访问http://localhost:8080/message/send。控制台则会打印出:

2020-06-14 18:32:30.970  INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-06-14 18:32:30.970  INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-06-14 18:32:30.971  INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1592130750970
2020-06-14 18:32:30.976  INFO 451856 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: i1-NXUmvQRyaT-E27LPozQ
Kafka消费者监听器接收到消息:hello world

四、Spring Cloud Stream

加上本章中的stream,上一篇中的架构图又丰富了些东西。

其中

  • RabbitMQ:AMQP、jms规范。
  • kafka:相对松散的消息队列协议

基本概念

Source:来源,近义词:Producer,Publisher

Sink:接收器,近义词:Consumer,Subcriber

Processor:对于上流而言是Sink,对于下流而言是Source

Reactive Streams

  • Publisher
  • Subscriber
  • Processor

代码示例

1.我们拷贝上面的spring cloud stream kafka项目,导入IDEA。

2.启动zookeeper,参考以上。

3.启动kafka,参考以上。

4.我们需要引入spring cloud stream依赖。

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream</artifactId>
</dependency>

5.创建一个stream包,包下再创建producer包,创建一个类MessageProducerBean

消息大致分为两个部分,消息头和消息体。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * @ClassName
 * @Describe TODO
 * @Author 66477
 * @Date 2020/6/1421:58
 * @Version 1.0
 */
@Component
@EnableBinding(Source.class)
public class MessageProducerBean {

    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;
    
	@Autowired
    private Source source;
    
    /**
     * 发送消息
     * @param message 消息内容
     */
    public void send(String message){
        //通过消息管道发送消息
        source.output().send(MessageBuilder.withPayload(message).build());
    }

}

改写一下我们之前写的controller,增加另一种方式的接口。

import com.example.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName
 * @Describe Kafka生产者Controller
 * @Author 66477
 * @Date 2020/6/1418:07
 * @Version 1.0
 */
@RestController
public class KafkaProducerController {
    private final KafkaTemplate<String,String> kafkaTemplate;

    private final String topic;

    private final MessageProducerBean messageProducerBean;

    @Autowired
    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
                                   @Value("${kafka.topic}") String topic, MessageProducerBean messageProducerBean) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
        this.messageProducerBean = messageProducerBean;
    }

    /**
     * 通过KafkaTemplate发送{@link KafkaTemplate}
     * @param message
     * @return
     */
    @PostMapping("/message/send")
    public Boolean sendMessage(@RequestParam String message){
        kafkaTemplate.send(topic,message);
        return  true;
    }

    /**
     * 通过消息生产者Bean发送{@link com.example.stream.producer.MessageProducerBean}
     * @param message
     * @return
     */
    @GetMapping("/message/send")
    public Boolean send(@RequestParam String message){
        messageProducerBean.send(message);
        return  true;
    }
}

6.我们需要给引入依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

之前没有加上spring cloud 版本,现在要加上:

<properties>
   <java.version>1.8</java.version>
   <spring-cloud.version>Hoxton.BUILD-SNAPSHOT</spring-cloud.version>
</properties>
<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>${spring-cloud.version}</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

7.这时我们再启动cmd中的consumer消费者。

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao

8.注释掉之前配置的生产者序列化。

#生产者配置
#spring.kafka.producer.bootstrap-servers=localhost:9092
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

9.Postman分别以GET,POST方式访问http://localhost:8080/send/message,发现消费者正常收到消息。

拓展:如果想要多主题怎么办,那么我能不能仿造Source接口,搭建一个属于自己的管道呢?我们也在stream包下创建一个message包,message包下创建一个接口(仿Source)MyMessageSource.

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * @ClassName
 * @Describe TODO
 * @Author 66477
 * @Date 2020/6/1423:27
 * @Version 1.0
 */
public interface MyMessagesSource {
    /**
     * 消息来源的管道名称
     */
    String NAME="gupao";

    @Output(NAME)
    MessageChannel gupao();

}

然后我们仿造之前写的MessageProducerBean,再整一套自己的,也就是自定义消息发送源。

import com.example.stream.message.MyMessagesSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * @ClassName
 * @Describe TODO
 * @Author 66477
 * @Date 2020/6/1421:58
 * @Version 1.0
 */
@Component
@EnableBinding({Source.class, MyMessagesSource.class})
public class MessageProducerBean {

    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;

    @Autowired
    private Source source;

    @Autowired
    @Qualifier(MyMessagesSource.NAME)//Bean名称
    private MessageChannel gupaoMessageChannel;

    @Autowired
    private MyMessagesSource myMessagesSource;
    /**
     * 发送消息
     * @param message 消息内容
     */
    public void send(String message){
        //通过消息管道发送消息
        source.output().send(MessageBuilder.withPayload(message).build());
    }

    /**
     * 发送消息
     * @param message 消息内容
     */
    public void sendToGupao(String message){
        //通过消息管道发送消息
        myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build());
    }

}

在application.properties文件增加一行属于自己的主题配置

spring.cloud.stream.bindings.gupao.destination=mygupao

这时去消费者监听类里面增加监听主题。

@KafkaListener(topics ="mygupao" )
public void onGupaoMessage(String message){
    System.out.println("Kafka消费者监听器接收到主题mygupao消息:"+message);
}

我们去cmd黑窗口,把刚刚主题为gupao的停掉,改成mygupao主题监听。

E:\\JavaEE\\kafka\\bin\\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mygupao

最后去controller类增加一个接口,用于发送消息至我们新创建的管道。

/**
 * 通过消息生产者Bean发送{@link com.example.stream.producer.MessageProducerBean}
 * @param message
 * @return
 */
@GetMapping("/message/sendToGupao")
public Boolean sendToGupao(@RequestParam String message){
    messageProducerBean.sendToGupao(message);
    return  true;
}

由于我之前杀死过8080端口,导致zookeeper进程被杀了(它也是8080端口),所以我将stream项目改成8081,重新启动了zookeeper,postman测试一下我们http://localhost:8081/message/sendToGupao。控制台信息如下:

2020-06-15 00:00:24.170  INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-06-15 00:00:24.170  INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-06-15 00:00:24.170  INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1592150424170
2020-06-15 00:00:24.174  INFO 171780 --- [ad | producer-3] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-3] Cluster ID: i1-NXUmvQRyaT-E27LPozQ
Kafka消费者监听器接收到主题mygupao消息:mygupaoaaa

cmd窗口如下:

同样地,我们也可以创建一个消息消费Bean用于接收消息。

在stream包下继续创建一个consumer包,包下创建名为MessageConsumerBean的Bean。用来实现标准Sink监听,

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

/**
 * @ClassName
 * @Describe 消息消费Bean
 * @Author 66477
 * @Date 2020/6/1520:25
 * @Version 1.0
 */
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
    @Autowired
    @Qualifier(Sink.INPUT)//Bean名称
    private SubscribableChannel subscribableChannel;

    @Autowired
    private Sink sink;
    
     //那么订阅消息有多种方式
     //方式一:通过SubscribableChannel订阅消息
     //当字段注入完成后的回调
    @PostConstruct
    public void init(){
        subscribableChannel.subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }
        });
    }
    
	//方式二:通过@ServiceActivator方式订阅消息
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void onMessage(Object message){
        System.out.println("onMessage :"+message);
    }
    
    //方式三:通过@StreamListener实现
    @StreamListener(Sink.INPUT)
    public void onMessage(String message){
        System.out.println("StreamListener:"+message);
    }
}

application.properties下也要增加对应的input主题项了。

spring.cloud.stream.bindings.input.destination=${kafka.topic}

五、Spring Cloud Stream Kafka Binder(RabbitMQ)

我们复制一下上面的项目,准备为stream rabbitmq做准备。重命名为spring-cloud-stream-rabbitmq重新导入IDEA,里面pom文件的artifactId也要修改。清除掉有关kafka的代码,application.properties清除关于kafka的生产者,消费者配置。

现在项目结构如下:

修改依赖为

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

其中MessageConsumerBean

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

/**
 * @ClassName
 * @Describe 消息消费Bean
 * @Author 66477
 * @Date 2020/6/1520:25
 * @Version 1.0
 */
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
    @Autowired
    @Qualifier(Sink.INPUT)//Bean名称
    private SubscribableChannel subscribableChannel;

    @Autowired
    private Sink sink;

    //方式一:通过Subcribe订阅消息
    //当字段注入完成后的回调
    @PostConstruct
    public void init(){
        //实现异步回调
        subscribableChannel.subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("subscribe:"+message.getPayload());
            }
        });
    }

    //方式二:通过@ServiceActivator
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void onMessage(Object message){
        System.out.println("onMessage :"+message);
    }

    //方式三:通过@StreamListener实现
    @StreamListener(Sink.INPUT)
    public void onMessage(String message){
        System.out.println("StreamListener:"+message);
    }

}

MyMessagesSource

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * @ClassName
 * @Describe TODO
 * @Author 66477
 * @Date 2020/6/1423:27
 * @Version 1.0
 */
public interface MyMessagesSource {
    /**
     * 消息来源的管道名称
     */
    String NAME="gupao";

    @Output(NAME)
    MessageChannel gupao();

}

MessageProducerBean

import com.example.rabbitmq.stream.message.MyMessagesSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * @ClassName
 * @Describe TODO
 * @Author 66477
 * @Date 2020/6/1421:58
 * @Version 1.0
 */
@Component
@EnableBinding({Source.class, MyMessagesSource.class})
public class MessageProducerBean {

    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;

    @Autowired
    private Source source;

    @Autowired
    @Qualifier(MyMessagesSource.NAME)//Bean名称
    private MessageChannel gupaoMessageChannel;

    @Autowired
    private MyMessagesSource myMessagesSource;
    /**
     * 发送消息
     * @param message 消息内容
     */
    public void send(String message){
        //通过消息管道发送消息
        source.output().send(MessageBuilder.withPayload(message).build());
    }

    /**
     * 发送消息
     * @param message 消息内容
     */
    public void sendToGupao(String message){
        //通过消息管道发送消息
        myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build());
    }

}

MessageProducerController

import com.example.rabbitmq.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName
 * @Describe Rabbitmq生产者Controller
 * @Author 66477
 * @Date 2020/6/1418:07
 * @Version 1.0
 */
@RestController
class MessageProducerController {

    private final MessageProducerBean messageProducerBean;

    private final String topic;

    MessageProducerController(MessageProducerBean messageProducerBean, @Value("${kafka.topic}") String topic) {
        this.messageProducerBean = messageProducerBean;
        this.topic = topic;
    }


    /**
     * 通过消息生产者Bean发送{@link MessageProducerBean}
     * @param message
     * @return
     */
    @GetMapping("/messageProducer/send")
    public Boolean send(@RequestParam String message){
        messageProducerBean.send(message);
        return  true;
    }

    /**
     * 通过消息生产者Bean发送{@link MessageProducerBean}
     * @param message
     * @return
     */
    @GetMapping("/message/sendToGupao")
    public Boolean sendToGupao(@RequestParam String message){
        messageProducerBean.sendToGupao(message);
        return  true;
    }
}

application.properties

#定义应用名称
spring.application.name=spring-cloud-stream-rabbitmq
#配置端口
server.port=8081
#Spring Kafka配置信息
spring.kafka.bootstrap-servers=localhost:9092
#配置需要的kafka主题
kafka.topic = gupao

#定义Spring Cloud Stream Source消息去向
#针对kafka而言,基本模式如下
#spring.cloud.stream.bindings.${channel-name}.destination=${kafka.topic}
spring.cloud.stream.bindings.output.destination=${kafka.topic}
spring.cloud.stream.bindings.gupao.destination=mygupao
spring.cloud.stream.bindings.input.destination=${kafka.topic}

六、问题总结

1.当时用Future时,异步调用都可以使用get()方式强制执行吗?

解答:是的,get等待当前线程执行完毕,并且获取返回接口。

2.@KafkaListener和kafka consumer有啥区别?

解答:没有实质区别,主要是编程模式。

@KafkaListener采用注解驱动

kafka consumer API 采用接口编程。

3.消费者接收消息的地方在哪?

解答:订阅并且处理后就消失了。

4.生产环境配置多个生产者和消费者只需要定义不同的group就可以了吗?

解答:group是一种,要看是不是相同topic。

5.为了不丢失数据,消息队列的容错,和排错后的处理,如何实现的?

解答这个依赖于zookeeper。

6.异步接收除了打印还有什么办法处理消息吗?

解答:可以处理其他逻辑,比如存储数据库。

7.kafka适合什么场景下使用?

解答:高性能的Stream处理。

8.Kafka消息一直都在,内存占用会很多吧,消息量不停产生消息咋办?

解答:kafka还是会删除的,并不是一直存在。

9.怎么没看到broker配置?

解答:broker不需要设置,它是单独启动。

10.consumer为什么要分组?

解答:consumer需要定义不同逻辑分组,相同主题里面不同分组,便于管理。

11.@EnableBinding有什么用?

解答:@EnableBinding将Source、Sink以及Processor提升成相应的代理.

12.@Autowired Source source 这种写法是默认用官方的实现?

解答:是官方的实现。

13.这么多消息框架,各自有点是什么,怎么选取?

解答:RabbitMQ:AMQP,JMS规范

kafka:相对松散的消息队列协议

ActiveMQ:AMQP,JMS规范

14.如果中间件有问题怎么办,我们只管用,不用维护吗?现在遇到的很多问题不是使用,而是俄日胡,中间件一有问题,消息堵塞或者丢失,只有重启?

解答:消息中间件无法保证不丢消息,多数高一致性的消息背会还是有持久化的。

15.@EnableBinder,@EnableZuulProxy,@EnableDiscoverClient这些注解都是通过特定BeanPostProcessor实现的吗?

解答:不完全对,主要处理接口在@Import:

  • ImportSelector实现类
  • ImportBeanDefinitionsRegistrar实现类
  • @Configuration标注类
  • BeanPostProcessor实现类

16.我对流式处理还是懵懵的,到底啥事流式处理,怎样才能称为流式处理,一般应用在什么场景?

解答:Stream处理简单的说,异步处理,消息是一种处理方式。

提交申请,机器生产,对于高密度提交任务,多数场景采用异步处理,Stream,Evnet-Driven。举例说明:审核流程,鉴别黄图。

17.如果是大量消息,怎么快速消费,用多线程吗?

解答:确实是使用多线程,不过不一定奏效,依赖于处理的具体内容,比如:一个线程使用了25%的CPU,四个线程就将cpu耗尽,因此,并发100个处理,实际上还是4个线程在处理。I/O密集型,CPU密集型。大多数是多线程,其实也单线程,流式非阻塞。

18.购物车的价格计算可以使用流式计算来处理吗?能说下思路吗?有没有什么高性能的方式推荐?

解答:当商品添加到购物车的时候,就可以开始计算了。

以上是关于微服务之路spring cloud stream的主要内容,如果未能解决你的问题,请参考以下文章

Java之 Spring Cloud 微服务的 Spring Cloud Stream(第四个阶段)SpringBoot项目实现商品服务器端调用

Spring Cloud 升级之路 - 2020.0.x - 5. 理解 NamedContextF

第十章 消息驱动的微服务: Spring Cloud Stream

Spring Cloud Stream微服务消息框架

Spring Cloud Stream微服务消息框架

spring cloud导入一个新的spring boot项目作为spring cloud的一个子模块微服务,怎么做/或者 每次导入一个新的spring boot项目,IDEA不识别子module(代