海量日志收集项目总结logagent开发
Posted wind-zhou
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了海量日志收集项目总结logagent开发相关的知识,希望对你有一定的参考价值。
海量日志收集项目总结(1)logagent开发
此部分源码:
简介
本项目用于系统中日志的采集处理。
对于日志采集业界其实已有成熟的ELK方案。
ELK方案
ELK:
- E:Elasticserach(建立索引)
- L: Logstash(日志采集)
- K: Kibana(可视化展示)
ELK在每台服务器上部署logstash,比较重量级,所以演化成客户端部署filebeat的EFK,由filebeat收集向logstash中写数据,最后落地到elasticsearch,通过kibana界面进行日志检索。
为什么本项目不适用ELK方案?
缺点:
- 运维成本?,每增加?个?志收集项,都需要?动修改配置
- 监控缺失,?法准确获取logstash的状态。?法做到定制化开发与维护
- ?法做到定制化开发与维护
项目架构
各部分作用
- logagent:这里应自己写的logagent代替ELK的Logstash进行日志收集
- kafka:分布式消息队列,具体作用的介绍间我的这篇博客kafka简介
- logTranfer:用于从Kafka中取出数据送入ES(完全需要自己写)
- ES:用于对日志建立索引并存储
- etcd:用于系统配置管理,起到了个数据库的作用
- kibana:用于对ES里的数据进行可视化展示,呈现的是一个web界面
logagent模块设计过程
logagent的主要作用是日志的收集,大概可划分为下面几个步骤:
- 从指定位置读取日志文件
- 将独到的内容发往kafka
- 动态的监视(watch)etcd中的数据,以实现热配置,etcd在这里起到了消息发布订阅的作用。
读文件时需要用到tail的第三方库 tail
github.com/hpcloud/tail
读取配置文件内容
这里我们使用GitHub上的第三方库https://github.com/go-ini
具体使用方法见 ini解析配置文件。
建立conf文件夹,目录结构为:
config.go:
package conf
type AppConf struct {
KafkaConf `ini:"kafka"`
}
type KafkaConf struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
}
config.ini:
[kafka]
address=127.0.0.1:9092
topic=zhouzheng
初始化kafka并发送信息
操作kafka时需要用到第三方库saramagithub.com/Shopify/sarama
注意如果go的版本为1.13,则go mod中sarama 版本号要改为v1.19.0
首先建立一个kafka的库,用于存放和kafka相关函数,包括kafka的初始化,以及向kafka发送信息。
目录结构如下:
kafka.go:
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
func Init(address []string,topic string)error{
//初始化kafka配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = topic
msg.Value = sarama.StringEncoder("yuema")
// 连接kafka
//产生一个生产者客户端
client, err := sarama.NewSyncProducer(address, config)
if err != nil {
fmt.Println("初始化失败:, err:", err)
return err
}
defer client.Close()
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return err
}
fmt.Printf("pid:%v offset:%v
", pid, offset)
return err
}
此时main.go:
package main
import (
"fmt"
"github.com/wind-zhou/Logagent_demo/conf"
"github.com/wind-zhou/Logagent_demo/kafka"
"gopkg.in/ini.v1"
)
var (
cfg = new(conf.AppConf) //初始化一个全局cfg,用作后面配置文件解析映射
)
func main(){
//0. 解析配置文件的内容
err := ini.MapTo(cfg, "./conf/cfg.ini")
if err != nil {
fmt.Printf("解析文件出错:err=",err)
return
}
fmt.Println("解析文件成功")
fmt.Printf("%v
",cfg)
//1.初始化kafka
err=kafka.Init([]string{cfg.KafkaConf.Address},cfg.KafkaConf.Topic)
if err!=nil{
fmt.Println("初始化失败:, err:", err)
return
}
fmt.Println("kafka init success")
}
调试结果为:
E:goProjectsrcgithub.comwind-zhouLogagent_demo>main.exe
解析文件成功
&{{127.0.0.1:9092 zhouzheng}}
pid:0 offset:18
kafka init success
在终端创建消费者,观察数据:
加入tail模块
tail模块可以才指定位置读取文件信息,并将信息发往kafka,配置文件的位置信息可以写入配置文件中。
就是把某个档案文件的最后几行显示到终端上,假设该档案有更新,tail会自己主动刷新,确保你看到最新的档案内容?,在日志收集中可以实时的监测日志的变化。
测试tail
package tail
import (
"fmt"
"github.com/hpcloud/tail"
"time"
)
//从指定位置读取文件内容
func Init(fileName string)(err error){
//fileName := "./my.log"
config := tail.Config{
ReOpen: true, // 重新打开
Follow: true, // 是否跟随
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件的哪个地方开始读
MustExist: false, // 文件不存在不报错
Poll: true,
}
tails, err := tail.TailFile(fileName, config) //创建一个tail对象
if err != nil {
fmt.Println("tail file failed, err:", err)
return
}
var (
line *tail.Line
ok bool
)
for {
line, ok = <-tails.Lines//遍历chan,读取日志内容
if !ok {
fmt.Printf("tail file close reopen, filename:%s
", tails.Filename)
time.Sleep(time.Second)
continue
}
fmt.Println("line:", line.Text)
}
}
此时main.go中调用Init
调试:
在my.log中写入内容:
终端显示成功:
至此tail测试成功,接下来任务是把独到的内容发到kafka
tail连接kafka
问题关键就是在tail读取完后,将数据发往kafka。
构造一个SendToKafka函数,往kafka里发数据,该函数在tail处死话完成后调用。
kafka.go:
创建func SendToKafka()
tail.go
在初始化后调用SendToKafka()
调试:
此时在my.log中写入数据,应该可以在终端创建的kafka消费者中读取到。
测试成功。
添加etcd模块
上面的日志产生收集和发送都只涉及到了单个topic,如果不同的业务线则产生会不同的topic日志,且存储在不同位置,这个时候就需要对他们进行批量的管理,如要求系统启动时要开启n个任务,从n个业务线分别拉去日志,此刻etcd派上了用场(etcd本质上是个数据库,只是他可以实现许多功能,如消息的发布订阅,配置管理等),
关于etcd的操作需要下载第三方库,这个库目前为止,由于版本兼容性等问题,使用时可能会有不少坑,具体问题见我这两篇博客
关于etcd的及具体操作:
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
// etcd client put/get demo
// use etcd/clientv3
type LogEntry struct {
Path string`jaon:"path"`//日志存放路径
Topic string`json:"topic"`//要发往kafka那个topic
}
var LogEtcdConf =make([]LogEntry,1000)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v
", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
value:=`[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/linux.log","topic":"linux_log"}]
`
_, err = cli.Put(ctx, "xxx", value)
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v
", err)
return
}
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "xxx",clientv3.WithPrefix())
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v
", err)
return
}
for _, ev := range resp.Kvs { //遍历切片得到结构体指针
fmt.Printf("%s:%s
", ev.Key, ev.Value)
//对接受的数据反序列化
err=json.Unmarshal(ev.Value,&LogEtcdConf)//返回的ev.Value也是切片类型
if err!=nil{
fmt.Printf("json.Unmarshal failed ,err=%V
",err)
return
}
}
for _,value:=range LogEtcdConf{
fmt.Printf("%v
",value)
}
}
代码说明:
这段代码用golang操作etcd实现的put和get操作,etcd为一个分布式的KEY-VALUE存储结构,put时。key=”xxx“,value=[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/linux.log","topic":"linux_log"}]
这里resp.Kvs是一个切片,只是这个例子中,切片的数据只有一个。
value是不同的主题的日志的配置信息组成的切片,然后将其json序列化,因此是一个字符串。
测试结果:
etcd.go
由此可知,我们可以预先将value信息存入etcd(或通过client终端,或通过golang),然后再程序中使用etcd的GET拉去这些信息,并根据这些信息去读取和发送日志。
上面这些我们可以设置一个初始化函数来完成这项工作。
etcd.go:
? Init()
? GetConf()
注意:
这里的etcd初始化要放在tail初始化之前,因为tail要根据etcd拉取的配置创建对象,这里由于变成的多个topic,因此需要对tail进行改进。
tail模块改进
因为涉及到多个topic日志采集,每个topic都需要建立一个对象,因此要建立一个结构体对其进行管理。
这时logagent的逻辑变为:
- 初始化etcd(就是和etcd集群间连接)
- 从etcd中拉取拉取信息(内容为不同的topic及其存储的路径)
- 初始化kafka(和kafka集群建立连接)
- 遍历再etcd拉取的信息,每个topic建立一个tailObj实例。(用于去不同的文件读取日志)
- 再每次建立tailObj实例时,创建一个goroutine,往kafka里发送数据。
tail模块的目录结构:
tail.go
上面代码中最后要加一个循环等待,否则goroutine会因主线程退出而销毁。
tail_mgr.go
package tail
import (
"fmt"
"github.com/hpcloud/tail"
"github.com/wind-zhou/Logagent_demo/kafka"
"time"
)
//为了管理不同的tail对象,建立一个结构体
type TailTask struct {
Path string //存放日志路径
Topic string //日志主题
instance *tail.Tail //tail对象实例
}
//构造一个函数,对每个任务初始化
func NewTailTask(path string,topic string)(tailTaskObj *TailTask){
tailTaskObj=&TailTask{
Path: path,
Topic: topic,
}
tailTaskObj.instance=tailTaskObj.Init(path)
return
}
//用来初始化任务结构体中的实例
func (t *TailTask)Init(path string)(tailObj *tail.Tail){
//初始化配置
config := tail.Config{
ReOpen: true, // 重新打开
Follow: true, // 是否跟随
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件的哪个地方开始读
MustExist: false, // 文件不存在不报错
Poll: true,
}
tailObj, err := tail.TailFile(path, config) //创建一个tail对象
if err != nil {
fmt.Println("tail file failed, err:", err)
return
}
return
}
//往kafka发送数据
func (t *TailTask)run(topic string){
for {
select {
case line:= <-t.instance.Lines: //从tail中取数据
kafka.SendToKafka(topic,line.Text) //把读取的数据发往kafka
default:
time.Sleep(time.Second)
continue
}
}
}
main.go
package main
import (
"fmt"
"github.com/wind-zhou/Logagent_demo/conf"
"github.com/wind-zhou/Logagent_demo/etcd"
"github.com/wind-zhou/Logagent_demo/kafka"
"github.com/wind-zhou/Logagent_demo/tail"
"gopkg.in/ini.v1"
)
var (
cfg = new(conf.AppConf)
LogEtcdConf =make([]*etcd.LogEntry,1000) //用于接受etcd拉取的配置
)
func main(){
//0. 解析配置文件的内容
err := ini.MapTo(cfg, "./conf/cfg.ini")
if err != nil {
fmt.Printf("解析文件出错:err=",err)
return
}
fmt.Println("解析文件成功")
fmt.Printf("%v
",cfg)
//1.初始化etcd
err=etcd.Init([]string{cfg.EtcdConf.Address},cfg.EtcdConf.Timeout)
if err != nil {
fmt.Printf("初始化etcd失败,err=%v
",err)
return
}
fmt.Println("connect to etcd success")
//拉取信息
LogEtcdConf,err=etcd.GetConf(cfg.EtcdConf.Key)
if err != nil {
fmt.Printf("拉取配置失败:err=%v
",err)
return
}
fmt.Println("拉取配置成功")
//显示拉取的信息
for _,value:=range LogEtcdConf{
fmt.Printf("%v
",value)
}
//2.初始化kafka
//就是连上kafka
err=kafka.Init([]string{cfg.KafkaConf.Address})
if err!=nil{
fmt.Println("初始化失败:, err:", err)
return
}
fmt.Println("kafka init success")
//3. 初始化tail
tail.Init(LogEtcdConf)
fmt.Println("tail init success")
}
调试结果为:
测试结果表明,程序运行后可以建立连接,并读取数据,最后通过自建的kafka消费者终端可以观察到数据。
优化:增加配置热更改功能
要求:
程序运行时时,如果更改etcd中的信息,例如增加一个topic项,后删除一个topic项,系统可以根据变化动态的创建和销毁进程。
这里需要用到etcd的Watch功能,即派一个哨兵去监视信息的变化。
watch测试
先看一下etcd的Watch的输出格式:
wtach的代码:
当更新etcd时:
这是程序返回结果:
有上述实验可知,系统可以检测到etcd的变化,那接下来的问题就是,如何将这种变化告诉tail,并动态的控制goroutine。答案是:通道。
再watch到变化后,将value放入到一个通道中,在tail模块中,创建一个goroutine在后台不断的监听通道里的数据,如果拿到数据,就执行相应操作。
etcd.go
这部分的核心代码是 WatchConf()函数
主要的作用是取出信息,发往通道,这里要提一下的是判断更i新的类型,只有两种类型PUT和DELETE
tail.go
拿到更新后,tail模块的goroutine读取通道信息,并发往kafak。
package tail
import (
"fmt"
"github.com/wind-zhou/Logagent_demo/etcd"
"time"
)
var tskMgr *taillogMgr
//从指定位置读取文件内容
func Init(LogEtcdConf []*etcd.LogEntry){
tskMgr=&taillogMgr{
logEntry: LogEtcdConf,//把当前日志收集项配置信息保存起来
tskMap:make(map[string]*TailTask,16),//用来存储各个task
newConfChan: make(chan []*etcd.LogEntry),//无缓冲区通道,用来接收热更改配置
}
//1.遍历切片,得到每个tailtask
for _,logentry := range LogEtcdConf { //遍历切片得到结构体指针
tailObj:=NewTailTask(logentry.Path,logentry.Topic)//创建了一个tail实例
mk:=fmt.Sprintf("%s_%s",logentry.Path,logentry.Topic)//用path和topic拼接一个字符串用作每次的key值
tskMgr.tskMap[mk]=tailObj//将实例放入map
}
//test
fmt.Println("开始的数据项为:")
for k,v:=range tskMgr.tskMap{
fmt.Printf("%v:%v
",k,v)
}
//select { }//这个select的作用是,等待时间,为了是等待协程,也可以是for{}
//创建一个goroutine,在后台默默的监听通道,一读取etcd变化的信息,并处理
go tskMgr.run()//负责从chan读取更新的配置
}
//监听自己的newConfChan,有了新配置就处理
func (t *taillogMgr)run(){
for {
select {
case newConf:= <- t.newConfChan:
//
//fmt.Println("--------------------")
//fmt.Println("tail模块收到的配置为:")
//for _,value:=range newConf{
// fmt.Printf("%v
",value)
//}
//fmt.Println("--------------------")
//1.判断有没有新增的配置项
for _,conf:=range newConf{
mk:=fmt.Sprintf("%s_%s",conf.Path,conf.Topic)
_,ok:=t.tskMap[mk]//判断是否该项是否为原来的配置项
if ok{
//原来就有
continue
}else {
//新增的
tailObj:=NewTailTask(conf.Path,conf.Topic)//NewTailTask会根据配置文件建立和日志的联系,并读取
fmt.Printf("tail task %s_%s 启动了了
",tailObj.Path,tailObj.Topic)
t.tskMap[mk]=tailObj
}
}
//2.判断有没有删除的配置项
//找出t.logEntry有但newconf没有的,删除掉
for _,c1:=range t.logEntry{ //从原来配置中一次拿出配置项,去新的配置中逐一比较
isDelete:=true
for _,c2:=range newConf {
if c2.Path == c1.Path && c2.Topic == c1.Topic {
isDelete = false
continue
}
}
if isDelete{
//把c1对应的这个tailObj停掉,怎么停掉这个之前的协程呢?用context
mk:=fmt.Sprintf("%s_%s",c1.Path,c1.Topic)
t.tskMap[mk].canCel()
delete(t.tskMap, mk) //删除记录
//fmt.Println("*********************************")
//fmt.Println("删除某个日志项后,系统存储的日志项为:")
//
//for _,v:=range t.tskMap{
// fmt.Printf("%v
",v)
//}
//fmt.Println("*********************************")
}
}
//2.配置删除
fmt.Println("新配置来了",newConf)
default:
time.Sleep(time.Second)
}
}
}
//向外暴露一个通道
func NewConfChan() chan<- []*etcd.LogEntry{
return tskMgr.newConfChan
}
这块代码的复杂之处是根据拿到的信息判断里面各个日志收集项的具体变化,判断是开启还是关闭goroutine。
tail_mgr.go
这两个地方是通过context控制进程的退出,每次开启新的TailTask时就创建一个上下文。
main.go
主函数中开启一个协程,不断的在后台watch etcd的变化。
这个模块许多的地方的细节都需要注意,例如:
如何在不同的模块使用一个通道交流数据,这里时构造了一个函数,用于暴露通道,并且,通道是引用类型,因此赋值时相当于传递了自身的地址。
如何使用context控制goroutine退出。
还有一些基础的东西例如,同步的锁的使用,map,切片的操作等
调试结果:
最开始系统有三个配置项:
然后运行另外一个程序将配置项改为两个:
这是在系统可以检测到配置的变化:
这是在 d:/xxx/linux.log_linux_log下日志,那么kafka将接受不到信息:
此时若再次把配置改为三个,那么linux_log主题日志将会重新发往kafka。
测试成功。
>目前这里有还一个bug:
>当开始时三个日志项,然后切换从两个日志项时,系统可以识别变化,然后再切换成三个日志项,系统也能识别变化,但若开始时两个日志项,第一次切换成三个日志项时可以识别变化,但若再次切换成两个时,系统便不能识别,再新配置中被删除的日志项的goroutine无法被关闭。
>
logagent的优化
本模块中tail在读取到日志内容后,会直接调用kafka.SendToKafka函数往kafka发数据,这里函数调用函数,是一个同步操作,因此当tail读取日志和发往kafka速度不匹配时就会使一方进行等待,浪费性能。
改进的方法是通过通道使同步操作变成异步操作(是不是和消息队列很像!)
之前的方式:
改进后:
改进后相当于把之前的动作拆成了两部分:
- tail拿到数据发往通道
- 起一个goroutine不断地在通道中取数据(在kafka初始化时调用此goroutine)
tail_mgr.go
kafka.go
以上是关于海量日志收集项目总结logagent开发的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot + Kafka + ELK 完成海量日志收集(超详细)
SpringBoot+Kafka+ELK 完成海量日志收集(超详细)