SELECTORS模块实现并发简单版FTP

Posted Remember

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SELECTORS模块实现并发简单版FTP相关的知识,希望对你有一定的参考价值。

环境:windows, python 3.5
功能:
使用SELECTORS模块实现并发简单版FTP
允许多用户并发上传下载文件

结构:
ftp_client ---|
bin ---|
start_client.py ......启动客户端
conf---|
config.py ......客户端参数配置
system.ini ......客户端参数配置文件
core---|
clients.py ......客户端主程序
home ......默认下载路径
ftp_server ---|
bin ---|
start_server.py ......启动服务端
conf---|
config.py ......服务端参数配置
system.ini ......服务端参数配置文件
core---|
servers.py ......服务端主程序
db ---|
data.py ......存取用户数据
home ......默认上传路径

功能实现:
客户端输入命令,根据命令通过映射进入相应方法,上传或者下载;
服务端用SELECTORS模块实现并发,接收数据,通过action用映射进入相应方法;


如何使用:
启动start_server.py,启动start_client.py;
在客户端输入get pathname/put pathname进行上传下载文件,开启多个客户端可并发上传下载。

core:
技术分享图片
#!/usr/bin/env python
# -*-coding:utf-8-*-
# Author:zh
import socket
import os
import json
import random
import conf
PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))+os.sep+"home"+os.sep   # 默认下载存放路径为当前路径的home目录下


class FtpClient(object):
    def __init__(self):
        self.client = socket.socket()

    def connect(self, ip, port):
        self.client.connect((ip, port))

    def interactive(self):
        # 入口
        while True:
            cmd = input(">>").strip()
            if len(cmd) == 0:
                continue
            cmd_list = cmd.split()
            if len(cmd_list) == 2:
                cmd_str = cmd_list[0]
                if hasattr(self, cmd_str):
                    func = getattr(self, cmd_str)
                    func(cmd)
                else:
                    print("输入格式错误")
                    self.help()
            else:
                print("输入格式错误")
                self.help()

    @staticmethod
    def help():
        msg = ‘‘‘
get filepath(下载文件)
put filepath(上传文件)
        ‘‘‘
        print(msg)

    def get(self, *args):
        # 下载
        data = args[0].split()
        action = data[0]
        path = data[1]
        send_msg = {
            "action": action,  # 第一次请求,需要服务端返回文件信息
            "file_path": path  # F:\oracle课程\(必读)11gOCP考试流程与须知.rar
        }
        self.client.send(json.dumps(send_msg).encode())
        data = self.client.recv(1024)
        data = json.loads(data.decode())
        if data["sign"] == "0":
            file_length = data["length"]
            file_path = PATH+data["filename"]
            if os.path.isfile(file_path):
                file_path = file_path + str(random.randint(1,1000))
            file = open(file_path, "wb")
            send_msg_2 = {
                "action": "get_file_data",  # 第二次请求,请求客户端给发送文件数据
                "file_path": path  # 第二次请求依然需要传入文件路径,也可以在服务端利用全局变量保存
            }
            self.client.send(json.dumps(send_msg_2).encode())
            get_length = 0
            while get_length < int(file_length):
                data = self.client.recv(1024)
                file.write(data)
                get_length += len(data)
            else:
                file.close()
            if get_length == int(file_length):
                print("下载成功")
            else:
                print("文件传输失败")
                os.remove(file_path)  # 如果传输过程出现问题,删除文件
        else:
            print("文件不存在")

    def put(self, *args):
        # 上传
        data = args[0].split()
        action = data[0]
        file_path = data[1]
        if os.path.isfile(file_path):
            msg = {
                "action": action,  # 上传第一次发送,发送文件大小和姓名
                "length": os.stat(file_path).st_size,
                "filename": file_path[file_path.rfind("\\") + 1:]
            }
            self.client.send(json.dumps(msg).encode())
            self.client.recv(1024)  # 避免黏包
            file = open(file_path, "rb")
            for line in file:
                self.client.send(line)  # 上传第二次发送,发送文件数据,没有action,在服务端通过try,json报错的进入接收数据的方法里
            else:
                file.close()
            sign = self.client.recv(1024)
            if sign == b0:
                print("上传成功")
            else:
                print("上传失败")
        else:
            print("文件不存在")


def run():
    data = conf.config.Configuration()
    conf_data = data.get_config()
    obj = FtpClient()
    obj.connect(conf_data[0][1], int(conf_data[1][1]))
    obj.interactive()
clients

 

core:

技术分享图片
#!/usr/bin/env python
# -*-coding:utf-8-*-
# Author:zh
import selectors
import socket
import json
import os
import random
import conf
PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))+os.sep+"home"+os.sep  # 默认上传存放路径为当前路径的home目录下


class SelectorFtp(object):

    def __init__(self):
        self.sel = selectors.DefaultSelector()

    def connect(self, ip, port):
        sock = socket.socket()
        sock.bind((ip, port))
        sock.listen(100)
        sock.setblocking(False)
        self.sel.register(sock, selectors.EVENT_READ, self.accept)
        while True:
            events = self.sel.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)

    def accept(self, sock, mask):
        conn, addr = sock.accept()  # Should be ready
        conn.setblocking(False)
        self.sel.register(conn, selectors.EVENT_READ, self.interactive)

    def interactive(self, conn, mask):
        # 回调函数,每次接收消息都判断action,根据action分别调用不同函数信息交互
        try:
            data = conn.recv(1024)
            if data:
                try:
                    cmd_dict = json.loads(data.decode())
                    action = cmd_dict[action]
                    if hasattr(self, action):
                        func = getattr(self, action)
                        func(cmd_dict, conn)
                except (UnicodeDecodeError, json.decoder.JSONDecodeError, TypeError) as e:
                    # put发送文件数据,无法通过json打包,采取json报错后进入方法循环接收
                    self.put_file_data(data, conn)
            else:
                self.sel.unregister(conn)
                conn.close()
        except ConnectionResetError as e:
            print(e)
            # 客户端意外断开时注销链接
            self.sel.unregister(conn)
            conn.close()

    def get(self, *args):
        # 下载第一步:判断文件是否存在并返回文件大小和文件名
        conn = args[1]
        file_path = args[0]["file_path"]
        if os.path.isfile(file_path):
            sign = "0"
            length = os.stat(file_path).st_size
        else:
            sign = "1"
            length = None
        msg = {
            "sign": sign,
            "length": length,
            "filename": file_path[file_path.rfind("\\")+1:]
        }
        conn.send(json.dumps(msg).encode())

    def get_file_data(self, *args):
        # 下载第二步,传送文件数据
        conn = args[1]
        file_path = args[0]["file_path"]
        file = open(file_path, "rb")
        for line in file:
            conn.send(line)
        else:
            file.close()

    def put(self, *args):
        # 上传第一步,接收文件大小和文件名
        conn = args[1]
        self.length = args[0]["length"]
        self.filename = PATH + args[0]["filename"]
        if os.path.isfile(self.filename):
            self.filename = self.filename + str(random.randint(1, 1000))
        self.file = open(self.filename, "wb")
        self.recv_size = 0
        conn.send(b"0")  # 避免客户端黏包

    def put_file_data(self, *args):
        # json报错后进入此循环接收数据
        conn = args[1]
        self.recv_size += len(args[0])
        self.file.write(args[0])
        if int(self.length) == self.recv_size:
            self.file.close()
            conn.send(b"0")


def run():
    data = conf.config.Configuration()
    conf_data = data.get_config()
    obj = SelectorFtp()
    obj.connect(conf_data[0][1], int(conf_data[1][1]))
servers

 

config:

技术分享图片
#!/usr/bin/env python
# -*-coding:utf-8-*-
# _author_=zh
import os
import configparser
PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


class Configuration(object):
    def __init__(self):
        self.config = configparser.ConfigParser()
        self.name = PATH+os.sep+"conf"+os.sep+"system.ini"

    def init_config(self):
        # 初始化配置文件,ip :客户端IP,port:客户端端口
        if not os.path.exists(self.name):
            self.config["config"] = {"ip": "localhost", "port": 1234}
            self.config.write(open(self.name, "w", encoding="utf-8", ))

    def get_config(self, head="config"):
        ‘‘‘
        获取配置文件数据
        :param head: 配置文件的section,默认取初始化文件config的数据
        :return:返回head中的所有数据(列表)
        ‘‘‘
        self.init_config()  # 取文件数据之前生成配置文件
        self.config.read(self.name, encoding="utf-8")
        if self.config.has_section(head):
            section = self.config.sections()
            return self.config.items(section[0])
config

 















































以上是关于SELECTORS模块实现并发简单版FTP的主要内容,如果未能解决你的问题,请参考以下文章

python之路——作业:Select FTP(仅供参考)

python第五十四天--第十周作业

python之selectors模块

python 协程, 异步IO Select 和 selectors 模块 多并发演示

selecter模块默认使用epoll 实现IO多路复用,展示单线程的并发效果

selectors实现高并发