从 rabbitmq 获取已发布消息的响应。戈朗
Posted
技术标签:
【中文标题】从 rabbitmq 获取已发布消息的响应。戈朗【英文标题】:Get response from rabbitmq for the published message. Golang 【发布时间】:2020-07-31 09:47:26 【问题描述】:我已经实现了消费消息的工作线程,用它们完成某种类型的工作并将结果发布到队列上。
我有api,它将接收http请求,向worker发布消息并等待worker发送响应。
我有两个队列,一个是worker,一个是gateway。
Api 将 -> 在工作队列上发布事件。使用网关队列中的事件(等待响应) 工作人员将 -> 使用工作人员队列中的事件。在网关队列上发布事件。
工人工作得很好,我从来没有遇到过问题。所以我会专注于api,因为那里有错误。
问题:
每次我的 api 发布消息时。它首先会生成一些 uuid,我们会在地图中记住这些 uuid,以便在我们使用响应时。我们可以匹配响应是哪个请求。
当我们消费消息时,第一次一切正常,我们可以将响应与请求相匹配。但是第二次,我们在地图中找不到该响应。尽管我已经添加了它。
这里有一些代码示例。
存储,将与地图一起使用的接口
package main
import "fmt"
type ChannelStorage interface
Add(uid string, message chan ResponseMessage)
Delete(uid string)
Get(uid string) chan ResponseMessage
func NewChannelStorage() ChannelStorage
return ChannelMapStorage
channelMap: make(map[string]chan ResponseMessage),
type ChannelMapStorage struct
channelMap map[string]chan ResponseMessage
func (storage ChannelMapStorage) Add(uid string, message chan ResponseMessage)
fmt.Println(fmt.Sprintf("Adding Message: %s ", uid))
storage.channelMap[uid] = message
func (storage ChannelMapStorage) Delete(uid string)
fmt.Println(fmt.Sprintf("Deleting Message: %s ", uid))
delete(storage.channelMap, uid)
func (storage ChannelMapStorage) Get(uid string) chan ResponseMessage
fmt.Println(fmt.Sprintf("Getting Message: %s ", uid))
return storage.channelMap[uid]
这是我的发布者,它将向工作队列发送事件。
package main
import (
"fmt"
"github.com/streadway/amqp"
)
var channel *amqp.Channel
func init()
// Connect to the rabbit.
conn, err := amqp.Dial(rabbitConfig.uri)
if err != nil
panic(err)
// create channel
channel, err = conn.Channel()
if err != nil
panic(err)
func publish(queueName string , data []byte, id string) error
// publish message
return channel.Publish(
"", // exchange
queueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing
CorrelationId: id,
ContentType: "text/plain",
Body: data,
,
)
这里是发件人,这是代码的一部分,所有操作都发生在这里。 它的工作是连接到rabbitmq,消费事件。将事件与请求匹配并通知我们得到响应。
package main
import (
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
func init()
conn, err := amqp.Dial(rabbitConfig.uri)
failOnError(err, "Publisher failed to connect to the rabbitmq")
// Create channel
channel, err := conn.Channel()
failOnError(err, "Publisher failed to create channel")
// Create queue
queue, err := channel.QueueDeclare(
RECIEVE_QUEUE_NAME, // channelname
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to create queue for consumer")
// channel
messages, err := channel.Consume(
queue.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed on consuming event")
go func()
for message:= range messages
go handleMessage(message)
()
func sendMessage(name string, id string ,requestType string) ([]byte, int, error)
// Create new task message
message := TaskMessage
Uid: uid(),
ReplyTo: RECIEVE_QUEUE_NAME,
Type: requestType,
Name: name,
Id: id,
data ,err := json.Marshal(message)
if err != nil
fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
return nil, 0, err
err = publish(SEND_QUEUE_NAME, data, message.Uid)
if err != nil
fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
return nil, 0, err
// whenever we send message, we need to add it to the waiting response channel
rchannel := make(chan ResponseMessage)
channelStorage.Add(message.Uid, rchannel)
// Wait for the response
select
case response := <- rchannel:
fmt.Println(fmt.Sprintf("Sending response: %s ", message.Uid))
data := response.Response
code := response.StatusCode
channelStorage.Delete(message.Uid)
return data, code, nil
case <-time.After(3 * time.Second):
// remove channel from rchans
channelStorage.Delete(message.Uid)
// Return timeout error.
return nil, 0, errors.New("Response timed out on rabbit.")
func handleMessage(msg amqp.Delivery)
// Parse message.
response := &ResponseMessage
// Parse response.
err := json.Unmarshal(msg.Body, response)
if err != nil
log.Printf("ERROR: fail unmarshl: %s", msg.Body)
return
// find waiting channel(with uid) and forward the reply to it
if channel := channelStorage.Get(response.Uid); channel != nil
channel <- *response
方法 sendMessage() 将发布事件,等待消费响应。将其映射到我们的请求并返回结果。
在这里我将创建新任务
// Create new task message
message := TaskMessage
Uid: uid(),
ReplyTo: RECIEVE_QUEUE_NAME,
Type: requestType,
Name: name,
Id: id,
data ,err := json.Marshal(message)
if err != nil
fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
return nil, 0, err
使用来自 publisher.go 的方法发布任务
err = publish(SEND_QUEUE_NAME, data, message.Uid)
if err != nil
fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
return nil, 0, err
使用我们的存储接口来映射具有我们生成的唯一 ID 的响应。
rchannel := make(chan ResponseMessage)
channelStorage.Add(message.Uid, rchannel)
等待通知的渠道是我们收到对已发布事件的响应。如果我们在 3 秒内没有得到响应,我们就会超时。
case response := <- rchannel:
fmt.Println(fmt.Sprintf("Sending response: %s ", message.Uid))
data := response.Response
code := response.StatusCode
channelStorage.Delete(message.Uid)
return data, code, nil
case <-time.After(3 * time.Second):
// remove channel from rchans
channelStorage.Delete(message.Uid)
// Return timeout error.
return nil, 0, errors.New("Response timed out on rabbit.")
问题发生在我们消费消息时,但尝试将 id 与响应映射。它表示它为我们地图中的 id 返回 nil。
func handleMessage(msg amqp.Delivery)
// Parse message.
response := &ResponseMessage
// Parse response.
err := json.Unmarshal(msg.Body, response)
if err != nil
log.Printf("ERROR: fail unmarshl: %s", msg.Body)
return
// find waiting channel(with uid) and forward the reply to it
if channel := channelStorage.Get(response.Uid); channel != nil
channel <- *response
我的 main.go 是这样的,它会创建全局存储并运行 http 服务器:
package main
import (
"fmt"
"os"
)
var channelStorage ChannelStorage
func main()
channelStorage = NewChannelStorage()
err := SetupRouter(NewApi()).Run(":8080")
if err != nil
fmt.Println(err)
os.Exit(1)
因为我对 golang 没有太多经验。问题可能会发生,因为我创建了全局存储或类似的东西。
【问题讨论】:
【参考方案1】:首先:你的存储现在不是线程安全的,你应该使用 sync.Mutex 例如。
type ChannelMapStorage struct
m sync.Nutex
channelMap map[string]chan ResponseMessage
func (storage *ChannelMapStorage) Add(uid string, message chan ResponseMessage)
storage.m.Lock()
fmt.Println(fmt.Sprintf("Adding Message: %s ", uid))
storage.channelMap[uid] = message
storage.m.Unlock()
所有其他方法都一样。
第二:您应该在发布前将消息添加到存储中,如果发布失败则将其删除。
data ,err := json.Marshal(message)
if err != nil
fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
return nil, 0, err
rchannel := make(chan ResponseMessage)
channelStorage.Add(message.Uid, rchannel)
err = publish(SEND_QUEUE_NAME, data, message.Uid)
if err != nil
channelStorage.Del(message.Uid)
fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
return nil, 0, err
【讨论】:
以上是关于从 rabbitmq 获取已发布消息的响应。戈朗的主要内容,如果未能解决你的问题,请参考以下文章
SimpleAmqpClient Channel::basicConsume() ,如何从 RabbitMQ 获取所有消息