[pulsar] pulsar go sdk 测试
Posted adream307
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[pulsar] pulsar go sdk 测试相关的知识,希望对你有一定的参考价值。
docker 启动 pulsar
docker run --rm -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:2.6.0 bin/pulsar standalone
pulsar go sdk
package main
import (
"context"
"encoding/binary"
"github.com/apache/pulsar-client-go/pulsar"
"log"
"sync"
)
func main()
cli, err := pulsar.NewClient(pulsar.ClientOptionsURL: "pulsar://localhost:6650")
if err != nil
log.Fatal(err)
defer cli.Close()
var wg sync.WaitGroup
for i := 0; i < 10; i++
wg.Add(1)
i := uint64(i)
go func()
send, err := cli.CreateProducer(pulsar.ProducerOptionsTopic: "hello")
if err != nil
log.Fatal(err)
defer send.Close()
bi := make([]byte, 8)
binary.LittleEndian.PutUint64(bi, i)
_, err = send.Send(context.TODO(), &pulsar.ProducerMessagePayload: bi)
if err != nil
log.Fatal(err)
log.Printf("send %d into pulsar\\n", i)
()
go func()
recv, err := cli.Subscribe(pulsar.ConsumerOptions
Topic: "hello",
SubscriptionName: "hello-g",
Type: pulsar.KeyShared,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
)
if err != nil
log.Fatal(err)
for i := 0; i < 10; i++
cm, ok := <-recv.Chan()
if !ok
log.Fatalf("pulsar channel closed")
recv.AckID(cm.ID())
val := binary.LittleEndian.Uint64(cm.Payload())
log.Printf("recv %d from pulsar\\n", val)
wg.Done()
()
wg.Wait()
log.Printf("finish")
以上是关于[pulsar] pulsar go sdk 测试的主要内容,如果未能解决你的问题,请参考以下文章
03_Apache Pulsar的Local与分布式集群构建Pulsar的分布式集群模式Pulsar的分布式集群模式构建启动测试
Eclipse Pulsar + Java Micro Edition SDK 3.0 中的 LWUIT 1.4 预验证错误