golang使用rabbitmqRPC

Posted 吴冬冬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang使用rabbitmqRPC相关的知识,希望对你有一定的参考价值。

之前的章节都是在说一方发指令,另一方执行这种情况,指令发送者不关系指令的完成情况,或者不需要阻塞去获得结果,结果异步通知。

但很多时候是需要阻塞得去获得执行结果的,比如http的请求。

在rabbitmq使用RPC的关键就是ReplyTo属性

err = ch.ExchangeDeclare("rpc_topic", amqp.ExchangeTopic, true, false, false, false, nil)
	failOnError(err, "Failed to declare a exchange")

	q, err := ch.QueueDeclare("", false, false, true, false, nil)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")

	corrId := randomString(32)
	err = ch.Publish("rpc_topic", "fib", false, false, amqp.Publishing
		ContentType:   "text/plain",
		CorrelationId: corrId,
		ReplyTo:       q.Name,
		Body:          []byte(strconv.Itoa(n)),
	)
	failOnError(err, "Failed to publish a message")

	for msg := range msgs 
		if corrId == msg.CorrelationId 
			res, err = strconv.Atoi(string(msg.Body))
			failOnError(err, "Failed to convert body to integer")
			break
		
	

往exchange里发消息的时候就指定回复队列的名字,然后等待回复队列的回复。

而作为被调用的一侧,收到消息处理完之后直接往回复队列里丟消息即可。如果同类消息想复用回复队列,不用每次都创建新的队列,那么可以用CorrelationId做一个标识

err = ch.ExchangeDeclare("rpc_topic", amqp.ExchangeTopic, true, false, false, false, nil)
	failOnError(err, "Failed to declare a exchange")

	q, err := ch.QueueDeclare("", false, true, false, false, nil)
	failOnError(err, "Failed to declare a queue")

	err = ch.QueueBind(q.Name, "fib", "rpc_topic", false, nil)
	failOnError(err, "Failed to bind a queue")

	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set QoS")

	msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")
	go func() 
		for msg := range msgs 
			n, err := strconv.Atoi(string(msg.Body))
			failOnError(err, "Failed to convert body to integer")

			log.Printf(" [.] fib(%d)", n)

			res := fib(n)

			err = ch.Publish("", msg.ReplyTo, false, false, amqp.Publishing
				ContentType:   "text/plain",
				CorrelationId: msg.CorrelationId,
				Body:          []byte(strconv.Itoa(res)),
			)
			failOnError(err, "Failed to publish a message")
			msg.Ack(false)
		
	()
	log.Printf(" [*] Awaiting RPC requests")

以上是关于golang使用rabbitmqRPC的主要内容,如果未能解决你的问题,请参考以下文章

golang使用rabbitmqRPC

golang使用rabbitmqRPC

RabbitMQ RPC:排他队列锁定 @ PHP

Spring Boot 和异步 RabbitMQ RPC

什么是合同类以及如何使用它

在不同类中使用带有代码的按钮