go对接kafka
Posted leo_jk
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go对接kafka相关的知识,希望对你有一定的参考价值。
1.生产者,生产消息
使用 github.com/Shopify/sarama包 对接kafka
package main
import (
"fmt"
"github.com/Shopify/sarama"
"strconv"
"time"
)
func main()
//初始化配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll //follow同步数据后返回
config.Producer.Partitioner = sarama.NewRandomPartitioner //随机分配分区 partition
config.Producer.Return.Successes = true
// 创建生产者(同步发送),连接kafka
client, err := sarama.NewSyncProducer([]string"192.168.0.113:9092", config)
if err != nil
fmt.Println("producer closed, err:", err)
return
defer client.Close()
for i := 0; i < 5; i++
// 构造一个消息
msg := &sarama.ProducerMessage
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("this is a test log" + strconv.Itoa(i))
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil
fmt.Println("send msg failed, err:", err)
return
fmt.Printf("pid:%v offset:%v\\n", pid, offset) //分区id,偏移id
time.Sleep(2 * time.Second)
2.消费者消费消息
package main
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
)
var wg sync.WaitGroup
func main()
//创建消费者
consumer, err := sarama.NewConsumer([]string"192.168.0.113:9092", nil)
if err != nil
fmt.Printf("fail to start consumer, err:%v\\n", err)
return
//获取主题分区
partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
if err != nil
fmt.Printf("fail to get list of partition:err%v\\n", err)
return
fmt.Println(partitionList)
//遍历分区
for partition := range partitionList
// 针对每个分区创建一个对应的分区消费者
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
//pc, err := consumer.ConsumePartition("web_log", int32(partition), 90)
if err != nil
fmt.Printf("failed to start consumer for partition %d,err:%v\\n", partition, err)
return
defer pc.AsyncClose()
// 异步从每个分区消费信息
wg.Add(1) //+1
go func(sarama.PartitionConsumer)
defer wg.Done() //-1
for msg := range pc.Messages()
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%s\\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
(pc)
wg.Wait()
以上是关于go对接kafka的主要内容,如果未能解决你的问题,请参考以下文章