python 实现端口连通性检测

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 实现端口连通性检测相关的知识,希望对你有一定的参考价值。

#!/bin/env python
#AUTHOR:karl
#DATE:2018-1-3
#VERSION:V1.0
######################
import time
import os
import paramiko
import datetime
import sys
import MySQLdb
private_key = paramiko.RSAKey.from_private_key_file('/home/appdeploy/.ssh/id_rsa')
def TimeStampToTime(timestamp):
    timeStruct = time.localtime(timestamp)
    return time.strftime('%Y-%m-%d %H:%M:%S',timeStruct)
def Data_mysql(info):
    try:
        string=info
        Sip=string.split(" ")[0]
        Dip=string.split(" ")[1]
        Port=string.split(" ")[2]
        Result=string.split(" ")[3]
        con = MySQLdb.connect(host='localhost',user="root",passwd="********",db="zabbix",port=3306,charset="utf8")
        dt = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cursor=con.cursor()
        sql="""INSERT INTO Balant_telnet (telnet_sourceip,telnet_desip,telnet_port,create_time,telnet_result) VALUES (%s,%s,%s,%s,%s) """
        valuse=(Sip,Dip,Port,dt,Result)
        result=cursor.execute(sql,valuse)
        cursor.close()
        con.commit()
        con.close()
    except MySQLdb.Error, e:
        print "Error %d: %s" % (e.args[0], e.args[1])
        sys.exit(1)
def check_port(ager):
    for index in range(len(ager.keys())):
        hostname=ager.keys()[index]
        ssh=paramiko.SSHClient()
        try:
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(hostname,port=22,username='appdeploy',pkey=private_key,timeout=5)
            cmd="sh /home/appdeploy/monitor.sh {value}".format(value=ager[ager.keys()[index]]) 
            stdin,stdout,stderr = ssh.exec_command(cmd)
            for file_d in stdout.readlines():  
                print file_d
                Data_mysql(file_d)
        except:
            result=cmd+','+'failed'+'\n'
        ssh.close()
def get_parameter():
    ager={}
    value=[]
    with open("monitor_config") as context:
        for line in context:
            while not "," in list(line):
                host=line.strip()[1:-1]
                value=[]
                break
            line=line.strip().split(",")
            value=value+line
            ager[str(host)]=value
    return ager
parm=get_parameter()
check_port(parm)

配置文件:

[10.117.194.23]

10.117.194.77,10.116.41.82,9920

10.117.194.77,10.116.47.12,1080

10.117.194.77,10.116.45.56,1081

[10.117.194.24]

10.117.194.78,10.116.41.82,9920

10.117.194.78,10.116.47.12,1080

10.117.194.78,10.116.45.56,1081

10.117.194.78,10.116.145.33,8001

【】里面的IP 是物理IP,下面分别对应着应用的源IP,目标IP和测试端口号


agent 直接用shell 实现


数据库中效果 

技术分享图片


最后通过zabbix 的API 解析下就能展示出来了。




以上是关于python 实现端口连通性检测的主要内容,如果未能解决你的问题,请参考以下文章

怎样检测TCP/UDP端口的连通性 netcat udp端口检查

怎样检测TCP/UDP端口的连通性

Linux系列:检测TCP/UDP端口的连通性

Linux系列:检测TCP/UDP端口的连通性

linux服务器怎么检测连通性

【udp】如何检测UDP端口的连通性