Big Data Cluster Disk Resource Monitoring

Posted by 小基基o_O


Requirements

  1. Track the daily disk usage growth of each cluster server
  2. Track the daily data growth of HDFS
  3. Track the daily data growth of the data warehouse
  4. Track the daily data growth of the warehouse ODS and DWD layers

Disk resource monitoring commands

Monitored item                Command             Default unit
Disk usage of one server      df                  KB
Total HDFS usage              hadoop fs -df       B
Size of an HDFS directory     hadoop fs -du -s /  B
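For orientation, here is roughly what these commands print. The numbers below are made up, and the hdfs:// URL, replication factor of 3, and exact column layout depend on your cluster and Hadoop version:

$ hadoop fs -df
Filesystem                Size         Used    Available  Use%
hdfs://hadoop105:8020  107374182400  21474836480  75161927680  20%

$ hadoop fs -du -s /user/hive/warehouse/ec.db
7022956511  21068869533  /user/hive/warehouse/ec.db

$ df /
Filesystem     1K-blocks    Used Available Use% Mounted on
/dev/sda2       52403200 8388608  44014592  17% /

The script below reads the Used column from hadoop fs -df and df, and column 1 of hadoop fs -du -s, which is the space consumed including replicas (hence the "(incl. replicas)" item names).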

Code

Python fetches and processes the data; MySQL stores it.
Why MySQL 8: version 8 supports CTEs (WITH ... AS) and window functions.
Why Python 2: CentOS 7 ships with Python 2, so nothing needs to be installed.

MySQL

Install MySQL 8

https://blog.csdn.net/Yellow_python/article/details/104184171

Table creation

Create the database

CREATE DATABASE data_management;
USE data_management;

Create the tables

CREATE TABLE item_info(
  item_id    INT           PRIMARY KEY   COMMENT 'Monitored item ID',
  item_name  VARCHAR(255)  NOT NULL      COMMENT 'Monitored item name',
  item_unit  VARCHAR(63)   DEFAULT NULL  COMMENT 'Unit of measurement'
) COMMENT='Monitored item info table';
CREATE TABLE item_records(
  item_id           INT       NOT NULL  COMMENT 'Monitored item ID',
  monitoring_time   DATETIME  NOT NULL  COMMENT 'Monitoring time',
  monitoring_value  BIGINT    NOT NULL  COMMENT 'Monitored value',
  PRIMARY KEY(item_id,monitoring_time)
) COMMENT='Monitoring records table';
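One consequence of the composite primary key (item_id, monitoring_time): re-inserting a sample for the same item and timestamp fails with a duplicate-key error. If re-runs should overwrite instead, an upsert is one option (a sketch, not part of the original script):

INSERT INTO item_records (item_id,monitoring_time,monitoring_value)
VALUES (1,'2021-01-01 00:00:00',0)
ON DUPLICATE KEY UPDATE monitoring_value=VALUES(monitoring_value);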

Insert data

INSERT INTO item_info VALUES
(1,'Total HDFS usage','B'),
(2,'Warehouse usage (incl. replicas)','B'),
(3,'Warehouse ODS layer usage (incl. replicas)','B'),
(4,'Warehouse DWD layer usage (incl. replicas)','B'),
(5,'hadoop105 server disk usage','KB'),
(6,'hadoop106 server disk usage','KB'),
(7,'hadoop107 server disk usage','KB');

Create views

The views make it easy to read day-over-day growth and serve as a data interface for downstream consumers.

CREATE OR REPLACE VIEW hdfs_ymd AS
-- Last 30 days: total HDFS usage per day (daily average)
WITH hdfs_used_ymd AS(
    SELECT
        DATE(monitoring_time) AS ymd,
        AVG(monitoring_value) AS b
    FROM item_records
    WHERE item_id=1 AND DATEDIFF(NOW(),monitoring_time)<31
    GROUP BY DATE(monitoring_time))
-- Compute the day-over-day change
SELECT
    t1.ymd AS ymd,
    t1.b/1024/1024/1024 AS gb,
    (t1.b-t2.b)/1024/1024/1024 AS increase
FROM hdfs_used_ymd t1
INNER JOIN hdfs_used_ymd t2
ON t1.ymd=DATE_ADD(t2.ymd,INTERVAL 1 DAY)
ORDER BY ymd;
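The self-join above pairs each day with the day before it, so the first day in the 30-day window has no partner row and is dropped. The same daily delta can be written with the LAG window function, which keeps the first day with a NULL increase; a sketch (this is also the style the df_ymd view below uses):

WITH hdfs_used_ymd AS(
    SELECT DATE(monitoring_time) AS ymd, AVG(monitoring_value) AS b
    FROM item_records
    WHERE item_id=1 AND DATEDIFF(NOW(),monitoring_time)<31
    GROUP BY DATE(monitoring_time))
SELECT
    ymd,
    b/1024/1024/1024 AS gb,
    (b-LAG(b,1,NULL)OVER(ORDER BY ymd))/1024/1024/1024 AS increase
FROM hdfs_used_ymd
ORDER BY ymd;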
-- Last 30 days: total disk usage across all servers, with daily change
CREATE OR REPLACE VIEW df_ymd AS
WITH
-- Select the monitored items whose name ends with 'server disk usage'
df_id AS(
    SELECT item_id FROM item_info
    WHERE item_name LIKE '%server disk usage'),
-- Keep only the last 30 days
r30 AS(
    SELECT
        item_id,
        DATE(monitoring_time) AS ymd,
        monitoring_value
    FROM item_records
    WHERE DATEDIFF(NOW(),monitoring_time)<31),
-- Daily average disk usage per server over the last 30 days
df_used_hosts_ymd AS(
    SELECT
        r30.ymd AS ymd,
        AVG(r30.monitoring_value) AS kb
    FROM df_id LEFT JOIN r30 ON df_id.item_id=r30.item_id
    GROUP BY r30.ymd,df_id.item_id),
-- Daily total across all servers
df_used_ymd AS(
    SELECT ymd,SUM(kb) AS kb
    FROM df_used_hosts_ymd
    GROUP BY ymd)
-- Compute the daily increase (1048576 = 1024*1024, converting KB to GB)
SELECT
    ymd,
    kb/1048576 AS gb,
    (kb-LAG(kb,1,NULL)OVER(ORDER BY ymd))/1048576 AS increase
FROM df_used_ymd;

Create a metadata table to store information about the views

CREATE TABLE metadata_view(
  metadata_name  VARCHAR(255)  PRIMARY KEY  COMMENT 'Metadata name',
  explanation    VARCHAR(255)  NOT NULL     COMMENT 'Metadata description'
) COMMENT='Metadata about the views';
-- After creating a view, record its description
INSERT INTO metadata_view VALUES
('hdfs_ymd','Last 30 days: daily total HDFS usage and daily increase'),
('hdfs_ymd.increase','Daily increase'),
('df_ymd','Last 30 days: combined disk usage of all servers and its daily increase'),
('df_ymd.increase','Daily increase');
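A downstream consumer can first check metadata_view to see what is available, then query the views themselves:

SELECT * FROM metadata_view;
SELECT * FROM hdfs_ymd ORDER BY ymd DESC LIMIT 7;
SELECT * FROM df_ymd   ORDER BY ymd DESC LIMIT 7;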

Python 2

Note: the Python 2 script and MySQL 8 must be on the same server, so that mysql -e can be run locally.
Executing user: non-root (with sudo privileges).

#!/usr/bin/python2
# coding=utf-8
from os import system
from subprocess import check_output


def execute(cmd):
    """Run a Linux command"""
    print(cmd)
    system(cmd)


def evaluate(cmd):
    """Run a Linux command and return its output (a str in Python 2)"""
    print(cmd)
    return check_output(cmd, shell=True).strip()


class Mysql:
    def __init__(self, password, user='root', host=None, database=None):
        self.cmd = 'sudo mysql -u"{}" -p"{}" '.format(user, password)
        if host:
            self.cmd += '-h"{}" '.format(host)
        if database:
            self.cmd += '-D"{}" '.format(database)

    def e(self, sql):
        """Wrap the SQL in a mysql -e shell command"""
        return self.cmd + '-e"{}"'.format(sql)

    def insert(self, dt, tb):
        """Build an INSERT statement from a dict and run it via mysql -e"""
        ls = [(k, v) for k, v in dt.items() if v is not None]
        sql = 'INSERT %s (' % tb + ','.join(i[0] for i in ls) + \
              ') VALUES (' + ','.join('%r' % i[1] for i in ls) + ')'
        execute(self.e(sql))


if __name__ == '__main__':
    from re import split
    # HDFS directories to monitor; keys match item_info.item_id in MySQL
    HDFS_PATH = {
        2: '/user/hive/warehouse/ec.db',
        3: '/user/hive/warehouse/ec.db/ods',
        4: '/user/hive/warehouse/ec.db/dwd',
    }
    # Server disks to monitor; keys match item_info.item_id in MySQL
    HOSTNAMES = {
        5: 'hadoop105',
        6: 'hadoop106',
        7: 'hadoop107',
    }

    def _now():
        return evaluate('date +"%Y-%m-%d %H:%M:%S"')

    def _mysql_formatting(a, b, c):
        """Shape a record for insertion into MySQL"""
        return {'item_id': a, 'monitoring_time': b, 'monitoring_value': c}

    def hadoop_fs_df():
        """Query total HDFS usage"""
        result = evaluate('hadoop fs -df')
        now = _now()
        print(result)
        # Zip the header row with the value row into a dict
        dt = {k: v for k, v in zip(*(split(r'\s+', line) for line in result.split('\n')))}
        return _mysql_formatting(1, now, int(dt['Used']))

    def hadoop_fs_du(item_id=2):
        """Query the size of an HDFS directory"""
        result = evaluate('hadoop fs -du -s ' + HDFS_PATH[item_id])
        now = _now()
        print(result)
        # Column 1 is the space consumed including replicas
        value = split(r'\s+', result)[1]
        return _mysql_formatting(item_id, now, int(value))

    def hadoop_fs_du_ods():
        """Query the size of the ODS layer directory"""
        return hadoop_fs_du(3)

    def hadoop_fs_du_dwd():
        """Query the size of the DWD layer directory"""
        return hadoop_fs_du(4)

    def df(item_id):
        """Query a server's disk usage; the executing user has sudo, and root can SSH between nodes without a password"""
        hostname = HOSTNAMES[item_id]
        cmd = 'sudo ssh {} df /'.format(hostname)
        result = evaluate(cmd)
        print(result)
        now = _now()
        dt = {k: v for k, v in zip(*(split(r'\s+', line) for line in result.split('\n')))}
        return _mysql_formatting(item_id, now, int(dt['Used']))

    def df_hadoop105():
        """Query hadoop105's disk usage"""
        return df(5)

    def df_hadoop106():
        """Query hadoop106's disk usage"""
        return df(6)

    def df_hadoop107():
        """Query hadoop107's disk usage"""
        return df(7)

    db = Mysql(
        password='your_password',
        database='data_management')
    TABLE = 'item_records'
    db.insert(hadoop_fs_df(), TABLE)
    db.insert(hadoop_fs_du(), TABLE)
    db.insert(hadoop_fs_du_ods(), TABLE)
    db.insert(hadoop_fs_du_dwd(), TABLE)
    db.insert(df_hadoop105(), TABLE)
    db.insert(df_hadoop106(), TABLE)
    db.insert(df_hadoop107(), TABLE)
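For reference, a quick way to see the shell command string the Mysql class builds (the password is a placeholder):

db = Mysql(password='your_password', database='data_management')
print(db.e("SELECT 1"))
# sudo mysql -u"root" -p"your_password" -D"data_management" -e"SELECT 1"

Since the SQL is interpolated into a shell command, values containing double quotes or shell metacharacters would break it; with the numeric values and timestamps this script generates, that is not an issue here.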

Scheduled job

crontab -e
# run once a day
0 0 * * * python2 /path/to/script.py
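Depending on the cron configuration, the script's output is mailed or discarded; appending it to a log file makes troubleshooting easier (the log path here is just an example):

0 0 * * * python2 /path/to/script.py >> /tmp/disk_monitor.log 2>&1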

Alternatively, use DolphinScheduler.


Addendum

The script was originally written in Python 3; that code is kept below.
Python 3 advantages: more readable, more maintainable code and faster development.
Python 2 advantages: better portability; copy it over and it runs, with no environment to install.

from time import strftime
from subprocess import check_output
from re import split
from pymysql.connections import Connection  # conda install pymysql

# HDFS directories to monitor; keys match item_info.item_id in MySQL
HDFS_PATH = {
    2: '/user/hive/warehouse/ec.db',
    3: '/user/hive/warehouse/ec.db/ods',
    4: '/user/hive/warehouse/ec.db/dwd',
}
# Server disks to monitor; keys match item_info.item_id in MySQL
HOSTNAMES = {
    5: 'hadoop105',
    6: 'hadoop106',
    7: 'hadoop107',
}


def _cmd(command) -> str:
    """Run a shell command as the designated user and return its output"""
    return check_output('sudo -i -u yellow ' + command, shell=True).decode().strip()


def _mysql_formatting(a: int, b: str, c: int) -> dict:
    """Shape a record for insertion into MySQL"""
    return {'item_id': a, 'monitoring_time': b, 'monitoring_value': c}


def _now() -> str:
    return strftime('%Y-%m-%d %H:%M:%S')


def hadoop_fs_df() -> dict:
    """Query total HDFS usage"""
    result = _cmd('hadoop fs -df')
    now = _now()
    # Zip the header row with the value row into a dict
    dt = {k: v for k, v in zip(*(split(r'\s+', line) for line in result.split('\n')))}
    print(dt)
    return _mysql_formatting(1, now, int(dt['Used']))


def hadoop_fs_du(item_id=2) -> dict:
    """Query the size of an HDFS directory"""
    result = _cmd('hadoop fs -du -s ' + HDFS_PATH[item_id])
    now = _now()
    print(result)
    # Column 1 is the space consumed including replicas
    value = split(r'\s+', result)[1]
    return _mysql_formatting(item_id, now, int(value))


def hadoop_fs_du_ods() -> dict:
    """Query the size of the ODS layer directory"""
    return hadoop_fs_du(3)


def hadoop_fs_du_dwd() -> dict:
    """Query the size of the DWD layer directory"""
    return hadoop_fs_du(4)


def df(item_id) -> dict:
    """Query a server's disk usage; the executing user has sudo, and root can SSH between nodes without a password"""
    hostname = HOSTNAMES[item_id]
    cmd = 'sudo ssh {} df /'.format(hostname)
    print(cmd)
    result = _cmd(cmd)
    now = _now()
    dt = {k: v for k, v in zip(*(split(r'\s+', line) for line in result.split('\n')))}
    print(dt)
    return _mysql_formatting(item_id, now, int(dt['Used']))


def df_hadoop105() -> dict:
    """Query hadoop105's disk usage"""
    return df(5)


def df_hadoop106() -> dict:
    """Query hadoop106's disk usage"""
    return df(6)


def df_hadoop107() -> dict:
    """Query hadoop107's disk usage"""
    return df(7)


class Mysql:
    def __init__(self, host, database, password, user='root'):
        self.db = Connection(
            user=user,
            password=password,
            host=host,
            database=database,
            port=3306,
            charset='utf8')
        self.cursor = self.db.cursor()

    def __del__(self):
        self.cursor.close()
        self.db.close()

    def commit(self, sql):
        print(sql)
        try:
            self.cursor.execute(sql)
            self.db.commit()
        except Exception as e:
            print(e)

    def insert(self, dt, tb):
        """Build an INSERT statement from a dict and commit it"""
        ls = [(k, v) for k, v in dt.items() if v is not None]
        sql = 'INSERT %s (' % tb + ','.join(i[0] for i in ls) + \
              ') VALUES (' + ','.join('%r' % i[1] for i in ls) + ')'
        self.commit(sql)


if __name__ == '__main__':
    db = Mysql(
        host='hadoop107',
        password='your_password',
        database='data_management')
    TABLE = 'item_records'
    db.insert(hadoop_fs_df(), TABLE)
    db.insert(hadoop_fs_du(), TABLE)
    db.insert(hadoop_fs_du_ods(), TABLE)
    db.insert(hadoop_fs_du_dwd(), TABLE)
    db.insert(df_hadoop105(), TABLE)
    db.insert(df_hadoop106(), TABLE)
    db.insert(df_hadoop107(), TABLE)
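A side note on insert: it serializes values with %r and splices them into the SQL text. pymysql can instead bind values with placeholders, which avoids hand-quoting; a minimal sketch of the same insert under that approach (insert_params is a hypothetical helper, not part of the original script):

def insert_params(cursor, connection, dt, tb):
    """Insert one record using pymysql placeholders instead of %r formatting."""
    ls = [(k, v) for k, v in dt.items() if v is not None]
    # Identifiers (table/column names) cannot be bound, so they are formatted in;
    # the values are passed separately and escaped by pymysql itself.
    sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
        tb, ','.join(k for k, _ in ls), ','.join(['%s'] * len(ls)))
    cursor.execute(sql, [v for _, v in ls])
    connection.commit()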
