Disk Resource Monitoring for a Big Data Cluster

Posted by 小基基o_O


Requirements

  1. Track the daily growth of each cluster server's disk usage
  2. Track the daily growth of HDFS data
  3. Track the daily growth of the data warehouse
  4. Track the daily growth of the warehouse ODS and DWD layers

Disk resource monitoring commands

| Monitoring item               | Command            | Default unit |
| ----------------------------- | ------------------ | ------------ |
| Disk usage of a single server | df                 | KB           |
| Total HDFS usage              | hadoop fs -df      | B            |
| Size of a directory on HDFS   | hadoop fs -du -s / | B            |
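A note on `hadoop fs -du -s`: on Hadoop 3.x it prints three fields, the logical size, the disk space consumed including all replicas, and the path, and the warehouse metrics below (labeled "incl. replicas") read the second field. On Hadoop 2.x the replica column is absent, so check your version. A minimal parsing sketch under that 3.x assumption (the path is a placeholder):

from re import split
from subprocess import check_output

# Hadoop 3.x 'du -s' output: <size>  <size incl. replicas>  <path>
out = check_output('hadoop fs -du -s /user/hive/warehouse', shell=True).decode().strip()
size, size_with_replicas, path = split(r'\s+', out, maxsplit=2)
print(size_with_replicas)  # bytes consumed, including replicas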

Storing the data in MySQL

Create the database

CREATE DATABASE data_management;
USE data_management;

Create the tables

CREATE TABLE item_info(
  item_id    INT           PRIMARY KEY   COMMENT 'monitoring item ID',
  item_name  VARCHAR(255)  NOT NULL      COMMENT 'monitoring item name',
  item_unit  VARCHAR(63)   DEFAULT NULL  COMMENT 'unit of measure'
)COMMENT='monitoring item catalog';
-- The composite primary key rejects duplicate samples for the same item and timestamp
CREATE TABLE item_records(
  item_id           INT       COMMENT 'monitoring item ID',
  monitoring_time   DATETIME  COMMENT 'sampling time',
  monitoring_value  BIGINT    COMMENT 'sampled value',
  PRIMARY KEY(item_id,monitoring_time)
)COMMENT='monitoring record log';

Insert the monitoring items

INSERT INTO item_info VALUES
(1,'Total HDFS usage','B'),
(2,'Warehouse usage (incl. replicas)','B'),
(3,'Warehouse ODS layer usage (incl. replicas)','B'),
(4,'Warehouse DWD layer usage (incl. replicas)','B'),
(5,'hadoop105 server disk usage','KB'),
(6,'hadoop106 server disk usage','KB'),
(7,'hadoop107 server disk usage','KB');

Create the views

-- Daily HDFS total usage over the last 30 days (average of each day's samples)
CREATE OR REPLACE VIEW hdfs_used_ymd AS
SELECT
    DATE(monitoring_time) AS ymd,
    AVG(monitoring_value) AS b
FROM item_records
WHERE item_id=1 AND DATEDIFF(NOW(),monitoring_time)<31
GROUP BY DATE(monitoring_time);
-- Day-over-day change: join each day (t1) to the previous day (t2)
CREATE OR REPLACE VIEW hdfs_ymd AS
SELECT
    t1.ymd AS ymd,
    t1.b AS b,
    t1.b-t2.b AS increase
FROM hdfs_used_ymd t1
INNER JOIN hdfs_used_ymd t2
ON t1.ymd=DATE_ADD(t2.ymd,INTERVAL 1 DAY);
-- Per-server disk usage over the last 30 days (daily average)
CREATE OR REPLACE VIEW df_used_hosts_ymd AS
SELECT
    t1.item_name AS hostname,
    DATE(t2.monitoring_time) AS ymd,
    AVG(t2.monitoring_value) AS kb
FROM(
    SELECT item_id,item_name FROM item_info
    WHERE item_name LIKE '%server disk usage'
)t1 LEFT JOIN (
    -- keep only the last 30 days
    SELECT item_id,monitoring_time,monitoring_value
    FROM item_records
    WHERE DATEDIFF(NOW(),monitoring_time)<31
)t2 ON t1.item_id=t2.item_id
-- item_name is included so the view also runs under ONLY_FULL_GROUP_BY
GROUP BY t1.item_id,t1.item_name,DATE(t2.monitoring_time);
-- Combined disk usage of all servers over the last 30 days, converted KB to GB
CREATE OR REPLACE VIEW df_used_ymd AS
SELECT ymd,SUM(kb)/1024/1024 AS gb
FROM df_used_hosts_ymd
GROUP BY ymd;
-- Total disk usage of all servers and its day-over-day change
CREATE OR REPLACE VIEW df_used AS
SELECT
    t1.ymd AS ymd,
    t1.gb AS gb,
    t1.gb-t2.gb AS increase
FROM df_used_ymd t1
INNER JOIN df_used_ymd t2
ON t1.ymd=DATE_ADD(t2.ymd,INTERVAL 1 DAY);
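Any MySQL client can then read the daily series straight from the views. A minimal pymysql sketch (host and credentials are placeholders, matching the script in the next section):

import pymysql

conn = pymysql.connect(host='hadoop105', user='root', password='your_password',
                       database='data_management', charset='utf8')
with conn.cursor() as cursor:
    cursor.execute('SELECT ymd, gb, increase FROM df_used ORDER BY ymd')
    for ymd, gb, increase in cursor.fetchall():
        print(ymd, round(gb, 2), 'GB, day-over-day:', round(increase, 2), 'GB')
conn.close()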

Collecting the data with Python 3

from time import strftime
from subprocess import check_output
from re import split
from pymysql.connections import Connection  # conda install pymysql

# HDFS directories to monitor; keys match item_id in MySQL's data_management.item_info
HDFS_PATH = {
    2: '/user/hive/warehouse/ec.db',
    3: '/user/hive/warehouse/ec.db/ods',
    4: '/user/hive/warehouse/ec.db/dwd',
}
# Server disks to monitor; keys match item_id in MySQL's data_management.item_info
HOSTNAMES = {
    5: 'hadoop105',
    6: 'hadoop106',
    7: 'hadoop107',
}


def _cmd(command) -> str:
    """Run a shell command as user 'yellow' (via sudo) and return its output"""
    return check_output('sudo -i -u yellow ' + command, shell=True).decode().strip()


def _mysql_formatting(a: int, b: str, c: int) -> dict:
    """Shape a record as the column dict expected by the MySQL writer"""
    return {'item_id': a, 'monitoring_time': b, 'monitoring_value': c}


def _now() -> str:
    return strftime('%Y-%m-%d %H:%M:%S')


def hadoop_fs_df() -> dict:
    """Query total HDFS usage"""
    result = _cmd('hadoop fs -df')
    now = _now()
    # Pair the header row with the value row to get a column-name -> value dict
    dt = {k: v for k, v in zip(*(split(r'\s+', line) for line in result.split('\n')))}
    print(dt)
    return _mysql_formatting(1, now, int(dt['Used']))
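# Illustration of the zip(*) trick above with a made-up 'hadoop fs -df' output
# (Hadoop 3.x column layout; the numbers are invented):
#   Filesystem                Size       Used  Available  Use%
#   hdfs://hadoop105:8020  1073741824  268435456  805306368   25%
# Splitting both lines on whitespace and zipping them yields
# {'Filesystem': 'hdfs://hadoop105:8020', 'Size': '1073741824',
#  'Used': '268435456', 'Available': '805306368', 'Use%': '25%'}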


def hadoop_fs_du(item_id=2) -> dict:
    """Query the size of an HDFS directory"""
    result = _cmd('hadoop fs -du -s ' + HDFS_PATH[item_id])
    now = _now()
    print(result)
    # Field 1 is the space consumed including replicas (Hadoop 3.x layout)
    value = split(r'\s+', result)[1]
    return _mysql_formatting(item_id, now, int(value))


def hadoop_fs_du_ods() -> dict:
    """Query the size of the ODS layer directory"""
    return hadoop_fs_du(3)


def hadoop_fs_du_dwd() -> dict:
    """Query the size of the DWD layer directory"""
    return hadoop_fs_du(4)


def df(item_id) -> dict:
    """Query a server's disk usage; the executing user needs sudo rights and
    root must be able to ssh between cluster nodes without a password"""
    hostname = HOSTNAMES[item_id]
    cmd = 'sudo ssh {} df /'.format(hostname)
    print(cmd)
    result = _cmd(cmd)
    now = _now()
    dt = {k: v for k, v in zip(*(split(r'\s+', line) for line in result.split('\n')))}
    print(dt)
    return _mysql_formatting(item_id, now, int(dt['Used']))


def df_hadoop105() -> dict:
    """Query hadoop105's disk usage"""
    return df(5)


def df_hadoop106() -> dict:
    """Query hadoop106's disk usage"""
    return df(6)


def df_hadoop107() -> dict:
    """Query hadoop107's disk usage"""
    return df(7)


class Mysql:
    def __init__(self, host, database, password, user='root'):
        self.db = Connection(
            user=user,
            password=password,
            host=host,
            database=database,
            port=3306,
            charset='utf8')
        self.cursor = self.db.cursor()

    def __del__(self):
        self.cursor.close()
        self.db.close()

    def commit(self, sql):
        print(sql)
        try:
            self.cursor.execute(sql)
            self.db.commit()
        except Exception as e:
            print(e)

    def insert(self, dt, tb):
        ls = [(k, v) for k, v in dt.items() if v is not None]
        sql = 'INSERT %s (' % tb + ','.join(i[0] for i in ls) + \
              ') VALUES (' + ','.join('%r' % i[1] for i in ls) + ')'
        self.commit(sql)
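    # A hedged alternative (not part of the original script): let pymysql
    # escape the values via placeholders instead of interpolating with %r.
    def insert_safe(self, dt, tb):
        ls = [(k, v) for k, v in dt.items() if v is not None]
        sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
            tb, ','.join(k for k, _ in ls), ','.join(['%s'] * len(ls)))
        self.cursor.execute(sql, [v for _, v in ls])
        self.db.commit()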


if __name__ == '__main__':
    db = Mysql(
        host='hadoop105',
        password='your_password',
        database='data_management')
    TABLE = 'item_records'
    db.insert(hadoop_fs_df(), TABLE)
    db.insert(hadoop_fs_du(), TABLE)
    db.insert(hadoop_fs_du_ods(), TABLE)
    db.insert(hadoop_fs_du_dwd(), TABLE)
    db.insert(df_hadoop105(), TABLE)
    db.insert(df_hadoop106(), TABLE)
    db.insert(df_hadoop107(), TABLE)

Scheduled job

crontab -e
# run once a day, at midnight
0 0 * * * /home/miniconda/miniconda3/bin/python /home/miniconda/df.py
