Python 使用模块模拟并发处理

Posted ledgua

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 使用模块模拟并发处理相关的知识,希望对你有一定的参考价值。

socketserver模块

我们可以通过socketserver模块来模拟并发的一个状态

接下来我们通过使用socketserver来编写程序模拟并发的一个过程

程序编写:

思路:

  1. 用户注册登录界面
  2. 选择功能
    • 功能的选择
      • 上传
      • 下载
      • 查看文件
      • 删除文件
  3. 对TCP协议粘包的处理

了解粘包:

  • 粘包就是当数据过大或者过小,导致一直存储在内存中。过小导致多次数据同时发送,过大导致数据无法接收完全。

粘包代码:

服务器
import socket
#生成一个socket对象
soc=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#绑定地址跟端口号
soc.bind(('127.0.0.1',8001))
#监听(半连接池的大小),不是连接数
soc.listen(3)
#等着客户端来连接,conn相当于连接通道,addr是客户端的地址
while True:
    print('等待客户端连接')
    conn,addr=soc.accept()    #卡主,如果没有客户端连接,会一直卡在这,当有连接,才继续往下走
    print('有个客户端连接上了',addr)
    while True:
        try:
            data=conn.recv(1024)
            print(data)
            data2=conn.recv(1024)
            print(data2)
            data3=conn.recv(1024)
            print(data3)
        except Exception:

            break
    # 关闭通道
    conn.close()


# 关闭套接字
soc.close()
客服端
import socket

soc=socket.socket()

soc.connect(('127.0.0.1',8001))
while True:
    in_s=input('请输入要发送的数据:')
    soc.send(b'a')
    soc.send(b'b')
    soc.send(b'c')

通过模块实现并发

登录界面代码实现:
import os
import json
import time
import nbK
def register():
    user_dict = 
        'username':None,
        'password':None
    
    while True:
        print('--------注册---------')
        name = input('请输入你的用户名').strip()
        pwd = input('请输入你要使用的密码:').strip()
        re_pwd = input('确认要使用的密码:').strip()
        if name == 'q' or name == 'Q':
            break
        if os.path.isdir(f'user/name'):
            print('用户已经存在')
            continue
        if pwd == re_pwd:
            print('注册成功')
            os.mkdir(f'user/name')
            user_dict['username'] = name
            user_dict['password'] = pwd
            with open(f'user_info/name.json','w') as fw:
                json.dump(user_dict,fw)
                fw.flush()
            break
def login():
    s = 3
    while True:
        print('--------登录---------')
        name = input('请输入你的用户名')
        pwd = input('请输入你的密码')
        if not os.path.exists(f'user_info/name.json'):
            while s>0:
                print(f'\r用户不存在,s-1s后跳转到注册界面',end='')
                time.sleep(1)
                s-=1
            print('\n')
            register()
            continue
        with open(f'user_info/name.json','r') as fr:
            user_dic = json.load(fr)
        if name == user_dic.get('username') and pwd == user_dic.get('password'):
            print('登录成功')
            nbK.run(name)
            break
        else:
            print('用户名密码不正确')


def run():
    FUN_DIC = 
        '1':register,
        '2':login
    
    while True:
        print('''
        请选择你的选项
        1——注册
        2——登录
        ''')
        select = input('请输入要选择的功能').strip()
        if not select.isdigit():
            print('请输入正确的选项')
        if not FUN_DIC.get(select):
            print('没有你要选择的功能')
        FUN_DIC[select]()
run()
客户端界面:
import socket
import struct
import os
soc = socket.socket()
soc.connect(('127.0.0.1', 8080))
user_dic = 'username':None
def uploading():
    soc.send(b'1')
    while True:
        fife_name = input('请输入你要上传的文件名,加后缀哦')
        if fife_name == 'q' or fife_name == 'Q':
            break
        if '.' not in fife_name:
            print('请重新输入你要上传的文件夹名')
            continue
        #传入用户长度
        user_len = struct.pack('i',len(user_dic['username']))
        soc.send(user_len)
        #传入用户名
        soc.send(bytes(user_dic['username'].encode('utf8')))
        #传入文件名长度
        fife_name_len = struct.pack('i',len(fife_name))
        soc.send(fife_name_len)
        # 传入文件名
        soc.send(bytes(fife_name.encode('utf8')))
        with open(fife_name,'rb') as fr:
            byts = fr.read()
        #传入数据长度
        b_len = struct.pack('i',len(byts))
        soc.send(b_len)
        # #传入数据
        soc.send(byts)
        print(f'上传文件fife_name成功')


#下载文件
def download():
    soc.send(b'2')
    while True:
        try:
            user_info_list = os.listdir(f'F:\py\9.10并发\\user_name\user_dic["username"]')
            for index,name in enumerate(user_info_list):
                print(index+1,name)
            select = input('请选择你要下载的文件')
            if select == 'q' or select == 'Q':
                break
            if len(user_info_list) < int(select):
                print('没有你想要下载的文件')
                continue
            # 传入用户长度
            user_len = struct.pack('i', len(user_dic['username']))
            soc.send(user_len)
            # 传入用户名
            soc.send(bytes(user_dic['username'].encode('utf8')))
            #获取文件名
            fife_name = user_info_list[int(select)-1]
            # 传入文件名长度
            fife_name_len = struct.pack('i', len(fife_name))
            soc.send(fife_name_len)
            # 传入文件名
            soc.send(bytes(fife_name.encode('utf8')))
            # 获取长度信息
            re_len = soc.recv(4)
            # 获取真正的长度
            l = struct.unpack('i', re_len)[0]
            # 进行数据拼接
            count = 0
            date_total = b''
            while count < l:
                if l < 1024:  # 如果接受的数据小于1024 ,直接接受数据大小
                    data = soc.recv(l)
                else:  # 如果接受的数据大于1024
                    if l - count >= 1024:  # 总数据长度-count(目前收到多少,count就是多少) 如果还大于1024  ,在收1024
                        data = soc.recv(1024)
                    else:  # 总数据长度-count(目前收到多少,count就是多少) 如果小于1024,只收剩下的部分就可
                        data = soc.recv(l - count)
                date_total += data
                count += len(data)
            with open(f'fife_name', 'wb') as fw:
                fw.write(date_total)
                print(f'下载文件fife_name成功')
        except Exception:
            print('请输入要下载的文件序号')
            continue



#删除文件
#########
#偷懒写法#
#########
def delfife():
    while True:
        user_info_list = os.listdir(f'F:\py\9.10并发\\user_name\user_dic["username"]')
        for index, name in enumerate(user_info_list):
            print(index + 1, name)
        select = input('请选择你要删除的文件')
        if select == 'q' or select == 'Q':
            break
        if len(user_info_list)<int(select):
            print('没有你想要删除的文件')
        os.remove(f'F:\py\9.10并发\\user_name\user_dic["username"]\user_info_list[int(select)-1]')
        print('删除成功')

def run(name):
    FUN_DIC = 
        '1': uploading,
        '2': download,
        '3':delfife
    
    user_dic['username'] = name
    while True:
        print('''
        请选择你需要的功能:
        1-上传文件
        2-下载文件
        3-删除文件
        ''')
        select = input('请输入要选择的功能:').strip()
        if select == 'q' or select == 'Q':
            break
        if not select.isdigit():
            print('请输入正确的选项')
        if not FUN_DIC.get(select):
            print('没有你要选择的功能')
        FUN_DIC[select]()
服务器端
import socketserver
import struct
user_dic = 'username':None
class Mytcp(socketserver.BaseRequestHandler):
    #重写handle方法
    def handle(self):
        try:
            while True:
                select = self.request.recv(1)
                select = int(select)
                if select == 1:
                    print(self.client_address)
                    while True:
                        #用户名长度
                        age_len = self.request.recv(4)
                        if not age_len:
                            break
                        #用户名真正长度
                        age_l = struct.unpack('i',age_len)[0]
                        user_dic['username'] = self.request.recv(age_l).decode('utf8')
                        #文件名长度
                        fife_len = self.request.recv(4)
                        #文件名真正长度
                        fife_l = struct.unpack('i',fife_len)[0]
                        #读取文件名
                        fife_name = self.request.recv(fife_l).decode('utf8')
                        #获取长度信息
                        re_len = self.request.recv(4)
                        #获取真正的长度
                        l = struct.unpack('i',re_len)[0]
                        #进行数据拼接
                        count = 0
                        date_total =b''
                        while count < l:
                            if l < 1024:  # 如果接受的数据小于1024 ,直接接受数据大小
                                data = self.request.recv(l)
                            else:  # 如果接受的数据大于1024
                                if l - count >= 1024:  # 总数据长度-count(目前收到多少,count就是多少) 如果还大于1024  ,在收1024
                                    data = self.request.recv(1024)
                                else:  # 总数据长度-count(目前收到多少,count就是多少) 如果小于1024,只收剩下的部分就可
                                    data = self.request.recv(l - count)
                            date_total += data
                            count += len(data)
                        with open(f'F:\py\9.10并发\\user_name\user_dic.get("username")\fife_name','wb') as fw:
                            fw.write(date_total)
                else:
                    while True:
                        # 用户名长度
                        age_len = self.request.recv(4)
                        if not age_len:
                            break
                        # 用户名真正长度
                        age_l = struct.unpack('i', age_len)[0]
                        user_dic['username'] = self.request.recv(age_l).decode('utf8')
                        # 文件名长度
                        fife_len = self.request.recv(4)
                        # 文件名真正长度
                        fife_l = struct.unpack('i', fife_len)[0]
                        # 读取文件名
                        fife_name = self.request.recv(fife_l).decode('utf8')

                        ##打开文件
                        with open(f'F:\py\9.10并发\\user_name\user_dic["username"]\fife_name','rb') as fr:
                            byts = fr.read()
                            # 传入数据长度
                            b_len = struct.pack('i', len(byts))
                            self.request.send(b_len)
                            # #传入数据
                            self.request.send(byts)



                #第一个参数是用于绑定地址,第二个参数传一个类
                #ThreadingTCPServer自动开线程处理链接状态
        except Exception:
            pass

server = socketserver.ThreadingTCPServer(('127.0.0.1',8080),Mytcp)
    #一直监听
server.serve_forever()

以上是关于Python 使用模块模拟并发处理的主要内容,如果未能解决你的问题,请参考以下文章

Python之asyncio模块的使用

Python并发复习

python3.X并发socketserver模块

Python 多处理 CPU 绑定并发,无需克隆 main

SocketServer-实现并发处理

如何使用 python 的 asyncio 模块正确创建和运行并发任务?