markdown 构建influxdata监控

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了markdown 构建influxdata监控相关的知识,希望对你有一定的参考价值。

# Influxdata 组件

## 组件介绍

1. Telegraf 开源插件驱动的服务器代理,用于收集和报告指标,例如cpu、内存、磁盘、http服务等等。
2. InfluxDB 开源时间序列数据库,旨在处理高写入和查询负载。
3. Chronograf 开源Web应用程序,用于基础架构监控,警报管理,数据可视化和数据库管理。
4. Kapacitor 开源数据处理框架,可以轻松创建警报,运行ETL作业并检测异常情况。

在以上组件中InfluxDB是核心组件,telegraf将收集到的信息发送到InfluxDB中,Chronograf提供图形化界面展示InfluxDB中的数据,开源版本的不支持权限管理,如果需要权限管理可以用grafana替代,grafana新版本也提供设置报警策略的,但功能比较单一。报警策略的设置推荐Chronograf+Kapacitor组合,内置TICKScript脚本,功能强大,可配置性较强。

## 业务场景

1. 计算机硬件资源监控不在考虑范围内,运维同学已经做了 。
2. 监控KafkaConsumer消费,日志堆积及时报警
3. 监控http某一具体业务的网络流量,流量异常及时报警
4. 其他业务需求

## 数据采集

初期考虑应用程序直连InfluxDB,将数据按telegraf上报数据的格式直接写到InfluxDB中,但是类似于KafkaConsumer这类分布式应用,需要先将数据聚合到一起再上报到InfluxDB中,比较麻烦。后来研究了一下telegraf的插件机制,直接扩展telegraf的inputs插件。

## 扩展telegraf插件

1. 下载代码:git clone https://github.com/influxdata/telegraf.git
2. 在telegraf/plugins/inputs目录下创建 kafka_consumer_offset/kafka_consumer_offset.go,具体代码在文档末尾
3. 修改telegraf/plugins/inputs/all/all.go文件,注册kafka_consumer_offset组件
4. 在telegraf目录下执行make命令进行编译,需要配置代理。

---

## kafka_consumer_offset.go

```golang
package kafka_consumer_offset

import (
    "github.com/bsm/sarama-cluster"
    "github.com/Shopify/sarama"
    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/plugins/inputs"
    "github.com/influxdata/telegraf/plugins/parsers"
    "log"
)

type ConsumerConfig struct {
    ConsumerGroup string
    Topics        []string
}

type KafkaConsumer struct {
    Brokers         []string
    ConsumerConfigs []ConsumerConfig
}

var sampleConfig = `
    ## kafka servers
    brokers = ["localhost:9092"]

    [[inputs.kafka_consumer_offset.consumer_configs]]
        consumer_group="consumer_group_id_1"
        topics = ["topic1", "topic2"]
    [[inputs.kafka_consumer_offset.consumer_configs]]
        consumer_group="consumer_group_id_2"
    topics = ["topic1", "topic2"]
`

type offsetInfo struct {
    //当前消费的offset
    Offset   int64
    //partition中的最新的offset
    Length int64
}

func (kc *KafkaConsumer) closeCoordinator(broker *sarama.Broker, err error, client cluster.Client, consumerGroup string) {
    if broker != nil {
        _ = broker.Close()
    }

    switch err {
    case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
        _ = client.RefreshCoordinator(consumerGroup)
    }
}

/**
    从kafka中获取指定消费者的中topic的当前消费offset以及每个partition的总偏移量
 */
func (kc *KafkaConsumer) fetchOffsets(consumerGroup string, topics []string) (map[string]map[int32]offsetInfo, error) {

    req := &sarama.OffsetFetchRequest{
        Version:       1,
        ConsumerGroup: consumerGroup,
    }
    client, err := cluster.NewClient(kc.Brokers, nil)

    if err != nil {
        return nil, err
    }

    defer client.Close()

    broker, err := client.Coordinator(consumerGroup)
    if err != nil {
        kc.closeCoordinator(broker, err, *client, consumerGroup)
        return nil, err
    }

    defer broker.Close()
    offsets := make(map[string]map[int32]offsetInfo, len(topics))

    for _, topic := range topics {
        partitions, err := client.Partitions(topic)
        if err != nil {
            continue
        }

        offsets[topic] = make(map[int32]offsetInfo, len(partitions))
        for _, partition := range partitions {
            //offsets[topic][partition] = offsetInfo{Offset: -1}
            req.AddPartition(topic, partition)
            newestOffset, _ := client.GetOffset(topic, partition, -1)
            offsets[topic][partition] = offsetInfo{Length: newestOffset}
        }
    }

    resp, err := broker.FetchOffset(req)
    log.Print()
    if err != nil {
        kc.closeCoordinator(broker, err, *client, consumerGroup)
        return nil, err
    }

    for topic, v := range resp.Blocks {
        for partition, block := range v {
            if block.Err == sarama.ErrNoError {
                    //获取指定partition中最新offset
                    //newestOffset, _ := client.GetOffset(topic, partition, -1)
                     oi := offsets[topic][partition]
                    offsets[topic][partition] = offsetInfo{Length: oi.Length, Offset:block.Offset}
                    //offsets[topic][partition] = offsetInfo{Offset: block.Offset, Length: newestOffset}

            } else {
                continue
            }
        }
    }

    return offsets, nil
}

func (kc *KafkaConsumer) SampleConfig() string {
    return sampleConfig
}

func (kc *KafkaConsumer) Description() string {
    return "Read metrics from Kafka topic(s)"
}

func (kc *KafkaConsumer) SetParser(parser parsers.Parser) {

}

func (kc *KafkaConsumer) Gather(acc telegraf.Accumulator) error {
    for _, cc := range kc.ConsumerConfigs {
        offsets, err := kc.fetchOffsets(cc.ConsumerGroup, cc.Topics)
        if err != nil {
            acc.AddError(err)
            return nil
        }
        //计算topic所有partition offset之和
        oi := offsetInfo{Offset: 0, Length: 0}
        for topic, tos := range offsets {

            for _,os := range tos {
                if os.Offset > 0 {
                    oi.Offset += os.Offset
                }

                if os.Length > 0 {
                    oi.Length += os.Length
                }
            }

            fields := map[string]interface{} {
                "offset": oi.Offset,
                "length": oi.Length,
                "lag": oi.Length - oi.Offset, //有多少没消费的
            }

            tags := map[string]string {
                "consumer_group": cc.ConsumerGroup,
                "topic": topic,
            }

            acc.AddCounter("kafka_consumer_offset", fields, tags )
        }
    }

    return nil
}

func init() {
    inputs.Add("kafka_consumer_offset", func() telegraf.Input {
        return &KafkaConsumer{}
    })
}
```

以上是关于markdown 构建influxdata监控的主要内容,如果未能解决你的问题,请参考以下文章

系统监控平台

Centos7.X 搭建Grafana+Jmeter+Influxdb 性能实时监控平台(不使用docker)

Centos7.X 搭建Grafana+Jmeter+Influxdb 性能实时监控平台(不使用docker)

Ocelot监控

telegraf+influxdb+grafana开源监控架构

jmeter+influxdb+grafana性能测试监控