浅谈RabbitMQ

Posted with the wind(随风)

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈RabbitMQ相关的知识,希望对你有一定的参考价值。

简单模拟MQ的作用 – 完成异步通讯


如图显示:有了RabbitMQ,你就可以和三个妹子同时进行短信聊天,就不需要为无法同时和三个妹子语音聊天而苦恼

MQ存在的意义 (解决同步通讯的存在的缺点)


如图所示:当用户完成了支付功能,我们需要仓储服务对应商品数量减1。当我们的支付服务频繁调用仓储服务之后,仓储服务挂掉了,这个时候支付服务想再调仓储服务就调不了,然后支付服务也阻塞在这里了,然后系统崩了。

同步存在的问题

初探MQ的作用

1. 服务解耦合

2. 性能提升,吞吐量提高

3. 服务没有强依赖,不担心级联失败问题

ps:概念啥的,谁想看啊

我们直接进入RabbitMQ的使用

安装RabbitMQ

PS:默认你们安装好了docker在自己的虚拟机或者自己的服务器上面了
docker pull rabbitmq:3-management(远程安装,版本可以自主选择)
--------------------------------------
也可以在自己电脑上面下载rabbitmq.tar,然后上传到你的虚拟机或者服务器,然后
docker load -i mq.tar

---------------------------
接下来,我们就可以使用docker运行我们的RabbitMQ了
docker run \\
 -e RABBITMQ_DEFAULT_USER=xxx \\
 -e RABBITMQ_DEFAULT_PASS=xxx\\
 --name mq \\
 --hostname mq1 \\
 -p 15672:15672 \\        (注意:这是管理页面端口)
 -p 5672:5672 \\          (注意:这是通讯端口)
 -d \\            (后台运行)
 rabbitmq:3-management

别直接复制上面的运行指令直接运行了,xxx,xxx这两个你自主设置一下,把括号和括号里面的内容删除一下
祝你好运

登录管理页面( ip地址:15672 )



如果你能登录进去说明你的安装配置都没有问题了,恭喜你。

整合RabbitMQ到项目里面

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


我们可以创建一个普通的maven项目为父项目,把依赖放在里面,再创建两个springboot子项目,分别作为消息的发布者和消息的消费者
ps:需要这个项目文件的可加qq:2338244917
然后我们在连个子项目中连接上我们的RabbitMQ

spring:
  rabbitmq:
    host: 47.98.251.192  # ip地址(你们懂的)
    port: 5672    # 这个你就别填15672了,那个是登录管理面板使用的
    username: xxx
    password: xxx
    virtual-host: /   # 虚拟主机地址:可以在管理面板创建,不然就和我一样
// 我们可以在项目publisher的测试类里面测试和学习
    @Autowired
    private RabbitTemplate rabbitTemplate; //注入对象
	@Test
    public void test01() {
        String queueName = "simple.queue";//队列名
        String msg = "hello spring amqp!";//发送的信息
        rabbitTemplate.convertAndSend(queueName,msg);
    }
//我们在consumer项目里面创建一个listeber包,然后写一个类
//使用注解@RabbitListener,然后填出正确队列名,
//我们String msg的类型要和消息发布的类型一致
	@RabbitListener(queues = "simple.queue")
    public void listenSimpleMessage(String msg) {
        System.out.println("消费者接收到simple.queue的消息:"+msg);
    }

通过上面的操作,我们就可以让我们的消费者时刻消费发布者的消息了

上面的例子,我们就会出现一个问题。假设:我们有两个消费者(我们可以按照上面的多做一个@RabbitListener(queues=“xxx.xxx”))就行了,这个时候,我们的发布者发送50条信息,消费者我们搞连个来消费这些消息,一个1秒处理50条,一个1秒处理20条。

这样,这两个消费者处理一样数量的消息

原因是因为要一个处理完,下一个才能处理,这样有个乱用,我们要能者多劳

spring:
  rabbitmq:
    host: 47.98.251.199
    username: root
    password: 922815
    port: 5672
    virtual-host: /
    listener:
      simple:
        prefetch: 1 # 每次取一条消息,处理完成才能获取下一条消息

在consumer项目配置一下就可以解决这个问题了

最后聊一聊交换机(Exchange)

交换机不储存消息

FanoutExchange

作用: 当我们消息的发送者发送消息成功之后,我们的FanoutExchange就会让绑定在它上面的队列queue全部都能消费到每一个消息

//consumer项目下
package cn.itcast.mq.cofig;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    //声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("lin.fanout");
    }
    //声明第一个队列
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }
    //绑定队列1和交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }

    //声明第二个队列
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }
    //绑定队列2和交换机
    @Bean
    public Binding bindingQueue2(Queue 		fanoutQueue2,FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }

    //...略,以相同方式声明第n个队列,并完成绑定
}
//publisher项目
	@Test
    public void testSendFanoutExchange() {
        //交换机名称
        String exchangeName = "lin.fanout";
        //消息
        String message = "Hello Everyone!";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }

前面的代码是在consumer项目里面的,后面的代码是publisher项目的代码

DirectExchange

作用: 我们可以通过DirectExchange(路由交换机),通过发送信息时,绑定routingKey来指定我们发送给哪个消息队列消费

//consumer项目下
	@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "lin.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listenDirectQueue1(String msg) {
        System.out.println("消费者接收到direct.queue1消息:["+msg+"]");
    }
//publisher项目下
	@Test
    public void testSendDirectExchange1() {
        //交换机名称
        String exchangeName = "lin.direct";
        //消息
        String message = "Hello yellow!";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"yellow",message);
    }

TopicExchange

作用: 这玩意的作用相当于把消息队列分类

#:代表0个或者1个单词,*:代表1个单词

	@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "lin.topic",type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue2(String msg) {
        System.out.println("消费者接收到topic.queue1的消息:["+msg+"]");
    }
//publisher项目
	@Test
    public void testSendTopicExchange1() {
        //交换机名字
        String exchangeName = "lin.topic";

        //消息
        String msg = "Hello,topicExchange!";

        //发送
        rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);
    }

ps:最后的最后渴望变成天使
大家别走,听我bb完最后一个知识点,b完我就下班了

SpringAMQP-消息转换器

它的作用就是,当我们消息的发布者发布一个消息是一个对象给rabbitMQ的时候,我们jdk有它一套方式,这方式不但性能不行,还不安全,我们要取代它

引入依赖

<!--jackson-databind-->
<!--我们在父项目依赖里面引入,这样我们学习的时候就能让子项目也能用了-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

注入@Bean

这个@Bean我们可以在springboot项目入口那个类里面注入
当然在配置类里面注入也可以

//这玩意就是覆盖jdk编码格式的
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
   //上面是为了避免你们导错包
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

我们简单来使用一下它

	@RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String,Object> msg) {
        System.out.println("接收到object.queue的消息:"+msg);
    }
	@Test
    public void testSendDirectExchange5() {
        //队列名字
        String queryName = "object.queue";
        //消息
        Map<String,Object> msg = new HashMap<>();
        msg.put("name","柳岩");
        msg.put("age",21);
        rabbitTemplate.convertAndSend(queryName,msg);
    }

好了,到此浅谈RabbitMQ就告一段落了

感谢能看到这里还不走的读者

要是还能给我点个赞就更加感谢了

如有问题,可以联系qq:2338244917

作者:随风

以上是关于浅谈RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

浅谈Python的RabbitMQ使用

浅谈RabbitMQ——死信队列与延迟队列

浅谈RabbitMQ——死信队列与延迟队列

浅谈RabbitMQ——死信队列与延迟队列

浅谈Mybatis

浅谈AngularJS中的$parse和$eval