简单的单进程FTP服务器的实现
Posted 江湖乄夜雨
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了简单的单进程FTP服务器的实现相关的知识,希望对你有一定的参考价值。
一、功能说明:
1.本程序基于socket实现客户端与服务器端的单进程交互
2.用到的用户名:whw,密码abc123——服务器端密码的验证利用hashlib模块进行MD5的编码以确保通信安全。
3.客户端登陆成功后可以查看自己再服务器上的文件夹里文件的列表;可以在自己所在的目录随意切换;可以将服务器端自己文件夹中的文件下载到客户端;可以将自己本端的文件下载到服务器端自己的文件夹里去
4.客户端上传并下载文件有日志记录
二、目录结构
WHW_FTP
├── client
│ ├── bin #客户端入口程序目录(客户端文件保存的目录)
│ │ └── whw_client.py #客户端的入口程序
│ │
│ └── logics #配置文件目录
│ └── ftp_client.py #客户端与服务器端交互的逻辑
│
├── server
│ ├── bin #服务器端入口程序目录
│ │ └── whw_server.py #服务器端的入口程序
│ │
│ ├── conf #存放的是用户信息与程序用到的参数
│ │ ├── accounts.ini #客户信息
│ │ └── settings.py #程序用到的其他固定参数
│ │
│ └── core #服务器端程序的主逻辑存放地
│ │ ├── ftp_server.py #服务器端与客户端交互的程序
│ │ └── logger.py #记录日志的逻辑
│ │ └── management.py #负责处理客户端命令行参数的逻辑
│ │
│ └── home #存放的是用户目录(用户在server端的文件存放于此)
│ │ └── whw #客户文件夹
│ └── log #存放的是日志信息
│ └── whw.log #日志文件
└── README.txt
三、程序源代码
import os import sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) if __name__ == \'__main__\': from logics import ftp_client client = ftp_client.Ftp_client() client.interactive() print(BASE_DIR)
import optparse import socket import json import os class Ftp_client: \'\'\'ftp客户端\'\'\' MSG_SIZE = 1024 #消息最长1024 def __init__(self): #初始化 self.username = None parser = optparse.OptionParser() parser.add_option("-s", "--server", dest="server", help="ftp server ip_addr") parser.add_option("-P", "--port", type="int", dest="port", help="ftp server port") parser.add_option("-u", "--username", dest="username", help="username info") parser.add_option("-p", "--password", dest="password", help="password info") #传参 self.options, self.args = parser.parse_args() #检查参数是否合法 self.argv_verification() #建立连接 self.make_connection() def argv_verification(self): \'\'\'检查参数合法性\'\'\' #客户端-s跟-P后面不能跟空 if not self.options.server or not self.options.port: exit(\'Error:must supply server and port parameters!\') def interactive(self): """处理与Ftpserver的所有交互""" if self.auth(): while 1: user_input = input(\'[%s]>>:\' % self.username).strip() if not user_input: continue # 将命令分割切片~~~ cmd_list = user_input.split() # 反射~~~ if hasattr(self, \'_%s\' % cmd_list[0]): func = getattr(self, \'_%s\' % cmd_list[0]) func(cmd_list[1:]) def auth(self): """用户认证!!!""" count = 0 while count < 3: username = input("username:").strip() if not username: continue password = input("password:").strip() cmd = { \'action_type\': \'auth\', \'username\': username, \'password\': password } #给server发送客户端用户的认证信息 self.whw_sock.send(json.dumps(cmd).encode("utf-8")) #然后接收server的反馈 response = self.get_response() if response.get(\'status_code\') == 200: # pass auth self.username = username self.terminal_display = "[%s]>>:" % self.username self.current_dir = "\\\\" return True else: print(response.get("status_msg")) count += 1 def get_response(self): """获取服务器端返回的信息""" data = self.whw_sock.recv(self.MSG_SIZE) return json.loads(data.decode(\'utf-8\')) def make_connection(self): \'\'\'建立socket链接\'\'\' self.whw_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) #注意optparse模块得到的是两个对象,而不是字典与列表!要用对象的操作取值 self.whw_sock.connect((self.options.server,self.options.port)) def parameter_check(self,args,min_args=None,max_args=None,exact_args=None): \'\'\'命令参数个数合法性检查\'\'\' if min_args: if len(args) < min_args: print(\'must provide at least %s parameters,but %s received!\'%(min_args,len(args))) return False if max_args: if len(args)>max_args: print(\'must provide at most %s parameters,but %s received!\' % (max_args, len(args))) return False if exact_args: if len(args)!=exact_args: print(\'need exactly %s parameters,but %s received!\' % (exact_args, len(args))) return False return True def send_msg(self,action_type,**kwargs ): """打包消息并发送到远程""" msg_data = { \'action_type\': action_type, \'fill\':\'\'#做成定长的 } #把两个字典合成一个,update方法 msg_data.update(kwargs) bytes_msg = json.dumps(msg_data).encode(\'utf-8\') if len(bytes_msg) < self.MSG_SIZE: msg_data[\'fill\'] = msg_data[\'fill\'].zfill( self.MSG_SIZE - len(bytes_msg)) bytes_msg = json.dumps(msg_data).encode() self.whw_sock.send(bytes_msg) def _get(self,cmd_args): \'\'\'从ftp server下载\'\'\' if self.parameter_check(cmd_args,min_args=1): filename = cmd_args[0] self.send_msg(action_type=\'get\',filename=filename) response = self.get_response() if response.get(\'status_code\') == 301: file_size = response.get(\'filesize\') receive_size = 0 #进度条功能 progress_generator = self.progress_bar(file_size) progress_generator.__next__() #注意打开方式为wb f = open(\'%s.download\'%filename,\'wb\') #循环接收 while receive_size < file_size: if file_size - receive_size <8192:#last recv data = self.whw_sock.recv(file_size-receive_size) else: data = self.whw_sock.recv(8192) receive_size += len(data) f.write(data) #打印进度条 progress_generator.send(receive_size) else: print(\'\\n\') print(\'---file [%s] recv done,received size [%s]---\'% (filename,file_size)) f.close() os.replace(\'%s.download\'%filename,filename) else: print(response.get(\'status_msg\')) def _ls(self,args): \'\'\'显示当前目录的文件列表\'\'\' self.send_msg(action_type=\'ls\') response = self.get_response() #定长的 1024 if response.get(\'status_code\') == 302: cmd_result_size = response.get(\'cmd_result_size\') received_size = 0 cmd_resule = b\'\' while received_size < cmd_result_size: #最后一次接收 小于8192 if cmd_result_size - received_size < 8192: data = self.whw_sock.recv(cmd_result_size - received_size) else: data = self.whw_sock.recv(8192) cmd_resule += data received_size += len(data) else: #windows上gbk解码 print(cmd_resule.decode(\'gbk\')) def _cd(self,cmd_args): """切换目录""" #只能跟一个参数 if self.parameter_check(cmd_args, exact_args=1): target_dir = cmd_args[0] self.send_msg(\'cd\',target_dir=target_dir) response = self.get_response() if response.get("status_code") == 350:#dir changed self.terminal_display = "[/%s]" % response.get(\'current_dir\') self.current_dir = response.get(\'current_dir\') def progress_bar(self,total_size,current_percent=0,last_percent=0): \'\'\'进度条功能\'\'\' while 1: received_size = yield current_percent current_percent = int(received_size / total_size *100) if current_percent > last_percent: print("*" * int(current_percent / 2) + "{percent}%".format(percent=current_percent), end=\'\\r\', flush=True) last_percent = current_percent # 把本次循环的percent赋值给last def _put(self,cmd_args): """上传本地文件到服务器""" #先检查命令的合法性 if self.parameter_check(cmd_args, exact_args=1): local_file = cmd_args[0] if os.path.isfile(local_file): total_size = os.path.getsize(local_file) self.send_msg(\'put\',file_size=total_size,filename=local_file) f = open(local_file,\'rb\') uploaded_size = 0 progress_generator = self.progress_bar(total_size) progress_generator.__next__() for line in f: self.whw_sock.send(line) uploaded_size += len(line) progress_generator.send(uploaded_size) else: print(\'\\n\') print(\'file upload done\'.center(50,\'-\')) f.close()
import os import sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) if __name__ == \'__main__\': from core import management argv_parser = management.Management_tool(sys.argv) # 解析并执行指令 argv_parser.execute()
[whw] name = WangHongWei password = e99a18c428cb38d5f260853678922e03 expire = 2019-01-01
import os BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) HOST = \'0.0.0.0\' PORT = 9090 #用户的家目录 USER_HOME_DIR = os.path.join(BASE_DIR,\'home\') #账户信息 ACCOUNT_FILE = os.path.join(BASE_DIR,\'conf\',\'accounts.ini\') MAX_SOCKET_LISTEN = 5 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) #log文件目录 LOGGING_FILE = os.path.join(BASE_DIR,\'log\',\'whw.log\')
import socket import json import configparser import hashlib import os import subprocess import time from conf import settings from core import logger class Ftp_server: \'\'\'处理与客户端所有交互的socket server\'\'\' #提前定义 交互信息的状态码 STATUS_CODE = { 200: "Passed authentication!", 201: "Wrong username or password!", 300: "File does not exist !", 301: "File exist , and this msg include the file size- !", 302: "This msg include the msg size!", 350: "Dir changed !", 351: "Dir doesn\'t exist !", 401: "File exist ,ready to re-send !", 402: "File exist ,but file size doesn\'t match!", } #消息最长定义为1024 大于的话另做处理 # 消息长度最长为1024 MSG_SIZE = 1024 def __init__(self,management_instance): #可以调用management的实例对象 self.management_instance = management_instance #server启动需要的参数——从settings中取 self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.sock.bind((settings.HOST,settings.PORT)) self.sock.listen(settings.MAX_SOCKET_LISTEN) #用户信息 self.accounts = self.load_accounts() #存放用户的登陆信息 self.user_obj = None #用户当前目录信息 self.user_current_dir = None def run_forever(self): \'\'\'启动socket server\'\'\' print(\'Starting whw_server on %s:%s\'.center(50,\'*\') % (settings.HOST,settings.PORT) ) while 1: #accept()接收client发送的指令信息 self.request,self.addr = self.sock.accept() print(\'got a new connection from %s...\' % (self.addr,)) try: #所有的交互放到handle方法里 self.handle() except Exception as e: print(\'Error happened with client,close connection\',e) self.request.close() def handle(self): \'\'\'处理与用户的所有指令的交互\'\'\' #循环接收数据 while 1: #服务器收到的信息 raw_data = self.request.recv(self.MSG_SIZE) #空信息...断开链接 if not raw_data: print(\'connection %s is lost...\' % (self.addr,)) #删除链接信息、 del self.request,self.addr break data = json.loads(raw_data.decode(\'utf-8\'))#str action_type = data.get(\'action_type\') #反射 根据指令类型调用相应的方法 if action_type: if hasattr(self,\'_%s\'%action_type): func = getattr(self,\'_%s\'%action_type) func(data) else: print(\'invalid command!\') def load_accounts(self): \'\'\'加载所有账号信息\'\'\' config_obj = configparser.ConfigParser() config_obj.read(settings.ACCOUNT_FILE) print(\'所有用户名:\',config_obj.sections()) return config_obj def _auth(self, data): """处理用户认证请求""" print("auth ", data) if self.authenticate(data.get(\'username\'), data.get(\'password\')): print(\'pass auth....\') self.send_response(status_code=200) else: self.send_response(status_code=201) def authenticate(self,username,password): \'\'\'用户认证方法\'\'\' if username in self.accounts: _password = self.accounts[username][\'password\'] md5_obj = hashlib.md5() md5_obj.update(password.encode()) md5_password = md5_obj.hexdigest() if md5_password == _password: # 认证成功后 把用户的信息存下来 self.user_obj = self.accounts[username] #认证成功后 把用户文件的位置存下来 self.user_obj[\'home\'] = os.path.join(settings.USER_HOME_DIR,username) #ls方法用到,让用户觉得切换到用户的目录了 self.user_current_dir = self.user_obj[\'home\'] return True else: print(\'wrong username or password~\') return False else: print(\'wrong username or password~~\') return False def send_response(self,status_code,*args,**kwargs): """打包发送消息给客户端#用户信息状态码与状态的对应关系""" data = kwargs data[\'status_code\'] = status_code data[\'status_msg\'] = self.STATUS_CODE[status_code] data[\'fill\'] = \'\' bytes_data = json.dumps(data).encode(\'utf-8\') #制作定长的报头 if len(bytes_data) < self.MSG_SIZE: #zfill——返回指定长度字符串,原字符串右对齐,前面填充0 data[\'fill\'] = data[\'fill\'].zfill(self.MSG_SIZE - len(bytes_data)) bytes_data = json.dumps(data).encode(\'utf-8\') #将信息发送给客户端 self.request.send(bytes_data) def _get(self,data): \'\'\'客户端下载文件需要的方法\'\'\' file_name = data.get(\'filename\') #这里需要将用户的home路径与文件名拼接,注意这里home的调用方法############## full_path = os.path.join(self.user_obj[\'home\'],file_name) if os.path.isfile(full_path): file_size = os.stat(full_path).st_size self.send_response(301,filesize=file_size) print(\'ready to send file\') #开始发送文件 f = open(full_path,\'rb\') for line in f: self.request.send(line) else: print(\'file 【%s】 send done...\'%file_name) f.close() logger.write_logger(\'客户<%s>从服务器下载文件【%s】\' % (self.user_obj[\'java 简单的代码片段,展示如何将javaagent附加到运行JVM进程