golang使用rabbitmqRPC
Posted 吴冬冬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang使用rabbitmqRPC相关的知识,希望对你有一定的参考价值。
之前的章节都是在说一方发指令,另一方执行这种情况,指令发送者不关系指令的完成情况,或者不需要阻塞去获得结果,结果异步通知。
但很多时候是需要阻塞得去获得执行结果的,比如http的请求。
在rabbitmq使用RPC的关键就是ReplyTo
属性
err = ch.ExchangeDeclare("rpc_topic", amqp.ExchangeTopic, true, false, false, false, nil)
failOnError(err, "Failed to declare a exchange")
q, err := ch.QueueDeclare("", false, false, true, false, nil)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
failOnError(err, "Failed to register a consumer")
corrId := randomString(32)
err = ch.Publish("rpc_topic", "fib", false, false, amqp.Publishing
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name,
Body: []byte(strconv.Itoa(n)),
)
failOnError(err, "Failed to publish a message")
for msg := range msgs
if corrId == msg.CorrelationId
res, err = strconv.Atoi(string(msg.Body))
failOnError(err, "Failed to convert body to integer")
break
往exchange里发消息的时候就指定回复队列的名字,然后等待回复队列的回复。
而作为被调用的一侧,收到消息处理完之后直接往回复队列里丟消息即可。如果同类消息想复用回复队列,不用每次都创建新的队列,那么可以用CorrelationId
做一个标识
err = ch.ExchangeDeclare("rpc_topic", amqp.ExchangeTopic, true, false, false, false, nil)
failOnError(err, "Failed to declare a exchange")
q, err := ch.QueueDeclare("", false, true, false, false, nil)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(q.Name, "fib", "rpc_topic", false, nil)
failOnError(err, "Failed to bind a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
failOnError(err, "Failed to register a consumer")
go func()
for msg := range msgs
n, err := strconv.Atoi(string(msg.Body))
failOnError(err, "Failed to convert body to integer")
log.Printf(" [.] fib(%d)", n)
res := fib(n)
err = ch.Publish("", msg.ReplyTo, false, false, amqp.Publishing
ContentType: "text/plain",
CorrelationId: msg.CorrelationId,
Body: []byte(strconv.Itoa(res)),
)
failOnError(err, "Failed to publish a message")
msg.Ack(false)
()
log.Printf(" [*] Awaiting RPC requests")
以上是关于golang使用rabbitmqRPC的主要内容,如果未能解决你的问题,请参考以下文章