RabbitMQ瀹樻柟鏁欑▼浜?Work Queues(GOLANG璇█瀹炵幇)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ瀹樻柟鏁欑▼浜?Work Queues(GOLANG璇█瀹炵幇)相关的知识,希望对你有一定的参考价值。
鏍囩锛?a href='http://www.mamicode.com/so/1/%e5%91%bd%e4%bb%a4' title='鍛戒护'>鍛戒护 铏氭嫙 text 鍖呭惈 璁剧疆 argument 鍐呭瓨 routing
RabbitMQ瀹樻柟鏁欑▼浜?Work Queues(GOLANG璇█瀹炵幇)
鍑嗗
鍦ㄦ湰鏁欑▼鐨勫墠涓€閮ㄥ垎锛屾垜浠彂閫佷簡涓€鏉″寘鍚€?Hello World锛佲€濈殑娑堟伅銆?鐜板湪锛屾垜浠皢鍙戦€佷唬琛ㄥ鏉備换鍔$殑瀛楃涓层€?鎴戜滑娌℃湁鐪熷疄鐨勪换鍔★紝渚嬪瑕佽皟鏁村ぇ灏忕殑鍥惧儚鎴栬娓叉煋鐨刾df鏂囦欢锛屽洜姝ゆ垜浠亣瑁呰€楁椂浠诲姟-浣跨敤time.Sleep鍑芥暟鏉ヤ吉閫犲畠銆?鎴戜滑灏嗗瓧绗︿覆涓殑鐐规暟浣滀负瀹冪殑澶嶆潅搴︺€?姣忎釜鐐瑰皢鍗犫€滃伐浣溾€濈殑涓€绉掋€?渚嬪锛孒ello ...鎻忚堪鐨勮櫄鎷熶换鍔″皢鑺辫垂涓夌閽熴€?/p>
鎴戜滑灏嗙◢寰慨鏀逛笂涓€涓ず渚嬩腑鐨剆end.go浠g爜锛屼互鍏佽浠庡懡浠よ鍙戦€佷换鎰忔秷鎭€?璇ョ▼搴忎細灏嗕换鍔″畨鎺掑埌鎴戜滑鐨勫伐浣滈槦鍒椾腑锛屽洜姝ゆ垜浠皢鍏跺懡鍚嶄负new_task.go锛?/p>
#new_task.go
package main
import (
"github.com/streadway/amqp"
"log"
"os"
"strings"
)
func main(){
// 杩炴帴RabbitMQ鏈嶅姟鍣? conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 鍒涘缓涓€涓猚hannel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // 闃熷垪鍚嶇О
false, // 鏄惁鎸佷箙鍖? false, // 鏄惁鑷姩鍒犻櫎
false, // 鏄惁鐙珛
false,nil,
)
failOnError(err, "Failed to declare a queue")
body := bodyForm(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
DeliveryMode:amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyForm(args []string) string{
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
// 甯姪鍑芥暟妫€娴嬫瘡涓€涓猘mqp璋冪敤
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
#worker.go
package main
import (
"bytes"
"github.com/streadway/amqp"
"log"
"time"
)
func main(){
// 杩炴帴RabbitMQ鏈嶅姟鍣? conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 鍒涘缓涓€涓猚hannel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 鐩戝惉闃熷垪
q, err := ch.QueueDeclare(
"hello", // 闃熷垪鍚嶇О
false, // 鏄惁鎸佷箙鍖? false, // 鏄惁鑷姩鍒犻櫎
false, // 鏄惁鐙珛
false,nil,
)
failOnError(err, "Failed to declare a queue")
// 娑堣垂闃熷垪
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 缁熻string涓殑`.`鏉ヨ〃绀烘墽琛屾椂闂? dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
// 甯姪鍑芥暟妫€娴嬫瘡涓€涓猘mqp璋冪敤
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
寮€鍚袱涓獁orker鍑嗗鎺ュ彈闃熷垪涓暟鎹?/p>
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
鎵цnew_task.go杩涜鍙戦€佹暟鎹?/p>
# shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....
寰楀埌缁撴灉濡備笅
娑堟伅纭
鎵ц浠诲姟鍙兘闇€瑕佸嚑绉掗挓銆?鎮ㄥ彲鑳芥兂鐭ラ亾锛屽鏋滃叾涓竴涓秷璐硅€呭紑濮嬩竴椤规极闀跨殑浠诲姟鑰屼粎閮ㄥ垎瀹屾垚鑰屾鎺夛紝浼氬彂鐢熶粈涔堟儏鍐点€?浣跨敤鎴戜滑褰撳墠鐨勪唬鐮侊紝RabbitMQ涓€鏃﹀悜娑堣垂鑰呭彂閫佷簡涓€鏉℃秷鎭紝渚跨珛鍗冲皢鍏舵爣璁颁负鍒犻櫎銆?鍦ㄨ繖绉嶆儏鍐典笅锛屽鏋滄偍鏉€姝讳竴涓獁orker锛屾垜浠皢涓㈠け姝e湪澶勭悊鐨勬秷鎭€?鎴戜滑杩樺皢涓㈠け鎵€鏈夊彂閫佺粰璇ョ壒瀹氬伐浣滀汉鍛樹絾灏氭湭澶勭悊鐨勬秷鎭€?/p>
浣嗘槸鎴戜滑涓嶆兂涓㈠け浠讳綍浠诲姟銆?濡傛灉涓€涓獁orker姝讳骸锛屾垜浠笇鏈涘皢浠诲姟浜や粯缁欏彟涓€涓獁orker銆?/p>
涓轰簡纭繚娑堟伅姘镐笉涓㈠け锛孯abbitMQ鏀寔娑堟伅纭銆?娑堣垂鑰呭彂閫佸洖涓€涓秷鎭‘璁わ紝鍛婄煡RabbitMQ鐗瑰畾鐨勬秷鎭凡琚帴鏀讹紝澶勭悊锛屽苟涓擱abbitMQ鍙互鑷敱鍒犻櫎瀹冦€?/p>
濡傛灉浣跨敤鑰呭畷鏈猴紙鍏堕€氶亾宸插叧闂紝杩炴帴宸插叧闂垨TCP杩炴帴涓㈠け锛夎€屾病鏈夊彂閫佺‘璁わ紝RabbitMQ灏嗕簡瑙f秷鎭湭瀹屽叏澶勭悊锛屽苟灏嗛噸鏂版帓闃熴€?濡傛灉鍚屾椂鏈夊叾浠栨秷璐硅€呭湪绾匡紝瀹冨皢寰堝揩灏嗗叾閲嶆柊鍒嗗彂缁欏彟涓€涓秷璐硅€呫€?杩欐牱锛屾偍鍙互纭繚鍗充娇worker鍋跺皵瀹曟満涔熶笉浼氫涪澶变换浣曟秷鎭€?/p>
RabbitMQ娌℃湁浠讳綍娑堟伅瓒呮椂璁剧疆锛?娑堣垂鑰呮浜℃椂锛孯abbitMQ灏嗛噸鏂颁紶閫掓秷鎭€?鍗充娇澶勭悊涓€鏉℃秷鎭姳璐归潪甯搁潪甯搁暱鐨勬椂闂翠篃娌″叧绯汇€?/p>
鍦ㄦ湰鏁欑▼涓紝鎴戜滑灏嗛€氳繃涓衡€?auto-ack鈥濆弬鏁颁紶閫掍竴涓猣alse鏉ヤ娇鐢ㄦ墜鍔ㄦ秷鎭‘璁わ紝鐒跺悗涓€鏃﹀湪浠诲姟瀹屾垚鍚庝娇鐢╠.Ack锛坒alse锛変粠worker鍙戦€侀€傚綋鐨勭‘璁ゃ€?/p>
#worker.go淇敼閮ㄩ棬浠g爜
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // 鏄惁鑷姩娑堟伅纭
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
浣跨敤姝や唬鐮侊紝鎴戜滑鍙互纭繚锛屽嵆浣挎偍鍦ㄥ鐞嗘秷鎭椂浣跨敤CTRL + C鏉€姝粀orker锛屼篃涓嶄細涓㈠け浠讳綍淇℃伅銆?worker鍚庝笉涔咃紝鎵€鏈夋湭纭鐨勬秷鎭皢閲嶆柊鍙戦€併€?/p>
娑堟伅纭蹇呴』涓庢秷鎭彂閫佸湪鍚屼竴channel涓娿€?灏濊瘯浣跨敤鍏朵粬channel杩涜纭灏嗗鑷撮€氶亾绾у崗璁紓甯搞€?/p>
蹇樿娑堟伅纭
蹇樿璁剧疆娑堟伅纭鏄竴涓父瑙佺殑閿欒銆?杩欐槸涓€涓緢瀹规槗鐘殑閿欒锛屼絾鏄悗鏋滃緢涓ラ噸銆?褰撴偍鐨勫鎴风閫€鍑烘椂锛屾秷鎭皢琚噸鏂板彂閫侊紙鍙兘鐪嬭捣鏉ュ儚鏄殢鏈洪噸鏂板彂閫侊級锛屼絾鏄疪abbitMQ灏嗗崰鐢ㄨ秺鏉ヨ秺澶氱殑鍐呭瓨锛屽洜涓哄畠灏嗘棤娉曢噴鏀句换浣曟湭纭鐨勬秷鎭€?/p>
涓轰簡璋冭瘯杩欑閿欒锛屾偍鍙互浣跨敤rabbitmqctl鎵撳嵃messages_unacknowledged瀛楁锛?/p>
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
杩欑鎸佷箙鐨勯€夐」鏇存敼闇€瑕佸悓鏃跺簲鐢ㄤ簬鐢熶骇鑰呬唬鐮佸拰娑堣垂鑰呬唬鐮併€?/p>
鍦ㄨ繖涓€鐐逛笂锛屾垜浠‘淇濆嵆浣縍abbitMQ閲嶆柊鍚姩锛宼ask_queue闃熷垪涔熶笉浼氫涪澶便€?鐜板湪锛屾垜浠渶瑕佷娇鐢╝mqp.Persistent閫夐」amqp.Publishing鏉ュ皢娑堟伅鏍囪涓烘寔涔呮秷鎭€?/p>
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
鍏充簬娑堟伅鎸佷箙鎬х殑璇存槑
灏嗘秷鎭爣璁颁负鎸佷箙鎬у苟涓嶈兘瀹屽叏淇濊瘉涓嶄細涓㈠け娑堟伅銆?灏界瀹冨憡璇塕abbitMQ灏嗘秷鎭繚瀛樺埌纾佺洏锛屼絾鏄疪abbitMQ鎺ュ彈娑堟伅骞朵笖灏氭湭淇濆瓨娑堟伅鏃讹紝杩樻湁寰堢煭鐨勬椂闂淬€?鑰屼笖锛孯abbitMQ涓嶄細瀵规瘡鏉℃秷鎭兘鎵цfsync锛?锛夛紞瀹冨彲鑳藉彧鏄繚瀛樺埌缂撳瓨涓紝鑰屾病鏈夌湡姝e啓鍏ョ鐩樸€?鎸佷箙鎬т繚璇佸苟涓嶅己锛屼絾鏄浜庢垜浠殑绠€鍗曚换鍔¢槦鍒楄€岃█锛岃繖宸茬粡缁扮话鏈変綑浜嗐€?濡傛灉鎮ㄩ渶瑕佹洿寮烘湁鍔涚殑淇濊瘉锛屽垯鍙互浣跨敤publisher confirms銆?/p>
鍏钩鍒嗗彂
鎮ㄥ彲鑳藉凡缁忔敞鎰忓埌锛岃皟搴︿粛鐒舵棤娉曞畬鍏ㄦ寜鐓ф垜浠殑瑕佹眰杩涜銆?渚嬪锛屽湪鏈変袱鍚峸orker鐨勬儏鍐典笅锛屽綋鎵€鏈夊鏁扮殑娑堟伅閮藉緢閲嶏紝鍋舵暟娑堟伅寰堣交鏃讹紝涓€涓獁orker灏嗕竴鐩村繖纰岃€屽彟涓€浣峸orker灏嗗嚑涔庝笉鍋氫换浣曞伐浣溿€?浣嗘槸锛孯abbitMQ瀵规涓€鏃犳墍鐭ワ紝骞朵笖浠嶅皢骞冲潎鍒嗛厤娑堟伅銆?/p>
鍙戠敓杩欑鎯呭喌鏄洜涓篟abbitMQ鍦ㄦ秷鎭繘鍏ラ槦鍒楁椂鎵嶈皟搴︽秷鎭€?瀹冧笉浼氭煡鐪嬩娇鐢ㄨ€呯殑鏈‘璁ゆ秷鎭暟銆?瀹冨彧鏄洸鐩湴灏嗘瘡绗琻鏉℃秷鎭彂閫佺粰绗琻涓娇鐢ㄨ€呫€?/p>
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
鍏充簬闃熷垪澶у皬鐨勬敞鎰忎簨椤?/p>
濡傛灉鎵€鏈夊伐浣滀汉鍛橀兘蹇欙紝鎮ㄧ殑闃熷垪灏辨弧浜嗐€?鎮ㄥ皢闇€瑕佺暀鎰忚繖涓€鐐癸紝涔熻浼氬鍔犳洿澶氱殑宸ヤ綔浜哄憳锛屾垨鑰呮湁鍏朵粬涓€浜涚瓥鐣ャ€?/p>
new_task.go
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := bodyFrom(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
worker.go
package main
import (
"bytes"
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := bodyFrom(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
package main
import (
"bytes"
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
以上是关于RabbitMQ瀹樻柟鏁欑▼浜?Work Queues(GOLANG璇█瀹炵幇)的主要内容,如果未能解决你的问题,请参考以下文章
laravel-admin 闆嗘垚wangEditor,鎸夌収瀹樻柟鏂囨。鍘绘悶锛屼絾鏄笉鏄剧ず鍟婏紝鏄剧ず Error Field type [editor] does not exist.