confluent-kafka-go Source Code Analysis


confluent-kafka-go is the fastest of the known Kafka clients. Why? Because it is very lightweight: it wraps librdkafka via cgo, so what actually runs underneath is essentially a C client.

I. Installation and Configuration

Installation:

go get -u github.com/confluentinc/confluent-kafka-go
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install

Install Kafka:

brew install kafka
brew services start kafka
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
kafka-server-start /usr/local/etc/kafka/server.properties
cd /usr/local/Cellar/kafka/2.6.0_1/
./bin/kafka-topics --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
./bin/kafka-topics --list --zookeeper localhost:2181
test
./bin/kafka-console-producer --broker-list localhost:9092 --topic test
>This is a message
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message

At this point Kafka is installed. However, if we connect directly from Go, we get an error:

1617546888.931|FAIL|rdkafka#producer-1| [thrd:bogon:9092/0]: bogon:9092/0: Failed to resolve 'bogon:9092': nodename nor servname provided, or not known (after 2ms in state CONNECT)

If external clients need to access Kafka, advertised.listeners must be configured. Here PLAINTEXT is the protocol; the available values are PLAINTEXT and SSL.

vi /usr/local/etc/kafka/server.properties
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://localhost:9092

Restart Kafka:

% brew services restart kafka
Stopping `kafka`... (might take a while)
==> Successfully stopped `kafka` (label: homebrew.mxcl.kafka)
==> Successfully started `kafka` (label: homebrew.mxcl.kafka)


With the configuration in place, let's produce and consume from Go code:

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "test"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"test", "^aRegex.*[Tt]opic"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}

Run them, and we can see it works:

go run exp1/producer/main.go
Delivered message to test[0]@4
Delivered message to test[0]@5

go run exp1/consumer/main.go
Consumer error: Subscribed topic not available: ^aRegex.*[Tt]opic: Broker: Unknown topic or partition (<nil>)
Message on test[0]@4: Welcome
Message on test[0]@5: to

II. Source Code Analysis

The producer mainly calls three interfaces:

1. kafka.NewProducer

2. for e := range p.Events(): listen for producer events in a goroutine

3. p.Produce: produce messages

The consumer likewise calls three main interfaces:

1. kafka.NewConsumer

2. c.SubscribeTopics

3. msg, err := c.ReadMessage(-1): consume messages

Now let's look at the structure of the confluent-kafka-go source:

cd ~/go/pkg/mod/gopkg.in/confluentinc/confluent-kafka-go.v1@v1.6.1
ls
CHANGELOG.md LICENSE README.md examples kafka kafkatest mk

The core source lives in the kafka directory.

% cd kafka && tree
.
|____kafka_test.go
|____log.go
|____adminoptions.go
|____header.go
|____config.go
|____metadata.go
|____error.go
|____stats_event_test.go
|____event.go
|____misc.go
|____consumer_performance_test.go
|____time.go
|____txn_integration_test.go
|____build_glibc_linux.go
|____producer.go
|____error_gen.go
|____header_test.go
|____offset.go
|____message.go
|____build_musl_linux.go
|____build_darwin.go
|____go_rdkafka_generr
| |____go_rdkafka_generr.go
|____librdkafka_vendor
| |____rdkafka.h
| |____librdkafka_musl_linux.a
| |____README.md
| |____librdkafka_darwin.a
| |____.gitignore
| |____import.sh
| |____librdkafka_glibc_linux.a
| |____bundle-import.sh
| |____LICENSES.txt
| |____librdkafka.go
|____message_test.go
|____adminapi_test.go
|____event_test.go
|____handle.go
|____glue_rdkafka.h
|____producer_performance_test.go
|____README.md
|____testconf-example.json
|____select_rdkafka.h
|____kafka.go
|____.gitignore
|____context.go
|____consumer.go
|____producer_test.go
|____testhelpers.go
|____consumer_test.go
|____adminapi.go
|____generated_errors.go
|____00version.go
|____metadata_test.go
|____error_test.go
|____api.html
|____config_test.go
|____integration_test.go
|____testhelpers_test.go
|____build_dynamic.go

The librdkafka_vendor directory holds prebuilt C static libraries for the supported platforms; by default they are loaded via static linking. If your platform is not among them, there are two options:

1. Build a static library for your platform, place it under librdkafka_vendor, modify the bundle-import.sh file, and build.

2. Build and install librdkafka yourself, then compile the calling code with dynamic linking via -tags dynamic.
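As a sketch, the dynamic-linking route (option 2) typically looks like this (paths and module layout are illustrative):

```shell
# Build and install librdkafka system-wide first
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka && ./configure && make && sudo make install

# Then build the Go program against the shared library
go build -tags dynamic ./...
```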


The two core files in this directory are consumer.go and producer.go.


Let's look at producer.go first.

1. func NewProducer first initializes a Producer struct:

type Producer struct {
	events         chan Event
	produceChannel chan *Message
	handle         handle

	// Terminates the poller() goroutine
	pollerTermChan chan bool
}

It then parses the relevant configuration options and initializes the producer through a series of C functions called via cgo.

2. Register the events the producer cares about:

  C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR|C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH)

3. Create the producer instance:

p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)

4. Get the handle's main event queue:

p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk)

5. Start a goroutine that listens for producer events:

go func() {
	poller(p, p.pollerTermChan)
	p.handle.waitGroup.Done()
}()

_, term := p.handle.eventPoll(p.events, 100, 1000, termChan)

The event dispatch handler is defined in event.go:

func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) {

which in turn wraps the cgo event call:

rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)

It handles Kafka's various events: fetch, rebalance, error, and so on.
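Stripped of the cgo layer, the dispatch in eventPoll is essentially a type switch over the Event interface. A self-contained sketch with hypothetical event types (not the library's actual definitions):

```go
package main

import "fmt"

// Event is a stand-in for the library's Event interface.
type Event interface{ String() string }

// Message and Error are hypothetical event kinds.
type Message struct{ Value string }

func (m *Message) String() string { return "Message: " + m.Value }

type Error struct{ Code int }

func (e *Error) String() string { return fmt.Sprintf("Error %d", e.Code) }

// dispatch mimics eventPoll's type switch: messages are handed
// to the caller, errors are handled in place.
func dispatch(ev Event) string {
	switch e := ev.(type) {
	case *Message:
		return "deliver " + e.Value
	case *Error:
		return fmt.Sprintf("recover from error %d", e.Code)
	default:
		return "ignore"
	}
}

func main() {
	fmt.Println(dispatch(&Message{Value: "hello"})) // deliver hello
	fmt.Println(dispatch(&Error{Code: 3}))          // recover from error 3
}
```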

6. Start a goroutine that produces messages:

go func() {
	producer(p)
	p.handle.waitGroup.Done()
}()

func channelProducer(p *Producer) {
	for m := range p.produceChannel {
		err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)

Inside, it calls down into cgo:

C.do_produce(p.handle.rk, crkt,
	C.int32_t(msg.TopicPartition.Partition),
	C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
	valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
	keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
	C.int64_t(timestamp),
	(*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
	(C.uintptr_t)(cgoid))

The two goroutines above only register themselves and then block on their channels; processing begins only when a signal arrives on the channel.
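That register-then-wait pattern can be sketched without any Kafka dependency: a goroutine blocks ranging over a channel and only does work when a value arrives, mirroring how channelProducer drains p.produceChannel:

```go
package main

import "fmt"

// collect mimics the worker goroutine: it blocks ranging over the
// channel and returns everything received once the channel closes.
func collect(ch <-chan string) []string {
	var got []string
	for m := range ch {
		got = append(got, m)
	}
	return got
}

func main() {
	produceChannel := make(chan string)
	done := make(chan []string)

	// The worker is registered up front but idles until a
	// message is sent on produceChannel.
	go func() { done <- collect(produceChannel) }()

	for _, m := range []string{"a", "b"} {
		produceChannel <- m
	}
	close(produceChannel)
	fmt.Println("produced:", <-done) // produced: [a b]
}
```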

7. Events() simply returns the event channel:

func (p *Producer) Events() chan Event {
	return p.events
}

8. The produce function and the handler registered at initialization ultimately call the same underlying code; the difference is that the initialization path waits for events to arrive.

func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
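The deliveryChan parameter hints at the routing design: per the library's documentation, a non-nil channel receives that message's delivery report, otherwise the report goes to the shared Events() channel. A minimal sketch of that choice (types are illustrative, not the library's):

```go
package main

import "fmt"

// report is a stand-in for a delivery report event.
type report struct{ ok bool }

// routeReport mimics how produce picks a destination: the
// per-call channel if one was supplied, else the shared one.
func routeReport(shared, perCall chan report, r report) {
	if perCall != nil {
		perCall <- r
		return
	}
	shared <- r
}

func main() {
	shared := make(chan report, 1)
	perCall := make(chan report, 1)

	routeReport(shared, nil, report{ok: true})
	fmt.Println("shared got:", (<-shared).ok) // shared got: true

	routeReport(shared, perCall, report{ok: true})
	fmt.Println("perCall got:", (<-perCall).ok) // perCall got: true
}
```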

Now let's look at consumer.go.

1. func NewConsumer initializes a struct:

type Consumer struct {
	events             chan Event
	handle             handle
	eventsChanEnable   bool
	readerTermChan     chan bool
	rebalanceCb        RebalanceCb
	appReassigned      bool
	appRebalanceEnable bool // Config setting
}

2. Parse the relevant configuration options.

3. Register the events to listen for:

C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR|C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH)

4. Initialize the consumer instance:

c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)

5. Register the consumer into the event-polling handler:

C.rd_kafka_poll_set_consumer(c.handle.rk)

6. Get the consumer's event queue:

c.handle.rkq = C.rd_kafka_queue_get_consumer(c.handle.rk)

7. Start a goroutine that consumes messages:

go func() {
	consumerReader(c, c.readerTermChan)
	c.handle.waitGroup.Done()
}()

Inside, it waits for events and checks whether to terminate, just like the producer:

 _, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
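The timeout-and-termination behavior of eventPoll can be sketched with a plain select; pollOnce here is hypothetical, standing in for one iteration of the poll loop:

```go
package main

import (
	"fmt"
	"time"
)

// pollOnce mimics one eventPoll round: return an event if one is
// ready, report termination, or give up after timeoutMs.
func pollOnce(events <-chan string, timeoutMs int, termChan <-chan bool) (string, bool) {
	select {
	case ev := <-events:
		return ev, false
	case <-termChan:
		return "", true // caller should stop polling
	case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
		return "", false // timed out, no event this round
	}
}

func main() {
	events := make(chan string, 1)
	termChan := make(chan bool, 1)

	events <- "message"
	ev, term := pollOnce(events, 100, termChan)
	fmt.Println(ev, term) // message false

	termChan <- true
	_, term = pollOnce(events, 100, termChan)
	fmt.Println(term) // true
}
```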

8. Subscribe to topics:

func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)

ctopics := C.rd_kafka_topic_partition_list_new(C.int(len(topics)))
C.rd_kafka_topic_partition_list_add(ctopics, ctopic, C.RD_KAFKA_PARTITION_UA)
e := C.rd_kafka_subscribe(c.handle.rk, ctopics)

9. Fetch event messages via Poll:

ev := c.Poll(timeoutMs)

switch e := ev.(type) {
case *Message:

ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil)

Ultimately this also calls eventPoll in event.go to fetch messages:

rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)
