golang使用rabbitmqTopic模式

Posted 吴冬冬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang使用rabbitmqTopic模式相关的知识,希望对你有一定的参考价值。

简介

上一篇用可选择接受的direct交换器代替了广播接受到fanout交换器,但是direct模式还是有局限性,它不能通过多个条件路由。

比如linux的日志系统,它不仅分级别(info/warn/crit…),而且还分来源设备(auth/cron/kern…)。

我们需要一个非常复杂度组合,比如想接受来自cron的critical日志和来自kern的所有日志。

当然topic交换器可以实现这一切。

Topic交换器

发往topic交换器的routing_key不能随意写,它必须是.分隔的几个词。词可以随意,但是最好可以说明信息特点,routing_key的最大长度是255字节。

binding key也是类似的格式,topic交换器的绑定逻辑和redict一样,根据routing_key把信息发往所有绑定了对应banding key的队列中。

*代替一个词,#代表0个和多个词

topic交换器非常强大,它可以替代其他交换器

当一个队列使用#绑定到topic交换器时,topic交换器看起来就像一个fanout交换器。

当队列的banding key没有任何*或者#时,topic交换器看起来又像一个direct交换器。

示例

发送者

package main

import (
	"github.com/streadway/amqp"
	"log"
	"os"
	"strings"
)

func main() 
	conn, err := amqp.Dial("amqp://admin:rabbitmq123@xxx.xxx.xxx.xxx:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare("logs_topic", amqp.ExchangeTopic, true, false, false, false, nil)
	failOnError(err, "Failed to declare an exchange")

	body := bodyFrom(os.Args)

	err = ch.Publish("logs_topic", severityFrom(os.Args), false, false, amqp.Publishing
		ContentType: "text/plain",
		Body:        []byte(body),
	)
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)


func severityFrom(args []string) string 
	if len(args) > 1 
		return args[1]
	
	return "anonymous.info"


func bodyFrom(args []string) string 
	if len(args) > 2 
		return strings.Join(args[2:], " ")
	
	return "hello"


func failOnError(err error, msg string) 
	if err != nil 
		log.Fatalf("%s: %s", msg, err)
	

接受者

package main

import (
	"github.com/streadway/amqp"
	"log"
	"os"
)

func main()  
	conn, err := amqp.Dial("amqp://admin:rabbitmq123@18.232.146.30:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare("logs_topic", amqp.ExchangeTopic, true, false, false, false, nil)
	failOnError(err, "Failed to declare an exchange")

	q,err:=ch.QueueDeclare("",false,false,false,false,nil)
	failOnError(err, "Failed to declare a queue")

	if len(os.Args) < 2 
		log.Printf("Usage: %s [binding_key]...", os.Args[0])
		os.Exit(0)
	

	for _, key := range os.Args[1:] 
		log.Printf("Binding queue %s to exchange %s with routing key %s",
			q.Name, "logs_topic", key)
		err=ch.QueueBind(q.Name,key,"logs_topic",false,nil)
		failOnError(err, "Failed to bind a queue")
	

	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")

	go func() 
		for msg := range msgs 
			log.Printf("Received a message: %s", msg.Body)
		
	()

	<-make(chan bool)



func failOnError(err error, msg string) 
	if err != nil 
		log.Fatalf("%s: %s", msg, err)
	

以上是关于golang使用rabbitmqTopic模式的主要内容,如果未能解决你的问题,请参考以下文章

DES--------Golang对称加密之模式问题实战

Vue Apollo:变量未添加到 graphql 查询,或者我在后端没有正确接受它们(Golang)

如果标准 golang big.Int 函数接受两个参数并返回一个值,为啥它会使用接收器?

Golang 持久通道接受来自多个函数调用的输入

Golang入门到项目实战 golang方法

由于 CORS,Nginx 反向代理后面的 Golang 应用程序不会接受 ajax 请求