老男孩Day8作业:FTP
Posted 空s蝉灬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了老男孩Day8作业:FTP相关的知识,希望对你有一定的参考价值。
1、作业需求
开发简单的FTP:
1. 用户登陆
2. 上传/下载文件
3. 不同用户家目录不同
4. 查看当前目录下文件
5. 充分使用面向对象知识
2、流程图
3、目录结构
4、代码区
bin目录下的start.py程序执行文件
# -*- coding:utf-8 -*- # Author:D.Gray import os,sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) from core import ftp_client fc = ftp_client.FTP_client()
# -*- coding:utf-8 -*- # Author:D.Gray import os,sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) from core import ftp_server fs = ftp_server.FTP_server()
conf目下的setting.py系统配置文件
# -*- coding:utf-8 -*- # Author:D.Gray import os,sys,socket BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) #IP地址和端口 IP_PORT = ("localhost",6969) #数据文件路径 USER_FILE = BASE_DIR + r"\\db\\user.db" #用户文件目录 USER_HOME = BASE_DIR
core目录下主程序文件
# -*- coding:utf-8 -*- # Author:D.Gray import os,sys,socket from conf import setting from core import ftp_server from core import users class FTP_client(object): def __init__(self): self.client = setting.socket.socket() self.client.connect(setting.IP_PORT) self.user_obj = users.Users() self.help_info = { "get":"用于上传文件,例如:get readme.txt 即 get 文件名", "put":"用于下载文件,例如:put readme.txt 即 put 文件名", \'dir\':"用于显示当前目录下文件或文件详细信息 格式 ls " } if self.auth(): self.start() def auth(self): \'\'\' 用户登录验证函数 :return: \'\'\' while True: username = input("请输入用户名>>>:").strip() pwd = input("请输入用户密码>>>:").strip() auth_info = \'auth %s %s\'%(username,pwd) #格式化输出 auth 用户名 密码 self.client.send(auth_info.encode()) #将格式化后的内容发送给服务端 back_res = self.client.recv(1024).decode() if back_res == "ok": print(\'认证成功\') user = self.user_obj.get_user(username) self.current_user = username self.current_pwd = pwd self.current_path = user[\'home\'] self.current_dir = back_res[1] return True elif back_res == "Not password": print("\\033[31;1m密码不正确\\033[0m") else: print("\\033[31;1m用户不存在\\033[0m") def start(self): \'\'\' 输入指令上传下载文件函数 :return: \'\'\' while True: user_input = input(\'%s>>>:\'%self.current_user).strip() if len(user_input) == 0:continue user_input = user_input.split() if user_input[0] == \'q\':break if hasattr(self,user_input[0]): func = getattr(self,user_input[0]) func(user_input) else: print("\\033[31;1m请输入有效指令\\033[0m") continue def put(self,cmd): \'\'\' 从服务器端下载文件函数 :param cmd: :return: \'\'\' print(\'in the put:\',cmd) send_server_info = \'%s %s\'%(cmd[0],cmd[1]) #格式化输出[\'方法\',\'文件名\'] self.client.send(send_server_info.encode()) #将格式化输出内容发送给服务器端 server_back = self.client.recv(1024).decode() #接收服务器回调结果 print("接收服务器回调信息:",server_back) if server_back == "302": print(\'\\033[31;1m文件不存在\\033[0m\') else: file_totle_size = int(server_back) #从服务器端接收文件大小 print("您要下载的文件大小为:%sByte"%file_totle_size) self.client.send(\'可以开始下载了...\'.encode()) rever_file_size = 0 #接收到的文件大小 file_name_path = setting.USER_HOME + self.current_path + \'\\\\user_home\\\\\' + cmd[1] #print(file_name_path) with open(file_name_path,"wb") as f: while rever_file_size < file_totle_size: if file_totle_size - rever_file_size < 1024: #当剩余文件大小<1024 全部接收文件 size = file_totle_size - rever_file_size else: size = 1024 data = self.client.recv(size) #当剩余文件<1024全部接收文件,当剩余文件>1024每次只接收1024 rever_file_size += len(data) #每次接收数据时自动累计rever_file_size值 print("已接收%sByte"%rever_file_size) f.write(data) else: print(\'接受完毕\') def get(self,cmd): \'\'\' 本地上次文件给服务器端 :param cmd: 接收用户通过start函数输入的操作指令 :return: \'\'\' print(cmd) file_path = setting.USER_HOME + self.current_path +\'\\\\user_home\\\\\' + cmd[1] if os.path.isfile(file_path): file_totle_size = os.stat(file_path).st_size print(\'您要上传文件大小为【%sByte】\'%file_totle_size) file_info = \'%s %s %s\'%(cmd[0],cmd[1],file_totle_size) #格式化输出[\'操作指令\',\'文件名\',\'文件大小\'] self.client.send(file_info.encode()) #将格式化输出内容发送给服务器端 server_back = self.client.recv(1024).decode() #接收回调信息 if server_back == "300": print(\'可以上传文件了...\') send_file_size = 0 with open(file_path,\'rb\') as f: while send_file_size != file_totle_size: if file_totle_size-send_file_size <= 1024: data = f.read(file_totle_size-send_file_size) send_file_size += file_totle_size - send_file_size else: data = f.read(1024) send_file_size += len(data) print("已上传【%sByte】"%send_file_size) self.client.send(data) print("上传成功") else: print(\'\\033[31;1m文件不存在\\033[0m\') def dir(self,cmd): \'\'\' 查看服务端目录文件信息 :param cmd: :return: \'\'\' print(cmd) send_server_info = \'%s\'%cmd[0] #格式化输出用户指令 self.client.send(send_server_info.encode()) server_back = self.client.recv(1024).decode() #接收服务端回调 print("获取服务端回调信息:%s"%server_back) self.client.send("ok".encode()) #发送给服务端\'ok\' recv_size = 0 recv_data = b\'\' while recv_size < int(server_back): data = self.client.recv(1024) recv_data += data recv_size = len(data) print(recv_size) else: print(recv_data.decode()) def help(self,cmd): \'\'\' 查看帮助文档函数 :param cmd: :return: \'\'\' print(cmd) d = self.help_info print(d)
# -*- coding:utf-8 -*- # Author:D.Gray import os,sys,socket from conf import setting from core import ftp_client from core import users class FTP_server(object): def __init__(self): self.server = setting.socket.socket() self.server.bind(setting.IP_PORT) self.server.listen(5) self.user_obj = users.Users() #导入users文件并实例化Users类 self.start() def start(self): print("等待链接中...") while True: self.conn,self.addr = self.server.accept() print("一个新的链接:%s %s"%(self.conn,self.addr)) while True: self.data = self.conn.recv(1024) #接受客户端格式化输出内容 #print(\'data:\',self.conn) if not self.data: print("客户端断开") break cmd_res = self.data.decode().split() #以列表形式获取用户输入的[[方法名],[文件名]] cmd_action = cmd_res[0] #获取方法名 #print(\'in the start获取方法名:\',cmd_action) if hasattr(self,cmd_action): #判断用户输入的方法名是否存在 func = getattr(self,cmd_action) #执行对应的方法函数 func(cmd_res) #用户输入的[[方法名],[文件名]]传给方法函数 else: print("\\033[31;1m请输入有效命令\\033[0m") def auth(self,cmd): #print(\'auth:\',cmd) #接受客户端格式化输出的 auth 用户名 密码 user = self.user_obj.get_user(cmd[1]) #调用Users类中get_user方法,并把cmd[1](用户名)传参给get_user方法 print(\'in the ftp_server_auth:\',user) if user: if user[\'password\'] == cmd[2]: self.current_user = user self.current_path = user["home"] self.user_home = setting.USER_HOME self.conn.send(b"ok") else: self.conn.send(b"Not password") else: self.conn.send(b\'Not username\') def put(self,cmd): \'\'\' 上传文件函数 :param cmd: 接收用户输入的[[方法名],[文件名]] :return: \'\'\' #print(\'in the put:\',cmd) file_name_path = self.user_home + self.current_path +\'\\server_home\\\\\'+ cmd[1] print(\'文件路径\',file_name_path) if os.path.isfile(file_name_path): file_totle_size = os.stat(file_name_path).st_size #查看文件大小 self.conn.send(str(file_totle_size).encode()) #将文件大小发送给客户端 self.conn.recv(1024) #接收客户端消息 with open(file_name_path,\'rb\') as f: for line in f: #循环遍历文件内容 self.conn.send(line) #并将文件内容发送给客户端 print("send done>>>") else: print(\'\\033[31;1m文件不存在\\033[0m\') self.conn.send(\'302\'.encode()) def get(self,cmd): \'\'\' 接收客户端上传文件函数 :param cmd: :return: \'\'\' print(cmd) file_path = self.user_home + self.current_path + \'\\server_home\\\\\' + cmd[1] #文件路径 file_totle_size = cmd[2] #接收客户端上传文件大小 file_totle_size = int(file_totle_size) with open(file_path,\'wb\') as f: self.conn.send(\'300\'.encode()) #返回客户端参数300 revered_file_size = 0 #初始接收文件大小 while revered_file_size < file_totle_size: #开始接收客户端上传文件 if file_totle_size - revered_file_size <= 1024: size = file_totle_size-revered_file_size else: size = 1024 data = self.conn.recv(size) revered_file_size += len(data) f.write(data) else: print("文件接收完毕") def dir(self,cmd): \'\'\' 查看服务端目录文件信息函数 :param cmd: :return: \'\'\' print(cmd) file_path = self.user_home + self.current_path + \'\\server_home\\\\\' res = os.popen(\'%s %s\'%(cmd[0],file_path)).read() print(\'服务端文件目录信息:\',res) if len(res) == 0 : res = \'cmd has not output\' self.conn.send(str(len(res)).encode()) #服务端发送目录文件大小给客户端 self.conn.recv(1024) #接收客户端回调信息 "ok" self.conn.send(res.encode()) #服务端发送目录文件信息给客户端
#-*- Coding:utf-8 -*- # Author: D.Gray import os,sys,shelve from conf import setting class Users(object): def __init__(self): self.user_file = setting.USER_FILE self.users_read = self.read_users() def read_users(self): print(self.user_file) with open(self.user_file, \'r\') as f: user_read = eval(f.read()) return user_read def get_users(self): print(self.users_read) return self.users_read def get_user(self,username): for user in self.users_read: print(\'in the User_get_user:\',user) if user["username"] == username: return user
db目录下的数据文件
[ { "username":"alex", "password":"admin", "home":"\\\\home\\\\alex\\\\", } ]
以上是关于老男孩Day8作业:FTP的主要内容,如果未能解决你的问题,请参考以下文章