Golang之发送消息至kafka

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang之发送消息至kafka相关的知识,希望对你有一定的参考价值。

windows下安装zookeeper

1、安装JAVA-JDK,从oracle下载最新的SDK安装(我用的是1.8的) 
2、安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/ 
3、重命名conf/zoo_sample.cfg 为conf/zoo.cfg 
4、编辑 conf/zoo.cfg,修改dataDir=D:\\zookeeper-3.3.6\\data\\ 
4、运行bin/zkServer.cmd

启动结果如下:

安装kafka

1、打开链接:http://kafka.apache.org/downloads.html下载kafka2.1.2 

2、打开config目录下的server.properties, 修改log.dirs为D:\\kafka_logs,

3、修改advertised.host.name=服务器ip

4、启动kafka ./bin/windows/kafka-server-start.bat ./config/server.preperties

kafka链接zookeeper

kafka也提供了一个命令行消费者,接受消息并打印到标准输出。

bin/kafka-console-consumer.bat --zookeeper 127.0.0.1:2181 --topic nginx_log

golang写入kafka


package main

import (
"fmt"

"github.com/Shopify/sarama"
"time"
)

//消息写入kafka
func main() {
//初始化配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
//生产者
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer close,err:", err)
return
}

defer client.Close()
var n int=0

for n<20{
n++
//创建消息
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a good test,hello chaoge!!")
//发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\\n,", pid, offset)
time.Sleep(10 * time.Millisecond)

}

}
 

goland运行结果:

kafka收到的数据:

 

以上是关于Golang之发送消息至kafka的主要内容,如果未能解决你的问题,请参考以下文章

log4j2发送消息至Kafka

Kafka 源码解读之 代码没报错但是消息却发送失败!

docker 配置 kafka+zookeeper,golang接入示例

zabbix告警信息推送至kafka

zabbix告警信息推送至kafka

Strimzi Kafka Bridge(桥接)实战之三:自制sdk(golang版本)