Golang之发送消息至kafka
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang之发送消息至kafka相关的知识,希望对你有一定的参考价值。
windows下安装zookeeper
1、安装JAVA-JDK,从oracle下载最新的SDK安装(我用的是1.8的)
2、安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/
3、重命名conf/zoo_sample.cfg 为conf/zoo.cfg
4、编辑 conf/zoo.cfg,修改dataDir=D:\\zookeeper-3.3.6\\data\\
4、运行bin/zkServer.cmd
启动结果如下:
安装kafka
1、打开链接:http://kafka.apache.org/downloads.html下载kafka2.1.2
2、打开config目录下的server.properties, 修改log.dirs为D:\\kafka_logs,
3、修改advertised.host.name=服务器ip
4、启动kafka ./bin/windows/kafka-server-start.bat ./config/server.preperties
kafka链接zookeeper
kafka也提供了一个命令行消费者,接受消息并打印到标准输出。
bin/kafka-console-consumer.bat --zookeeper 127.0.0.1:2181 --topic nginx_log
golang写入kafka
package main
import (
"fmt"
"github.com/Shopify/sarama"
"time"
)
//消息写入kafka
func main() {
//初始化配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
//生产者
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer close,err:", err)
return
}
defer client.Close()
var n int=0
for n<20{
n++
//创建消息
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a good test,hello chaoge!!")
//发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\\n,", pid, offset)
time.Sleep(10 * time.Millisecond)
}
}
goland运行结果:
kafka收到的数据:
以上是关于Golang之发送消息至kafka的主要内容,如果未能解决你的问题,请参考以下文章