服务端解析
Posted dominic-ji
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了服务端解析相关的知识,希望对你有一定的参考价值。
服务端
项目目录结构
conf目录
setting.py:配置信息相关
db目录
models.py:数据库表对应程序中的类
interface目录
admin_interface.py:管理员相关操作的接口
common_interface.py:公共操作的相关接口(登录,注册)
user_interface.py:用户相关操作的接口
lib目录
common.py:公共方法
orm目录:
fuckorm.py:单例版orm框架
mysql_singleton.py:数据库连接类
ormpool目录:
db_pool.py:数据库链接池
fuckorm_pool.py:连接池版orm框架
mysql_pool.py:连接池版数据库连接类
server目录:
tcpServer.py:服务端核心代码
use_data.py:存放用户信息,和全局锁
movie_list目录:
存放客户端上传上来的电影
start.py:启动文件
各文件功能代码
setting.py
import os host = ‘127.0.0.1‘ port = 3306 user = ‘root‘ password = ‘123456‘ database = ‘youku2‘ charset = ‘utf8‘ autocommit = True BASE_DIR = os.path.dirname(os.path.dirname(__file__)) BASE_DB = os.path.join(BASE_DIR, ‘db‘) BASE_MOVIE = os.path.join(BASE_DIR, ‘movie‘) BASE_MOVIE_LIST = os.path.join(BASE_DIR, ‘movie_list‘) server_address = (‘127.0.0.1‘, 8087)
models.py
# 用单例版 # from orm.fuckorm import Model, StringField, IntegerField # 用池版 from ormpool.fuckorm_pool import Model, StringField, IntegerField class User(Model): table_name = ‘userinfo‘ id = IntegerField(‘id‘, primary_key=True) name = StringField(‘name‘) password = StringField(‘password‘) locked = IntegerField(‘locked‘, default=0) is_vip= IntegerField(‘is_vip‘,default=0) user_type = StringField(‘user_type‘) class Movie(Model): table_name = ‘movie‘ id = IntegerField(‘id‘, primary_key=True) name = StringField(‘name‘) path = StringField(‘path‘) is_free = IntegerField(‘is_free‘,default=1) is_delete = IntegerField(‘is_delete‘,default=0) create_time = StringField(‘create_time‘) user_id = IntegerField(‘user_id‘) file_md5=StringField(‘file_md5‘) class Notice(Model): table_name = ‘notice‘ id = IntegerField(‘id‘, primary_key=True) name = StringField(‘name‘) content = StringField(‘content‘) user_id = IntegerField(‘user_id‘) create_time = StringField(‘create_time‘) class DownloadRecord(Model): table_name = ‘download_record‘ id = IntegerField(‘id‘, primary_key=True) user_id = IntegerField(‘user_id‘) movie_id = IntegerField(‘movie_id‘)
admin_interface.py
from conf import setting import os from db import models from lib import common @common.login_auth def upload_movie(user_dic, conn): ‘‘‘ 上传视频功能 :param user_dic: :param conn: :return: ‘‘‘ recv_size = 0 print(‘----->‘, user_dic[‘file_name‘]) file_name = common.get_uuid(user_dic[‘file_name‘]) + user_dic[‘file_name‘] path = os.path.join(setting.BASE_MOVIE_LIST, file_name) with open(path, ‘wb‘) as f: while recv_size < user_dic[‘file_size‘]: recv_data = conn.recv(1024) f.write(recv_data) recv_size += len(recv_data) # print(‘recvsize:%s filesize:%s‘ % (recv_size, user_dic[‘file_size‘])) print(‘%s :上传成功‘ % file_name) movie = models.Movie(name=file_name, path=path, is_free=user_dic[‘is_free‘], user_id=user_dic[‘user_id‘], file_md5=user_dic[‘file_md5‘]) movie.save() back_dic = {‘flag‘: True, ‘msg‘: ‘上传成功‘} common.send_back(back_dic,conn) @common.login_auth def delete_movie(user_dic,conn): ‘‘‘ 删除视频,不是真正的删除,在视频表中的is_delete字段设为1 :param user_dic: :param conn: :return: ‘‘‘ movie = models.Movie.select_one(id=user_dic[‘movie_id‘]) movie.is_delete = 1 movie.update() back_dic = {‘flag‘: True, ‘msg‘: ‘电影删除成功‘} common.send_back(back_dic, conn) @common.login_auth def release_notice(user_dic,conn): ‘‘‘ 发布公告功能,取出字典中的公告名字,公告内容,用户id,存入数据库 :param user_dic: :param conn: :return: ‘‘‘ notice = models.Notice(name=user_dic[‘notice_name‘], content=user_dic[‘notice_content‘], user_id=user_dic[‘user_id‘]) notice.save() back_dic = {‘flag‘: True, ‘msg‘: ‘公告发布成功‘} common.send_back(back_dic, conn) @common.login_auth def check_movie(user_dic,conn): ‘‘‘ 通过md5校验数据中是否该电影已经存在了 :param user_dic: :param conn: :return: ‘‘‘ movie = models.Movie.select_one(file_md5=user_dic[‘file_md5‘]) if movie: back_dic = {‘flag‘: False, ‘msg‘: ‘该电影已经存在‘} else: back_dic = {‘flag‘: True} common.send_back(back_dic,conn)
common_interface.py
from db import models from lib import common from interface import user_interface from server import use_data as da def login(user_dic, conn): ‘‘‘ 登录功能,登录成功,将用户信息以{"addr":[session,user_id]}的形式,放到内存中, 多线程操作,必须加锁,锁需要在主线程中生成 :param user_dic: :param conn: :return: ‘‘‘ user = models.User.select_one(name=user_dic[‘name‘]) if user: # 用户存在 if user.user_type == user_dic[‘user_type‘]: if user.password == user_dic[‘password‘]: session = common.get_uuid(user_dic[‘name‘]) da.mutex.acquire() if user_dic[‘addr‘] in da.alive_user: # 如果当前的客户端已经登录,再次登录的时候,把原来的用户踢出,再重新加入进去 da.alive_user.pop(user_dic[‘addr‘]) da.alive_user[user_dic[‘addr‘]] = [session, user.id] da.mutex.release() back_dic = {‘flag‘: True, ‘session‘: session, ‘is_vip‘: user.is_vip, ‘msg‘: ‘login success‘} if user_dic[‘user_type‘] == ‘user‘: last_notice = user_interface.check_notice_by_count(1) back_dic[‘last_notice‘] = last_notice else: back_dic = {‘flag‘: False, ‘msg‘: ‘password error‘} else: back_dic = {‘flag‘: False, ‘msg‘: ‘登录类型不匹配‘} else: back_dic = {‘flag‘: False, ‘msg‘: ‘user do not exisit‘} common.send_back(back_dic, conn) def register(user_dic, conn): ‘‘‘ 注册功能 :param user_dic: :param conn: :return: ‘‘‘ user = models.User.select_one(name=user_dic[‘name‘]) if user: # 用户存在 back_dic = {‘flag‘: False, ‘msg‘: ‘user is exisit‘} else: user = models.User(name=user_dic[‘name‘], password=user_dic[‘password‘], user_type=user_dic[‘user_type‘]) user.save() back_dic = {‘flag‘: True, ‘msg‘: ‘register success‘} common.send_back(back_dic, conn)
user_interface.py
# 注册(用手机号注册,密码用md5加密) # 登录(登录后显示最新一条公告) # 冲会员 # 查看视频(即将所有视频循环打印出来) # 下载普通视频(非会员下载视频需要等30s广告,会员下载无需等待) # 下载收费视频(非会员下载需要10元,会员下载需要5元) # 查看观影记录(就是查看自己下载过的视频) # 查看公告(包括历史公告) from db import models import os from lib import common @common.login_auth def buy_member(user_dic, conn): ‘‘‘ 购买会员功能,直接将is_vip字段设为1 :param user_dic: :param conn: :return: ‘‘‘ user = models.User.select_one(id=user_dic[‘user_id‘]) user.is_vip = 1 user.update() back_dic = {‘flag‘: True, ‘msg‘: ‘buy success‘} common.send_back(back_dic, conn) @common.login_auth def get_movie_list(user_dic, conn): ‘‘‘ 获取视频列表:取出全部视频,过滤掉删除的视频,根据前台传来的查询条件,把电影放到列表里 :param user_dic: :param conn: :return: ‘‘‘ back_dic = {} movie_list = models.Movie.select_all() back_movie_list = [] if movie_list: # 不为空,继续查询,为空直接返回false for movie in movie_list: if not movie.is_delete: # 拼成一个列表[‘电影名字‘,‘收费/免费‘,‘电影id‘] if user_dic[‘movie_type‘] == ‘all‘: # 全部 back_movie_list.append([movie.name, ‘免费‘ if movie.is_free else ‘收费‘, movie.id]) elif user_dic[‘movie_type‘] == ‘free‘: # 免费电影 if movie.is_free: # 免费的才往列表里放 back_movie_list.append([movie.name, ‘免费‘, movie.id]) else: # 收费电影 if not movie.is_free: # 收费的才往列表里放 back_movie_list.append([movie.name, ‘收费‘, movie.id]) if back_movie_list: back_dic = {‘flag‘: True, ‘movie_list‘: back_movie_list} else: back_dic = {‘flag‘: False, ‘msg‘: ‘暂无可查看影片‘} else: back_dic = {‘flag‘: False, ‘msg‘: ‘暂无影片‘} common.send_back(back_dic, conn) @common.login_auth def download_movie(user_dic, conn): movie = models.Movie.select_one(id=user_dic[‘movie_id‘]) if not movie: # 电影不存在,返回false back_dic = {‘flag‘: False, ‘msg‘: ‘该电影不存在‘} common.send_back(back_dic, conn) return user = models.User.select_one(id=user_dic[‘user_id‘]) send_back_dic = {‘flag‘: True} if user_dic[‘movie_type‘] == ‘free‘: # 下载免费电影,非会员需要等待;下载收费电影,不需要等待了直接下 if user.is_vip: send_back_dic[‘wait_time‘] = 0 else: send_back_dic[‘wait_time‘] = 30 send_back_dic[‘filename‘] = movie.name send_back_dic[‘filesize‘] = os.path.getsize(movie.path) # 把下载记录保存到记录表中 down_record = models.DownloadRecord(user_id=user_dic[‘user_id‘], movie_id=movie.id) down_record.save() common.send_back(send_back_dic, conn) with open(movie.path, ‘rb‘)as f: for line in f: conn.send(line) @common.login_auth def check_notice(user_dic, conn): ‘‘‘ 查看公告功能 :param user_dic: :param conn: :return: ‘‘‘ # 直接调用通过条数查询的接口,传入None表示全查 notice_list = check_notice_by_count(count=None) if notice_list: back_dic={‘flag‘: True, ‘notice_list‘: notice_list} else: back_dic={‘flag‘: False, ‘msg‘: ‘暂无公告‘} common.send_back(back_dic, conn) def check_notice_by_count(count=None): ‘‘‘ 查看功能的方法,供内部调用 count 为None,查全部,为1 查一条 :param count: :return: ‘‘‘ notice_list = models.Notice.select_all() back_notice_list = [] if notice_list: # 不为空,继续查询,为空直接返回false if not count: for notice in notice_list: back_notice_list.append({notice.name: notice.content}) else: # 查一条 notice_list=sorted(notice_list,key=lambda notice:notice.create_time) last_row=len(notice_list)-1 back_notice_list.append({notice_list[last_row].name: notice_list[last_row].content}) return back_notice_list else: return False @common.login_auth def check_download_record(user_dic, conn): ‘‘‘ 查看下载记录: 先通过user_id到DownloadRecord表中查到下载的每一条记录, 通过每一条记录中的电影id再去电影表查询电影,取出名字,返回 :param user_dic: :return: ‘‘‘ download_record = models.DownloadRecord.select_all(user_id=user_dic[‘user_id‘]) if not download_record: back_dic = {‘flag‘: False, ‘msg‘: ‘暂无观影记录‘} common.send_back(back_dic, conn) else: download_list = [] for record in download_record: movie = models.Movie.select_one(id=record.movie_id) download_list.append(movie.name) back_dic = {‘flag‘: True, ‘msg‘: ‘buy success‘, ‘download_list‘: download_list} common.send_back(back_dic, conn)
common.py
import hashlib import os import time import json import struct def login_auth(func): def wrapper(*args, **kwargs): from server import use_data as mu for value in mu.alive_user.values(): if value[0] == args[0][‘session‘]: args[0][‘user_id‘] = value[1] break if not args[0].get(‘user_id‘, None): send_back({‘flag‘: False, ‘msg‘: ‘您没有登录‘}, args[1]) else: return func(*args, **kwargs) return wrapper def get_uuid(name): md = hashlib.md5() md.update(name.encode(‘utf-8‘)) md.update(str(time.clock()).encode(‘utf-8‘)) return md.hexdigest() def get_time(): now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) return now_time def get_colck_time(): return str(time.clock()) def get_bigfile_md5(file_path): if os.path.exists(file_path): md = hashlib.md5() filesize = os.path.getsize(file_path) file_list = [0, filesize // 3, (filesize // 3) * 2, filesize - 10] with open(file_path, ‘rb‘) as f: for line in file_list: f.seek(line) md.update(f.read(10)) return md.hexdigest() def send_back(back_dic, conn): head_json_bytes = json.dumps(back_dic).encode(‘utf-8‘) conn.send(struct.pack(‘i‘, len(head_json_bytes))) # 先发报头的长度 conn.send(head_json_bytes)
funcorm.py
from orm import mysql_singleton class Field(object): def __init__(self, name, column_type, primary_key, default): self.name = name # 列名 self.column_type = column_type # 数据类型 self.primary_key = primary_key # 是否为主键 self.default = default # 默认值 class StringField(Field): def __init__(self, name=None, column_type=‘varchar(100)‘, primary_key=False, default=None): super().__init__(name, column_type, primary_key, default) class IntegerField(Field): def __init__(self, name=None, primary_key=False, default=0): super().__init__(name, ‘int‘, primary_key, default) class ModelMetaclass(type): def __new__(cls, name, bases, attrs): if name == "Model": return type.__new__(cls, name, bases, attrs) table_name = attrs.get(‘table_name‘, None) if not table_name: raise TypeError(‘没有表名‘) primary_key = None # 查找primary_key字段 # 保存列类型的对象 mappings = dict() for k, v in attrs.items(): # 是列名的就保存下来 if isinstance(v, Field): mappings[k] = v if v.primary_key: # 找到主键: if primary_key: raise TypeError(‘主键重复: %s‘ % k) primary_key = k for k in mappings.keys(): attrs.pop(k) if not primary_key: raise TypeError(‘没有主键‘) # 给cls增加一些字段: attrs[‘mapping‘] = mappings attrs[‘primary_key‘] = primary_key attrs[‘table_name‘] = table_name return type.__new__(cls, name, bases, attrs) class Model(dict, metaclass=ModelMetaclass): def __init__(self, **kw): super(Model, self).__init__(**kw) def __getattr__(self, key): # .访问属性触发 try: return self[key] except KeyError: raise AttributeError(‘没有属性:%s‘ % key) def __setattr__(self, key, value): self[key] = value @classmethod def select_all(cls, **kwargs): ms = mysql_singleton.Mysql().singleton() if kwargs: # 当有参数传入的时候 key = list(kwargs.keys())[0] value = kwargs[key] sql = "select * from %s where %s=?" % (cls.table_name, key) sql = sql.replace(‘?‘, ‘%s‘) re = ms.select(sql, value) else: # 当无参传入的时候查询所有 sql = "select * from %s" % cls.table_name re = ms.select(sql) return [cls(**r) for r in re] @classmethod def select_one(cls, **kwargs): # 此处只支持单一条件查询 key = list(kwargs.keys())[0] value = kwargs[key] ms = mysql_singleton.Mysql().singleton() sql = "select * from %s where %s=?" % (cls.table_name, key) sql = sql.replace(‘?‘, ‘%s‘) re = ms.select(sql, value) if re: return cls(**re[0]) else: return None def save(self): ms = mysql_singleton.Mysql().singleton() fields = [] params = [] args = [] for k, v in self.mapping.items(): fields.append(v.name) params.append(‘?‘) args.append(getattr(self, k, v.default)) sql = "insert into %s (%s) values (%s)" % (self.table_name, ‘,‘.join(fields), ‘,‘.join(params)) sql = sql.replace(‘?‘, ‘%s‘) ms.execute(sql, args) def update(self): ms = mysql_singleton.Mysql().singleton() fields = [] args = [] pr = None for k, v in self.mapping.items(): if v.primary_key: pr = getattr(self, k, v.default) else: fields.append(v.name + ‘=?‘) args.append(getattr(self, k, v.default)) sql = "update %s set %s where %s = %s" % ( self.table_name, ‘, ‘.join(fields), self.primary_key, pr) sql = sql.replace(‘?‘, ‘%s‘) print(sql) ms.execute(sql, args)
mysql_singleton.py
from conf import setting import pymysql class Mysql: __instance = None def __init__(self): self.conn = pymysql.connect(host=setting.host, user=setting.user, password=setting.password, database=setting.database, charset=setting.charset, autocommit=setting.autocommit) self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) def close_db(self): self.conn.close() def select(self, sql, args=None): self.cursor.execute(sql, args) rs = self.cursor.fetchall() return rs def execute(self, sql, args): try: self.cursor.execute(sql, args) affected = self.cursor.rowcount # self.conn.commit() except BaseException as e: print(e) return affected @classmethod def singleton(cls): if not cls.__instance: cls.__instance = cls() return cls.__instance
db_pool.py
import pymysql from conf import setting from DBUtils.PooledDB import PooledDB POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=6, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。 ping=0, # ping MySQL服务端,检查是否服务可用。 host=setting.host, port=setting.port, user=setting.user, password=setting.password, database=setting.database, charset=setting.charset, autocommit=setting.autocommit )
funcorm_pool.py
from ormpool import mysql_pool class Field(object): def __init__(self, name, column_type, primary_key, default): self.name = name # 列名 self.column_type = column_type # 数据类型 self.primary_key = primary_key # 是否为主键 self.default = default # 默认值 class StringField(Field): def __init__(self, name=None, ddl=‘varchar(100)‘, primary_key=False, default=None): super().__init__(name, ddl, primary_key, default) class IntegerField(Field): def __init__(self, name=None, primary_key=False, default=0): super().__init__(name, ‘int‘, primary_key, default) class ModelMetaclass(type): def __new__(cls, name, bases, attrs): if name == "Model": return type.__new__(cls, name, bases, attrs) table_name = attrs.get(‘table_name‘, None) if not table_name: raise TypeError(‘没有表名‘) primary_key = None # 查找primary_key字段 # 保存列类型的对象 mappings = dict() for k, v in attrs.items(): # 是列名的就保存下来 if isinstance(v, Field): mappings[k] = v if v.primary_key: # 找到主键: if primary_key: raise TypeError(‘主键重复: %s‘ % k) primary_key = k for k in mappings.keys(): attrs.pop(k) if not primary_key: raise TypeError(‘没有主键‘) # 给cls增加一些字段: attrs[‘mapping‘] = mappings attrs[‘primary_key‘] = primary_key attrs[‘table_name‘] = table_name return type.__new__(cls, name, bases, attrs) class Model(dict, metaclass=ModelMetaclass): def __init__(self, **kw): super(Model, self).__init__(**kw) def __getattr__(self, key): # .访问属性触发 try: return self[key] except KeyError: raise AttributeError(‘没有属性:%s‘ % key) def __setattr__(self, key, value): self[key] = value @classmethod def select_all(cls, **kwargs): ms = mysql_pool.MysqlPool() if kwargs: # 当有参数传入的时候 key = list(kwargs.keys())[0] value = kwargs[key] sql = "select * from %s where %s=?" % (cls.table_name, key) sql = sql.replace(‘?‘, ‘%s‘) re = ms.select(sql, value) else: # 当无参传入的时候查询所有 sql = "select * from %s" % cls.table_name re = ms.select(sql) return [cls(**r) for r in re] @classmethod def select_one(cls, **kwargs): # 此处只支持单一条件查询 key = list(kwargs.keys())[0] value = kwargs[key] ms = mysql_pool.MysqlPool() sql = "select * from %s where %s=?" % (cls.table_name, key) sql = sql.replace(‘?‘, ‘%s‘) re = ms.select(sql, value) if re: return cls(**re[0]) else: return None def save(self): ms = mysql_pool.MysqlPool() fields = [] params = [] args = [] for k, v in self.mapping.items(): fields.append(v.name) params.append(‘?‘) args.append(getattr(self, k, v.default)) sql = "insert into %s (%s) values (%s)" % (self.table_name, ‘,‘.join(fields), ‘,‘.join(params)) sql = sql.replace(‘?‘, ‘%s‘) ms.execute(sql, args) def update(self): ms = mysql_pool.MysqlPool() fields = [] args = [] pr = None for k, v in self.mapping.items(): if v.primary_key: pr = getattr(self, k, None) else: fields.append(v.name + ‘=?‘) args.append(getattr(self, k, v.default)) sql = "update %s set %s where %s = %s" % ( self.table_name, ‘, ‘.join(fields), self.primary_key, pr) sql = sql.replace(‘?‘, ‘%s‘) print(sql) ms.execute(sql, args)
mysql_pool.py
import pymysql from ormpool import db_pool from threading import current_thread class MysqlPool: def __init__(self): self.conn = db_pool.POOL.connection() # print(db_pool.POOL) # print(current_thread().getName(), ‘拿到连接‘, self.conn) # print(current_thread().getName(), ‘池子里目前有‘, db_pool.POOL._idle_cache, ‘\\r\\n‘) self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) def close_db(self): self.cursor.close() self.conn.close() def select(self, sql, args=None): self.cursor.execute(sql, args) rs = self.cursor.fetchall() return rs def execute(self, sql, args): try: self.cursor.execute(sql, args) affected = self.cursor.rowcount # self.conn.commit() except BaseException as e: print(e) finally: self.close_db() return affected
tcpServer.py
import json import socket import struct from concurrent.futures import ThreadPoolExecutor from threading import Lock from threading import current_thread from conf import setting from interface import common_interface, admin_interface, user_interface from server import use_data server_pool = ThreadPoolExecutor(10) mutex = Lock() use_data.mutex = mutex dispatch_dic = { ‘login‘: common_interface.login, ‘register‘: common_interface.register, ‘upload‘: admin_interface.upload_movie, ‘delete_movie‘: admin_interface.delete_movie, ‘download_movie‘: user_interface.download_movie, ‘upload‘: admin_interface.upload_movie, ‘release_notice‘: admin_interface.release_notice, ‘buy_member‘: user_interface.buy_member, ‘get_movie_list‘: user_interface.get_movie_list, ‘check_notice‘: user_interface.check_notice, ‘check_download_record‘: user_interface.check_download_record, ‘check_movie‘: admin_interface.check_movie } def working(conn, addr): print(current_thread().getName()) while True: try: head_struct = conn.recv(4) if not head_struct: break head_len = struct.unpack(‘i‘, head_struct)[0] head_json = conn.recv(head_len).decode(‘utf-8‘) head_dic = json.loads(head_json) # 分发之前,先判断是不是伪造 head_dic[‘addr‘] = str(addr) dispatch(head_dic, conn) except Exception as e: print(‘错误信息:‘, e) conn.close() # 把服务器保存的用户信息清掉 mutex.acquire() if str(addr) in use_data.alive_user: use_data.alive_user.pop(str(addr)) # print(‘***********end*************%s‘%len(login_user_data.alive_user)) mutex.release() print(‘客户端:%s :断开链接‘ % str(addr)) break def dispatch(head_dic, conn): if head_dic[‘type‘] not in dispatch_dic: back_dic = {‘flag‘: False, ‘msg‘: ‘请求不存在‘} send_back(back_dic, conn) else: dispatch_dic[head_dic[‘type‘]](head_dic, conn) def send_back(back_dic, conn): head_json_bytes = json.dumps(back_dic).encode(‘utf-8‘) conn.send(struct.pack(‘i‘, len(head_json_bytes))) # 先发报头的长度 conn.send(head_json_bytes) def server_run(): socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket_server.bind(setting.server_address) socket_server.listen(5) while True: conn, addr = socket_server.accept() print(‘客户端:%s 链接成功‘ % str(addr)) server_pool.submit(working, conn, addr) socket_server.close()
use_data.py
alive_user = {}
mutex = None
start.py
import os, sys path = os.path.dirname(__file__) sys.path.append(path) from server import tcpServer if __name__ == ‘__main__‘: tcpServer.server_run()
以上是关于服务端解析的主要内容,如果未能解决你的问题,请参考以下文章