RabbitMQ瀹樻柟鏁欑▼浜?Work Queues(GOLANG璇█瀹炵幇)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ瀹樻柟鏁欑▼浜?Work Queues(GOLANG璇█瀹炵幇)相关的知识,希望对你有一定的参考价值。

鏍囩锛?a href='http://www.mamicode.com/so/1/%e5%91%bd%e4%bb%a4' title='鍛戒护'>鍛戒护   铏氭嫙   text   鍖呭惈   璁剧疆   argument   鍐呭瓨   routing   

RabbitMQ瀹樻柟鏁欑▼浜?Work Queues(GOLANG璇█瀹炵幇)

鎶€鏈浘鐗? src=鍑嗗

鍦ㄦ湰鏁欑▼鐨勫墠涓€閮ㄥ垎锛屾垜浠彂閫佷簡涓€鏉″寘鍚€?Hello World锛佲€濈殑娑堟伅銆?鐜板湪锛屾垜浠皢鍙戦€佷唬琛ㄥ鏉備换鍔$殑瀛楃涓层€?鎴戜滑娌℃湁鐪熷疄鐨勪换鍔★紝渚嬪瑕佽皟鏁村ぇ灏忕殑鍥惧儚鎴栬娓叉煋鐨刾df鏂囦欢锛屽洜姝ゆ垜浠亣瑁呰€楁椂浠诲姟-浣跨敤time.Sleep鍑芥暟鏉ヤ吉閫犲畠銆?鎴戜滑灏嗗瓧绗︿覆涓殑鐐规暟浣滀负瀹冪殑澶嶆潅搴︺€?姣忎釜鐐瑰皢鍗犫€滃伐浣溾€濈殑涓€绉掋€?渚嬪锛孒ello ...鎻忚堪鐨勮櫄鎷熶换鍔″皢鑺辫垂涓夌閽熴€?/p>

鎴戜滑灏嗙◢寰慨鏀逛笂涓€涓ず渚嬩腑鐨剆end.go浠g爜锛屼互鍏佽浠庡懡浠よ鍙戦€佷换鎰忔秷鎭€?璇ョ▼搴忎細灏嗕换鍔″畨鎺掑埌鎴戜滑鐨勫伐浣滈槦鍒椾腑锛屽洜姝ゆ垜浠皢鍏跺懡鍚嶄负new_task.go锛?/p>

#new_task.go

package main

import (
    "github.com/streadway/amqp"
    "log"
    "os"
    "strings"
)

func main(){
    // 杩炴帴RabbitMQ鏈嶅姟鍣?    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()
    // 鍒涘缓涓€涓猚hannel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err  := ch.QueueDeclare(
        "hello",            // 闃熷垪鍚嶇О
        false,          // 鏄惁鎸佷箙鍖?        false,      // 鏄惁鑷姩鍒犻櫎
        false,          // 鏄惁鐙珛
        false,nil,
        )
    failOnError(err, "Failed to declare a queue")

    body := bodyForm(os.Args)
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            DeliveryMode:amqp.Persistent,
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s", body)
}

func bodyForm(args []string) string{
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

// 甯姪鍑芥暟妫€娴嬫瘡涓€涓猘mqp璋冪敤
func failOnError(err error, msg string)  {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

#worker.go

package main

import (
    "bytes"
    "github.com/streadway/amqp"
    "log"
    "time"
)

func main(){
    // 杩炴帴RabbitMQ鏈嶅姟鍣?    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()
    // 鍒涘缓涓€涓猚hannel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()
    // 鐩戝惉闃熷垪
    q, err  := ch.QueueDeclare(
        "hello",            // 闃熷垪鍚嶇О
        false,          // 鏄惁鎸佷箙鍖?        false,      // 鏄惁鑷姩鍒犻櫎
        false,          // 鏄惁鐙珛
        false,nil,
    )
    failOnError(err, "Failed to declare a queue")
    // 娑堣垂闃熷垪
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            // 缁熻string涓殑`.`鏉ヨ〃绀烘墽琛屾椂闂?            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

// 甯姪鍑芥暟妫€娴嬫瘡涓€涓猘mqp璋冪敤
func failOnError(err error, msg string)  {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

寮€鍚袱涓獁orker鍑嗗鎺ュ彈闃熷垪涓暟鎹?/p>

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C

鎵цnew_task.go杩涜鍙戦€佹暟鎹?/p>


# shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....

寰楀埌缁撴灉濡備笅
鎶€鏈浘鐗? src=娑堟伅纭

鎵ц浠诲姟鍙兘闇€瑕佸嚑绉掗挓銆?鎮ㄥ彲鑳芥兂鐭ラ亾锛屽鏋滃叾涓竴涓秷璐硅€呭紑濮嬩竴椤规极闀跨殑浠诲姟鑰屼粎閮ㄥ垎瀹屾垚鑰屾鎺夛紝浼氬彂鐢熶粈涔堟儏鍐点€?浣跨敤鎴戜滑褰撳墠鐨勪唬鐮侊紝RabbitMQ涓€鏃﹀悜娑堣垂鑰呭彂閫佷簡涓€鏉℃秷鎭紝渚跨珛鍗冲皢鍏舵爣璁颁负鍒犻櫎銆?鍦ㄨ繖绉嶆儏鍐典笅锛屽鏋滄偍鏉€姝讳竴涓獁orker锛屾垜浠皢涓㈠け姝e湪澶勭悊鐨勬秷鎭€?鎴戜滑杩樺皢涓㈠け鎵€鏈夊彂閫佺粰璇ョ壒瀹氬伐浣滀汉鍛樹絾灏氭湭澶勭悊鐨勬秷鎭€?/p>

浣嗘槸鎴戜滑涓嶆兂涓㈠け浠讳綍浠诲姟銆?濡傛灉涓€涓獁orker姝讳骸锛屾垜浠笇鏈涘皢浠诲姟浜や粯缁欏彟涓€涓獁orker銆?/p>

涓轰簡纭繚娑堟伅姘镐笉涓㈠け锛孯abbitMQ鏀寔娑堟伅纭銆?娑堣垂鑰呭彂閫佸洖涓€涓秷鎭‘璁わ紝鍛婄煡RabbitMQ鐗瑰畾鐨勬秷鎭凡琚帴鏀讹紝澶勭悊锛屽苟涓擱abbitMQ鍙互鑷敱鍒犻櫎瀹冦€?/p>

濡傛灉浣跨敤鑰呭畷鏈猴紙鍏堕€氶亾宸插叧闂紝杩炴帴宸插叧闂垨TCP杩炴帴涓㈠け锛夎€屾病鏈夊彂閫佺‘璁わ紝RabbitMQ灏嗕簡瑙f秷鎭湭瀹屽叏澶勭悊锛屽苟灏嗛噸鏂版帓闃熴€?濡傛灉鍚屾椂鏈夊叾浠栨秷璐硅€呭湪绾匡紝瀹冨皢寰堝揩灏嗗叾閲嶆柊鍒嗗彂缁欏彟涓€涓秷璐硅€呫€?杩欐牱锛屾偍鍙互纭繚鍗充娇worker鍋跺皵瀹曟満涔熶笉浼氫涪澶变换浣曟秷鎭€?/p>

RabbitMQ娌℃湁浠讳綍娑堟伅瓒呮椂璁剧疆锛?娑堣垂鑰呮浜℃椂锛孯abbitMQ灏嗛噸鏂颁紶閫掓秷鎭€?鍗充娇澶勭悊涓€鏉℃秷鎭姳璐归潪甯搁潪甯搁暱鐨勬椂闂翠篃娌″叧绯汇€?/p>

鍦ㄦ湰鏁欑▼涓紝鎴戜滑灏嗛€氳繃涓衡€?auto-ack鈥濆弬鏁颁紶閫掍竴涓猣alse鏉ヤ娇鐢ㄦ墜鍔ㄦ秷鎭‘璁わ紝鐒跺悗涓€鏃﹀湪浠诲姟瀹屾垚鍚庝娇鐢╠.Ack锛坒alse锛変粠worker鍙戦€侀€傚綋鐨勭‘璁ゃ€?/p>

#worker.go淇敼閮ㄩ棬浠g爜
msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  false,  // 鏄惁鑷姩娑堟伅纭
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)
    log.Printf("Done")
    d.Ack(false)
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

浣跨敤姝や唬鐮侊紝鎴戜滑鍙互纭繚锛屽嵆浣挎偍鍦ㄥ鐞嗘秷鎭椂浣跨敤CTRL + C鏉€姝粀orker锛屼篃涓嶄細涓㈠け浠讳綍淇℃伅銆?worker鍚庝笉涔咃紝鎵€鏈夋湭纭鐨勬秷鎭皢閲嶆柊鍙戦€併€?/p>

娑堟伅纭蹇呴』涓庢秷鎭彂閫佸湪鍚屼竴channel涓娿€?灏濊瘯浣跨敤鍏朵粬channel杩涜纭灏嗗鑷撮€氶亾绾у崗璁紓甯搞€?/p>

蹇樿娑堟伅纭

蹇樿璁剧疆娑堟伅纭鏄竴涓父瑙佺殑閿欒銆?杩欐槸涓€涓緢瀹规槗鐘殑閿欒锛屼絾鏄悗鏋滃緢涓ラ噸銆?褰撴偍鐨勫鎴风閫€鍑烘椂锛屾秷鎭皢琚噸鏂板彂閫侊紙鍙兘鐪嬭捣鏉ュ儚鏄殢鏈洪噸鏂板彂閫侊級锛屼絾鏄疪abbitMQ灏嗗崰鐢ㄨ秺鏉ヨ秺澶氱殑鍐呭瓨锛屽洜涓哄畠灏嗘棤娉曢噴鏀句换浣曟湭纭鐨勬秷鎭€?/p>

涓轰簡璋冭瘯杩欑閿欒锛屾偍鍙互浣跨敤rabbitmqctl鎵撳嵃messages_unacknowledged瀛楁锛?/p>

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue")

杩欑鎸佷箙鐨勯€夐」鏇存敼闇€瑕佸悓鏃跺簲鐢ㄤ簬鐢熶骇鑰呬唬鐮佸拰娑堣垂鑰呬唬鐮併€?/p>

鍦ㄨ繖涓€鐐逛笂锛屾垜浠‘淇濆嵆浣縍abbitMQ閲嶆柊鍚姩锛宼ask_queue闃熷垪涔熶笉浼氫涪澶便€?鐜板湪锛屾垜浠渶瑕佷娇鐢╝mqp.Persistent閫夐」amqp.Publishing鏉ュ皢娑堟伅鏍囪涓烘寔涔呮秷鎭€?/p>

err = ch.Publish(
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
})

鍏充簬娑堟伅鎸佷箙鎬х殑璇存槑

灏嗘秷鎭爣璁颁负鎸佷箙鎬у苟涓嶈兘瀹屽叏淇濊瘉涓嶄細涓㈠け娑堟伅銆?灏界瀹冨憡璇塕abbitMQ灏嗘秷鎭繚瀛樺埌纾佺洏锛屼絾鏄疪abbitMQ鎺ュ彈娑堟伅骞朵笖灏氭湭淇濆瓨娑堟伅鏃讹紝杩樻湁寰堢煭鐨勬椂闂淬€?鑰屼笖锛孯abbitMQ涓嶄細瀵规瘡鏉℃秷鎭兘鎵цfsync锛?锛夛紞瀹冨彲鑳藉彧鏄繚瀛樺埌缂撳瓨涓紝鑰屾病鏈夌湡姝e啓鍏ョ鐩樸€?鎸佷箙鎬т繚璇佸苟涓嶅己锛屼絾鏄浜庢垜浠殑绠€鍗曚换鍔¢槦鍒楄€岃█锛岃繖宸茬粡缁扮话鏈変綑浜嗐€?濡傛灉鎮ㄩ渶瑕佹洿寮烘湁鍔涚殑淇濊瘉锛屽垯鍙互浣跨敤publisher confirms銆?/p>

鍏钩鍒嗗彂

鎮ㄥ彲鑳藉凡缁忔敞鎰忓埌锛岃皟搴︿粛鐒舵棤娉曞畬鍏ㄦ寜鐓ф垜浠殑瑕佹眰杩涜銆?渚嬪锛屽湪鏈変袱鍚峸orker鐨勬儏鍐典笅锛屽綋鎵€鏈夊鏁扮殑娑堟伅閮藉緢閲嶏紝鍋舵暟娑堟伅寰堣交鏃讹紝涓€涓獁orker灏嗕竴鐩村繖纰岃€屽彟涓€浣峸orker灏嗗嚑涔庝笉鍋氫换浣曞伐浣溿€?浣嗘槸锛孯abbitMQ瀵规涓€鏃犳墍鐭ワ紝骞朵笖浠嶅皢骞冲潎鍒嗛厤娑堟伅銆?/p>

鍙戠敓杩欑鎯呭喌鏄洜涓篟abbitMQ鍦ㄦ秷鎭繘鍏ラ槦鍒楁椂鎵嶈皟搴︽秷鎭€?瀹冧笉浼氭煡鐪嬩娇鐢ㄨ€呯殑鏈‘璁ゆ秷鎭暟銆?瀹冨彧鏄洸鐩湴灏嗘瘡绗琻鏉℃秷鎭彂閫佺粰绗琻涓娇鐢ㄨ€呫€?/p>

鎶€鏈浘鐗? src=err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS")

鍏充簬闃熷垪澶у皬鐨勬敞鎰忎簨椤?/p>

濡傛灉鎵€鏈夊伐浣滀汉鍛橀兘蹇欙紝鎮ㄧ殑闃熷垪灏辨弧浜嗐€?鎮ㄥ皢闇€瑕佺暀鎰忚繖涓€鐐癸紝涔熻浼氬鍔犳洿澶氱殑宸ヤ綔浜哄憳锛屾垨鑰呮湁鍏朵粬涓€浜涚瓥鐣ャ€?/p>

new_task.go

package main

import (
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "task_queue", // name
                true,         // durable
                false,        // delete when unused
                false,        // exclusive
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "",           // exchange
                q.Name,       // routing key
                false,        // mandatory
                false,
                amqp.Publishing{
                        DeliveryMode: amqp.Persistent,
                        ContentType:  "text/plain",
                        Body:         []byte(body),
                })
        failOnError(err, "Failed to publish a message")
        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

worker.go


package main

import (
        "bytes"
        "github.com/streadway/amqp"
        "log"
        "time"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "task_queue", // name
                true,         // durable
                false,        // delete when unused
                false,        // exclusive
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf("Received a message: %s", d.Body)
                        dot_count := bytes.Count(d.Body, []byte("."))
                        t := time.Duration(dot_count)
                        time.Sleep(t * time.Second)
                        log.Printf("Done")
                        d.Ack(false)
                }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
}

以上是关于RabbitMQ瀹樻柟鏁欑▼浜?Work Queues(GOLANG璇█瀹炵幇)的主要内容,如果未能解决你的问题,请参考以下文章

2020闃块噷宸村反瀹樻柟鏈€鏂癛edis寮€鍙戣鑼冿紒

[瑙﹀姩绮剧伒]瑙﹀姩绮剧伒瀹樻柟鎵嬪唽娴佹按璐?

laravel-admin 闆嗘垚wangEditor,鎸夌収瀹樻柟鏂囨。鍘绘悶锛屼絾鏄笉鏄剧ず鍟婏紝鏄剧ず Error Field type [editor] does not exist.

GitHub瓒?000鏄燂紒濡備綍鐢?0澶╁悆鎺塗ensorFlow2?

Newtonsoft.Json 姒傝堪

Java鍙嶅皠鏈哄埗