[pulsar] pulsar go sdk 测试

Posted adream307

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[pulsar] pulsar go sdk 测试相关的知识,希望对你有一定的参考价值。

docker 启动 pulsar

docker run --rm -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:2.6.0 bin/pulsar standalone

pulsar go sdk

package main

import (
	"context"
	"encoding/binary"
	"github.com/apache/pulsar-client-go/pulsar"
	"log"
	"sync"
)

func main() 
	cli, err := pulsar.NewClient(pulsar.ClientOptionsURL: "pulsar://localhost:6650")
	if err != nil 
		log.Fatal(err)
	
	defer cli.Close()

	var wg sync.WaitGroup

	for i := 0; i < 10; i++ 
		wg.Add(1)
		i := uint64(i)
		go func() 
			send, err := cli.CreateProducer(pulsar.ProducerOptionsTopic: "hello")
			if err != nil 
				log.Fatal(err)
			
			defer send.Close()
			bi := make([]byte, 8)

			binary.LittleEndian.PutUint64(bi, i)
			_, err = send.Send(context.TODO(), &pulsar.ProducerMessagePayload: bi)
			if err != nil 
				log.Fatal(err)
			
			log.Printf("send %d into pulsar\\n", i)
		()
	

	go func() 
		recv, err := cli.Subscribe(pulsar.ConsumerOptions
			Topic:                       "hello",
			SubscriptionName:            "hello-g",
			Type:                        pulsar.KeyShared,
			SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		)
		if err != nil 
			log.Fatal(err)
		
		for i := 0; i < 10; i++ 
			cm, ok := <-recv.Chan()
			if !ok 
				log.Fatalf("pulsar channel closed")
			
			recv.AckID(cm.ID())
			val := binary.LittleEndian.Uint64(cm.Payload())
			log.Printf("recv %d from pulsar\\n", val)
			wg.Done()
		
	()

	wg.Wait()
	log.Printf("finish")

以上是关于[pulsar] pulsar go sdk 测试的主要内容,如果未能解决你的问题,请参考以下文章

[Pulsar系列] 10分钟学会Pulsar消息系统概念

03_Apache Pulsar的Local与分布式集群构建Pulsar的分布式集群模式Pulsar的分布式集群模式构建启动测试

Eclipse Pulsar + Java Micro Edition SDK 3.0 中的 LWUIT 1.4 预验证错误

Pulsar 中的意外积压大小

pulsar的Kop压力测试

3台linux搭建Pulsar集群环境用于测试