python3 端口扫描类

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python3 端口扫描类相关的知识,希望对你有一定的参考价值。

# coding=utf-8

from queue import Queue
import nmap
import threading
import requests
import chardet
import re
import json
import os

# 存储所有扫描的ip和端口服务
final_domains = []

# 存储每个ip的端口的临时列表
ports = []

# 全局锁
glock = threading.Lock()


# 端口服务扫描类
class PortScan(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self._queue = queue

    # 作为线程类,需要重写run方法
    def run(self):
        while not self._queue.empty():
            scan_ip = self._queue.get()  # 从队列出取出每一个扫描的ip
            glock.acquire()
            try:
                self.portscan(scan_ip)
                self.Scan(scan_ip)
            except Exception as e:
                print(e)
                pass
            glock.release()


    # 调用masscan识别端口
    def portscan(self, scan_ip):
        temp_ports = []  # 设定一个临时端口列表
        os.system(‘masscan.exe ‘ + scan_ip + ‘ -p 1-100 -oJ masscan.json --rate 2000‘)

        # 提取json文件中的端口
        with open(‘masscan.json‘, ‘r‘) as f:
            for line in f:
                if line.startswith(‘{ ‘):
                    temp = json.loads(line[:-2])  # 取出一条完整json形式的数据
                    temp_ports.append(str(temp["ports"][0]["port"]))# 端口取出加入临时端口中

        print(temp_ports)

        if len(temp_ports) > 25:
            temp_ports.clear()  # 如果端口数量大于30,说明可能存在防火墙,属于误报,清空列表
        else:
            ports.extend(temp_ports)  # 小于30则放到总端口列表里


    # 获取网站的web应用程序名和网站标题信息
    def Title(self,scan_url_port, service_name):
        try:
            resp = requests.get(scan_url_port, timeout=3, verify=False)
            # 获取网站的页面编码并且应用
            detectencode = chardet.detect(resp.content)  # 利用chardet模块检测编码
            response = re.findall(r‘<title>(.*?)</title>‘, resp.content.decode(detectencode[‘encoding‘]), re.S)  # re.S的作用 匹配的时候扩展到整个字符串(包括换行这些
)
            if response:  # 如果访问的时候正则匹配到<title>标签
                # 将页面解码为utf-8,获取中文标题
                # 如果访问的时候正则匹配到title标签
                res = response[0]
                banner = resp.headers[‘server‘]
                final_domains.append(scan_url_port + ‘	‘ + banner + ‘	‘ + res)
            else:
                final_domains.append(scan_url_port + ‘	‘ + service_name + ‘	‘ + "获取标题失败,请手动尝试!!!")
        except:
            pass


    # 调用nmap识别服务
    def Scan(self,scan_ip):
        nm = nmap.PortScanner()
        try:
            for port in ports:
                ret = nm.scan(scan_ip, port, arguments=‘-Pn -sS‘)
                service_name = ret[‘scan‘][scan_ip][‘tcp‘][int(port)][‘name‘]
                print(‘[*] 主机 ‘ + scan_ip + ‘ 的 ‘ + str(port) + ‘ 端口服务为: ‘ + service_name)
                if ‘http‘ in service_name or service_name == ‘sun-answerbook‘:
                    if service_name == ‘https‘ or service_name == ‘https-alt‘:
                        scan_url_port = ‘https://‘ + scan_ip + ‘:‘ + str(port)
                        self.Title(scan_url_port, service_name)
                    else:
                        scan_url_port = ‘http://‘ + scan_ip + ‘:‘ + str(port)
                        self.Title(scan_url_port, service_name)
                else:
                    # 形式为:["47.96.196.217:443    https","47.96.196.217:80    blackice-icecap"]....
                    final_domains.append(scan_ip + ‘:‘ + str(port) + ‘	端口服务为: ‘ + service_name)
        except Exception as e:
            print(e)
            pass
        ports.clear() # 扫一次清理一次


# 启用多线程扫描
def main():
    queue = Queue(1000)
    try:
        f = open(‘ip.txt‘, ‘r‘)  # 把文本中的内容按
分割加入到queue队列中
        for line in f.readlines():
            final_ip = line.strip(‘
‘)
            queue.put(final_ip)
            print("加入队列---->" + final_ip)
        threads = []  # 创建一个列表
        thread_count = 50
        for i in range(thread_count):
            threads.append(PortScan(queue))  # 列表中每个都作为一个线程进行扫描
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        f.close()

    except:
        pass


if __name__ == ‘__main__‘:
    main()  # 主进程
    tmp_domians = []
    for tmp_domain in final_domains:  # 循环一次 转移到tmp_domains里面
        if tmp_domain not in tmp_domians:
            tmp_domians.append(tmp_domain)
    for url in tmp_domians:
        with open(‘scan_url_port.txt‘, ‘a‘) as ff:
            ff.write(url+‘
‘)

以上是关于python3 端口扫描类的主要内容,如果未能解决你的问题,请参考以下文章

扫描端口占用情况的python脚本

Python3小工具——结合nmap扫描

端口扫描nmap

端口扫描工具mascan核心使用

Python 端口扫描(全连接,无多线程)

自制最简单的端口扫描器