Go语言使用RabbitMQ

Posted 稚与

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go语言使用RabbitMQ相关的知识,希望对你有一定的参考价值。

基本概念

什么是消息队列

消息队列是一种应用(进程)间的通信方式。

生产者只需把消息发布到MQ,消费者只需重MQ中取出,可靠传递由消息队列中的消息系统来确保。

消息队列有什么用

消息队列是一种异步协作机制,最根本的用处在于将一些不需要即时生效的操作拆分出来异步执行,从而达到可靠传递、流量削峰等目的。

比如如果有一个业务需要发送短信,可以在主流程完成之后发送消息到MQ后,让主流程完结。而由另外的线程拉取MQ的消息,完成发送短信的操作。

常用的消息队列

常用的MQ大概有ActiveMQ、RabbitMQ、RocketMQ、Kafka

  • ActiveMQ,基于Java

优点:对Java的JMS支持最好;多线程并发;

缺点:历史悠久,版本更新慢。现在慢慢用的少了;

  • RabbitMQ,基于Erlang

优点:生态丰富,是现在主流的MQ;支持多种客户端、支持AJAX;

缺点:对想深入源码的Java选手不太友好;

  • RocketMQ,基于Java

优点:为海量数据打造;主张拉模式;天然集群、HA、负载均衡;

缺点:生态较小

  • Kafka,基于Scala

优点:分布式高可拓展;高性能;容错强

缺点:消息重复;乱序;维护成本高

什么是RabbitMQ

消息中间件

erlang:一种并发函数式语言

AMQP:Advanced Message Queuing Protocol,高级消息队列协议。由Exchange、Queue和Bind组成

RabbitMQ是一个erlang开发的AMQP实现

生产者将消息发送到Exchange上,通过Exchange从而Binding到Queues上。

Exchange有三种具体类型:

  • direct:如果消息中的RoutingKey和Binding中的BindingKey一致就转发
  • fanout:消息被分发到所有队列中
  • topic:将RoutingKey和队列的模式进行匹配

应用场景

异步

可以理解为将遇到非必须的业务时,立即响应客户端,不关系业务何时完成

比如在用户注册时,有将信息写入数据库发送注册成功邮件两项业务。

数据库写入完成即标志着用户注册成功,此时如果继续处理发送邮件的业务,会给客户端带来不必要的等待时间。引入消息队列后,在队列中写入完成注册的消息后,即可完成整个注册流程。至于邮件,可以等到邮件业务从消息队列中取出消息再发送。

把不紧急的业务从主线中剥离出来,主线不必考虑不紧急的业务何时完成的时候,可以考虑使用消息队列实现异步。

解耦

考虑两个系统间存在消息传递,一个系统的故障会影响到整个业务的正常运转。可以用消息队列来保证消息可靠传递

比如一个订单系统和一个库存系统,完成订单之后,需要进行库存调度。考虑到如果库存系统故障,会引起已完成的订单消息的丢失,而做很多异常处理会使业务变得臃肿。这个时候可考虑引入消息队列,使用消息队列保证可靠传输,从而减少业务逻辑。

削峰

考虑短时间的大量请求,可能会带来内存溢出、大面积连接超时等情况,使得服务器崩溃。引入消息队列后,可以控制请求到业务处理系统的流量,从而防止崩溃现象的出现。

比如秒杀场景。大量请求同时涌入,服务器不能分配足够的资源响应,或者带宽不足,导致宕机。可以引入消息队列来限流,MQ通过限制同一时间的出口消息,使得流量在服务器能够承受的范围之内。等待一部分请求处理完成之后,再向业务处理系统导入新的消息。

Go语言使用RabbitMQ

docker安装RabbitMQ

sudo docker pull rabbitmq
sudo docker images
sudo docker run --name rabbitmq -d -p 5672:5672
sudo docker ps

go安装rabbitmq客户端

go get github.com/streadway/amqp

HelloRabbitMQ

send.go

func main() 
	conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
	defer conn.Close()
	ch, _ := conn.Channel()
	defer ch.Close()
	q, _ := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	body := "Hello World!"
	_ = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing
			ContentType: "text/plain",
			Body:        []byte(body),
		)

recv.go

func main() 
	conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
	defer conn.Close()
	ch, _ := conn.Channel()
	defer ch.Close()
	q, _ := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	msgs, _ := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	forever := make(chan bool)
	go func() 
		for d := range msgs 
			log.Printf("Received a message: %s", d.Body)
		
	()
	<-forever

Reference

几种常见的MQ总结对比

消息队列之RabbitMQ

服务为什么会崩溃

17 | 消息队列:秒杀时如何处理每秒上万次的下单请求?

docker安装RabbitMq

RabbitMQ Go语言客户端教程1——HelloWorld

以上是关于Go语言使用RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

消息队列rabbitmq的五种工作模式(go语言版本)

Go语言学习笔记golang 操作 Redis & Mysql & RabbitMQ

Go RabbitMQ主题

关于Go语言,你可能会讨厌的五件事

GO语言(二十五):管理依赖项(上)-

全网最强C语言学习C语言入门篇(主线)——初识C语言①