markdown 构建influxdata监控
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了markdown 构建influxdata监控相关的知识,希望对你有一定的参考价值。
# Influxdata 组件
## 组件介绍
1. Telegraf 开源插件驱动的服务器代理,用于收集和报告指标,例如cpu、内存、磁盘、http服务等等。
2. InfluxDB 开源时间序列数据库,旨在处理高写入和查询负载。
3. Chronograf 开源Web应用程序,用于基础架构监控,警报管理,数据可视化和数据库管理。
4. Kapacitor 开源数据处理框架,可以轻松创建警报,运行ETL作业并检测异常情况。
在以上组件中InfluxDB是核心组件,telegraf将收集到的信息发送到InfluxDB中,Chronograf提供图形化界面展示InfluxDB中的数据,开源版本的不支持权限管理,如果需要权限管理可以用grafana替代,grafana新版本也提供设置报警策略的,但功能比较单一。报警策略的设置推荐Chronograf+Kapacitor组合,内置TICKScript脚本,功能强大,可配置性较强。
## 业务场景
1. 计算机硬件资源监控不在考虑范围内,运维同学已经做了 。
2. 监控KafkaConsumer消费,日志堆积及时报警
3. 监控http某一具体业务的网络流量,流量异常及时报警
4. 其他业务需求
## 数据采集
初期考虑应用程序直连InfluxDB,将数据按telegraf上报数据的格式直接写到InfluxDB中,但是类似于KafkaConsumer这类分布式应用,需要先将数据聚合到一起再上报到InfluxDB中,比较麻烦。后来研究了一下telegraf的插件机制,直接扩展telegraf的inputs插件。
## 扩展telegraf插件
1. 下载代码:git clone https://github.com/influxdata/telegraf.git
2. 在telegraf/plugins/inputs目录下创建 kafka_consumer_offset/kafka_consumer_offset.go,具体代码在文档末尾
3. 修改telegraf/plugins/inputs/all/all.go文件,注册kafka_consumer_offset组件
4. 在telegraf目录下执行make命令进行编译,需要配置代理。
---
## kafka_consumer_offset.go
```golang
package kafka_consumer_offset
import (
"github.com/bsm/sarama-cluster"
"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"log"
)
type ConsumerConfig struct {
ConsumerGroup string
Topics []string
}
type KafkaConsumer struct {
Brokers []string
ConsumerConfigs []ConsumerConfig
}
var sampleConfig = `
## kafka servers
brokers = ["localhost:9092"]
[[inputs.kafka_consumer_offset.consumer_configs]]
consumer_group="consumer_group_id_1"
topics = ["topic1", "topic2"]
[[inputs.kafka_consumer_offset.consumer_configs]]
consumer_group="consumer_group_id_2"
topics = ["topic1", "topic2"]
`
type offsetInfo struct {
//当前消费的offset
Offset int64
//partition中的最新的offset
Length int64
}
func (kc *KafkaConsumer) closeCoordinator(broker *sarama.Broker, err error, client cluster.Client, consumerGroup string) {
if broker != nil {
_ = broker.Close()
}
switch err {
case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
_ = client.RefreshCoordinator(consumerGroup)
}
}
/**
从kafka中获取指定消费者的中topic的当前消费offset以及每个partition的总偏移量
*/
func (kc *KafkaConsumer) fetchOffsets(consumerGroup string, topics []string) (map[string]map[int32]offsetInfo, error) {
req := &sarama.OffsetFetchRequest{
Version: 1,
ConsumerGroup: consumerGroup,
}
client, err := cluster.NewClient(kc.Brokers, nil)
if err != nil {
return nil, err
}
defer client.Close()
broker, err := client.Coordinator(consumerGroup)
if err != nil {
kc.closeCoordinator(broker, err, *client, consumerGroup)
return nil, err
}
defer broker.Close()
offsets := make(map[string]map[int32]offsetInfo, len(topics))
for _, topic := range topics {
partitions, err := client.Partitions(topic)
if err != nil {
continue
}
offsets[topic] = make(map[int32]offsetInfo, len(partitions))
for _, partition := range partitions {
//offsets[topic][partition] = offsetInfo{Offset: -1}
req.AddPartition(topic, partition)
newestOffset, _ := client.GetOffset(topic, partition, -1)
offsets[topic][partition] = offsetInfo{Length: newestOffset}
}
}
resp, err := broker.FetchOffset(req)
log.Print()
if err != nil {
kc.closeCoordinator(broker, err, *client, consumerGroup)
return nil, err
}
for topic, v := range resp.Blocks {
for partition, block := range v {
if block.Err == sarama.ErrNoError {
//获取指定partition中最新offset
//newestOffset, _ := client.GetOffset(topic, partition, -1)
oi := offsets[topic][partition]
offsets[topic][partition] = offsetInfo{Length: oi.Length, Offset:block.Offset}
//offsets[topic][partition] = offsetInfo{Offset: block.Offset, Length: newestOffset}
} else {
continue
}
}
}
return offsets, nil
}
func (kc *KafkaConsumer) SampleConfig() string {
return sampleConfig
}
func (kc *KafkaConsumer) Description() string {
return "Read metrics from Kafka topic(s)"
}
func (kc *KafkaConsumer) SetParser(parser parsers.Parser) {
}
func (kc *KafkaConsumer) Gather(acc telegraf.Accumulator) error {
for _, cc := range kc.ConsumerConfigs {
offsets, err := kc.fetchOffsets(cc.ConsumerGroup, cc.Topics)
if err != nil {
acc.AddError(err)
return nil
}
//计算topic所有partition offset之和
oi := offsetInfo{Offset: 0, Length: 0}
for topic, tos := range offsets {
for _,os := range tos {
if os.Offset > 0 {
oi.Offset += os.Offset
}
if os.Length > 0 {
oi.Length += os.Length
}
}
fields := map[string]interface{} {
"offset": oi.Offset,
"length": oi.Length,
"lag": oi.Length - oi.Offset, //有多少没消费的
}
tags := map[string]string {
"consumer_group": cc.ConsumerGroup,
"topic": topic,
}
acc.AddCounter("kafka_consumer_offset", fields, tags )
}
}
return nil
}
func init() {
inputs.Add("kafka_consumer_offset", func() telegraf.Input {
return &KafkaConsumer{}
})
}
```
以上是关于markdown 构建influxdata监控的主要内容,如果未能解决你的问题,请参考以下文章
Centos7.X 搭建Grafana+Jmeter+Influxdb 性能实时监控平台(不使用docker)
Centos7.X 搭建Grafana+Jmeter+Influxdb 性能实时监控平台(不使用docker)