从 rabbitmq 获取已发布消息的响应。戈朗

Posted

技术标签:

【中文标题】从 rabbitmq 获取已发布消息的响应。戈朗【英文标题】:Get response from rabbitmq for the published message. Golang 【发布时间】:2020-07-31 09:47:26 【问题描述】:

我已经实现了消费消息的工作线程,用它们完成某种类型的工作并将结果发布到队列上。

我有api,它将接收http请求,向worker发布消息并等待worker发送响应。

我有两个队列,一个是worker,一个是gateway。

Api 将 -> 在工作队列上发布事件。使用网关队列中的事件(等待响应) 工作人员将 -> 使用工作人员队列中的事件。在网关队列上发布事件。

工人工作得很好,我从来没有遇到过问题。所以我会专注于api,因为那里有错误。

问题:

每次我的 api 发布消息时。它首先会生成一些 uuid,我们会在地图中记住这些 uuid,以便在我们使用响应时。我们可以匹配响应是哪个请求。

当我们消费消息时,第一次一切正常,我们可以将响应与请求相匹配。但是第二次,我们在地图中找不到该响应。尽管我已经添加了它。

这里有一些代码示例。

存储,将与地图一起使用的接口

package main

import "fmt"

type ChannelStorage interface 

    Add(uid string, message chan ResponseMessage)
    Delete(uid string)
    Get(uid string) chan ResponseMessage


func NewChannelStorage() ChannelStorage 
    return ChannelMapStorage
        channelMap: make(map[string]chan ResponseMessage),
    


type ChannelMapStorage struct 
    channelMap map[string]chan ResponseMessage


func (storage ChannelMapStorage) Add(uid string, message chan ResponseMessage) 
    fmt.Println(fmt.Sprintf("Adding Message: %s ", uid))
    storage.channelMap[uid] = message


func (storage ChannelMapStorage) Delete(uid string) 
    fmt.Println(fmt.Sprintf("Deleting Message: %s ", uid))
    delete(storage.channelMap, uid)


func (storage ChannelMapStorage) Get(uid string) chan ResponseMessage 
    fmt.Println(fmt.Sprintf("Getting Message: %s ", uid))
    return storage.channelMap[uid]

这是我的发布者,它将向工作队列发送事件。

package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

var channel *amqp.Channel

func init()  

    // Connect to the rabbit.
    conn, err := amqp.Dial(rabbitConfig.uri)
    if err != nil 
        panic(err)
    

    // create channel
    channel, err = conn.Channel()
    if err != nil 
        panic(err)
    


func publish(queueName string , data []byte, id string) error 

    // publish message
    return channel.Publish(
        "",            // exchange
        queueName, // routing key
        false,         // mandatory
        false,         // immediate
        amqp.Publishing
            CorrelationId: id,
            ContentType: "text/plain",
            Body:        data,
        ,
    )


这里是发件人,这是代码的一部分,所有操作都发生在这里。 它的工作是连接到rabbitmq,消费事件。将事件与请求匹配并通知我们得到响应。

package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "time"
)

func init()  

    conn, err := amqp.Dial(rabbitConfig.uri)

    failOnError(err, "Publisher failed to connect to the rabbitmq")

    // Create channel
    channel, err := conn.Channel()

    failOnError(err, "Publisher failed to create channel")

    // Create queue
    queue, err := channel.QueueDeclare(
        RECIEVE_QUEUE_NAME, // channelname
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )

    failOnError(err, "Failed to create queue for consumer")

    // channel
    messages, err := channel.Consume(
        queue.Name, // queue
        "",         // consumer
        true,      // auto-ack
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // args
    )

    failOnError(err, "Failed on consuming event")

    go func() 
        for message:= range messages 
            go handleMessage(message)
        
    ()


func sendMessage(name string, id string ,requestType string) ([]byte, int, error) 

    // Create new task message
    message := TaskMessage
        Uid: uid(),
        ReplyTo: RECIEVE_QUEUE_NAME,
        Type: requestType,
        Name: name,
        Id: id,
    

    data ,err := json.Marshal(message)

    if err != nil 
        fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
        return nil, 0, err
    

    err = publish(SEND_QUEUE_NAME, data, message.Uid)

    if err != nil 
        fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
        return nil, 0, err
    

    // whenever we send message, we need to add it to the waiting response channel
    rchannel := make(chan ResponseMessage)
    channelStorage.Add(message.Uid, rchannel)

    // Wait for the response
    select 

    case response := <- rchannel:

        fmt.Println(fmt.Sprintf("Sending response: %s ", message.Uid))

        data := response.Response
        code := response.StatusCode

        channelStorage.Delete(message.Uid)

        return data, code, nil

    case <-time.After(3 * time.Second):

        // remove channel from rchans
        channelStorage.Delete(message.Uid)

        // Return timeout error.
        return nil, 0, errors.New("Response timed out on rabbit.")
    


func handleMessage(msg amqp.Delivery)  

    // Parse message.
    response := &ResponseMessage

    // Parse response.
    err := json.Unmarshal(msg.Body, response)

    if err != nil 
        log.Printf("ERROR: fail unmarshl: %s", msg.Body)
        return
    

    // find waiting channel(with uid) and forward the reply to it
    if channel := channelStorage.Get(response.Uid); channel != nil 
        channel <- *response
    


方法 sendMessage() 将发布事件,等待消费响应。将其映射到我们的请求并返回结果。

在这里我将创建新任务

// Create new task message
    message := TaskMessage
        Uid: uid(),
        ReplyTo: RECIEVE_QUEUE_NAME,
        Type: requestType,
        Name: name,
        Id: id,
    

    data ,err := json.Marshal(message)

    if err != nil 
        fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
        return nil, 0, err
    

使用来自 publisher.go 的方法发布任务

err = publish(SEND_QUEUE_NAME, data, message.Uid)

    if err != nil 
        fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
        return nil, 0, err
    

使用我们的存储接口来映射具有我们生成的唯一 ID 的响应。

rchannel := make(chan ResponseMessage)
channelStorage.Add(message.Uid, rchannel)

等待通知的渠道是我们收到对已发布事件的响应。如果我们在 3 秒内没有得到响应,我们就会超时。

case response := <- rchannel:

        fmt.Println(fmt.Sprintf("Sending response: %s ", message.Uid))

        data := response.Response
        code := response.StatusCode

        channelStorage.Delete(message.Uid)

        return data, code, nil

    case <-time.After(3 * time.Second):

        // remove channel from rchans
        channelStorage.Delete(message.Uid)

        // Return timeout error.
        return nil, 0, errors.New("Response timed out on rabbit.")
    

问题发生在我们消费消息时,但尝试将 id 与响应映射。它表示它为我们地图中的 id 返回 nil。

func handleMessage(msg amqp.Delivery)  

    // Parse message.
    response := &ResponseMessage

    // Parse response.
    err := json.Unmarshal(msg.Body, response)

    if err != nil 
        log.Printf("ERROR: fail unmarshl: %s", msg.Body)
        return
    

    // find waiting channel(with uid) and forward the reply to it
    if channel := channelStorage.Get(response.Uid); channel != nil 
        channel <- *response
    

我的 main.go 是这样的,它会创建全局存储并运行 http 服务器:

package main

import (
    "fmt"
    "os"
)

var channelStorage ChannelStorage

func main()  

    channelStorage = NewChannelStorage()

    err := SetupRouter(NewApi()).Run(":8080")

    if err != nil 
        fmt.Println(err)
        os.Exit(1)
    

因为我对 golang 没有太多经验。问题可能会发生,因为我创建了全局存储或类似的东西。

【问题讨论】:

【参考方案1】:

首先:你的存储现在不是线程安全的,你应该使用 sync.Mutex 例如。

type ChannelMapStorage struct 
    m sync.Nutex
    channelMap map[string]chan ResponseMessage


func (storage *ChannelMapStorage) Add(uid string, message chan ResponseMessage) 
    storage.m.Lock()
    fmt.Println(fmt.Sprintf("Adding Message: %s ", uid))
    storage.channelMap[uid] = message
    storage.m.Unlock()

所有其他方法都一样。

第二:您应该在发布前将消息添加到存储中,如果发布失败则将其删除。

data ,err := json.Marshal(message)

if err != nil 
    fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
    return nil, 0, err


rchannel := make(chan ResponseMessage)
channelStorage.Add(message.Uid, rchannel)

err = publish(SEND_QUEUE_NAME, data, message.Uid)
if err != nil 
    channelStorage.Del(message.Uid)
    fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
    return nil, 0, err

【讨论】:

以上是关于从 rabbitmq 获取已发布消息的响应。戈朗的主要内容,如果未能解决你的问题,请参考以下文章

获取 RabbitMQ 队列中的消息数

当 RabbitMQ 消费者崩溃时,获取的消息会发生啥?

SimpleAmqpClient Channel::basicConsume() ,如何从 RabbitMQ 获取所有消息

异步通信rabbitmq

RabbitMQ:从认识MQ到安装,学习消息模型等

RabbitMQ RPC 关闭最终消息的响应队列