服务端解析

Posted dominic-ji

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了服务端解析相关的知识,希望对你有一定的参考价值。

服务端

项目目录结构

技术分享图片

conf目录

    setting.py:配置信息相关

db目录

    models.py:数据库表对应程序中的类

interface目录

    admin_interface.py:管理员相关操作的接口

    common_interface.py:公共操作的相关接口(登录,注册)

    user_interface.py:用户相关操作的接口

lib目录

    common.py:公共方法

orm目录:

    fuckorm.py:单例版orm框架

    mysql_singleton.py:数据库连接类

ormpool目录:

    db_pool.py:数据库链接池

    fuckorm_pool.py:连接池版orm框架

    mysql_pool.py:连接池版数据库连接类

server目录:

    tcpServer.py:服务端核心代码

    use_data.py:存放用户信息,和全局锁

movie_list目录:

    存放客户端上传上来的电影

start.py:启动文件

 

 各文件功能代码

setting.py

技术分享图片
import os

host = 127.0.0.1
port = 3306
user = root
password = 123456
database = youku2
charset = utf8
autocommit = True

BASE_DIR = os.path.dirname(os.path.dirname(__file__))
BASE_DB = os.path.join(BASE_DIR, db)
BASE_MOVIE = os.path.join(BASE_DIR, movie)
BASE_MOVIE_LIST = os.path.join(BASE_DIR, movie_list)
server_address = (127.0.0.1, 8087)
配置信息

models.py

技术分享图片
# 用单例版
# from orm.fuckorm import Model, StringField, IntegerField


# 用池版
from ormpool.fuckorm_pool import Model, StringField, IntegerField


class User(Model):
    table_name = userinfo
    id = IntegerField(id, primary_key=True)
    name = StringField(name)
    password = StringField(password)
    locked = IntegerField(locked, default=0)
    is_vip= IntegerField(is_vip,default=0)
    user_type = StringField(user_type)


class Movie(Model):
    table_name = movie
    id = IntegerField(id, primary_key=True)
    name = StringField(name)
    path = StringField(path)
    is_free = IntegerField(is_free,default=1)
    is_delete = IntegerField(is_delete,default=0)
    create_time = StringField(create_time)
    user_id = IntegerField(user_id)
    file_md5=StringField(file_md5)


class Notice(Model):
    table_name = notice
    id = IntegerField(id, primary_key=True)
    name = StringField(name)
    content = StringField(content)
    user_id = IntegerField(user_id)
    create_time = StringField(create_time)


class DownloadRecord(Model):
    table_name = download_record
    id = IntegerField(id, primary_key=True)
    user_id = IntegerField(user_id)
    movie_id = IntegerField(movie_id)
程序中的类与数据表对应

admin_interface.py

技术分享图片
from conf import setting
import os
from db import models
from lib import common
@common.login_auth
def upload_movie(user_dic, conn):
    ‘‘‘
    上传视频功能
    :param user_dic:
    :param conn:
    :return:
    ‘‘‘
    recv_size = 0
    print(----->, user_dic[file_name])
    file_name = common.get_uuid(user_dic[file_name]) + user_dic[file_name]
    path = os.path.join(setting.BASE_MOVIE_LIST, file_name)
    with open(path, wb) as f:
        while recv_size < user_dic[file_size]:
            recv_data = conn.recv(1024)
            f.write(recv_data)
            recv_size += len(recv_data)
            # print(recvsize:%s filesize:%s % (recv_size, user_dic[file_size]))
    print(%s :上传成功 % file_name)
    movie = models.Movie(name=file_name, path=path, is_free=user_dic[is_free],
                         user_id=user_dic[user_id], file_md5=user_dic[file_md5])
    movie.save()
    back_dic = {flag: True, msg: 上传成功}
    common.send_back(back_dic,conn)
@common.login_auth
def delete_movie(user_dic,conn):
    ‘‘‘
    删除视频,不是真正的删除,在视频表中的is_delete字段设为1
    :param user_dic:
    :param conn:
    :return:
    ‘‘‘
    movie = models.Movie.select_one(id=user_dic[movie_id])
    movie.is_delete = 1
    movie.update()
    back_dic = {flag: True, msg: 电影删除成功}
    common.send_back(back_dic, conn)

@common.login_auth
def release_notice(user_dic,conn):
    ‘‘‘
    发布公告功能,取出字典中的公告名字,公告内容,用户id,存入数据库
    :param user_dic:
    :param conn:
    :return:
    ‘‘‘
    notice = models.Notice(name=user_dic[notice_name], content=user_dic[notice_content],
                           user_id=user_dic[user_id])
    notice.save()
    back_dic = {flag: True, msg: 公告发布成功}
    common.send_back(back_dic, conn)

@common.login_auth
def check_movie(user_dic,conn):
    ‘‘‘
    通过md5校验数据中是否该电影已经存在了
    :param user_dic:
    :param conn:
    :return:
    ‘‘‘
    movie = models.Movie.select_one(file_md5=user_dic[file_md5])
    if movie:
        back_dic = {flag: False, msg: 该电影已经存在}
    else:
        back_dic = {flag: True}
    common.send_back(back_dic,conn)
管理员接口功能

common_interface.py

技术分享图片
from db import models
from lib import common
from interface import user_interface
from server import use_data as da


def login(user_dic, conn):
    ‘‘‘
    登录功能,登录成功,将用户信息以{"addr":[session,user_id]}的形式,放到内存中,
    多线程操作,必须加锁,锁需要在主线程中生成
    :param user_dic:
    :param conn:
    :return:
    ‘‘‘
    user = models.User.select_one(name=user_dic[name])
    if user:  # 用户存在
        if user.user_type == user_dic[user_type]:
            if user.password == user_dic[password]:
                session = common.get_uuid(user_dic[name])
                da.mutex.acquire()
                if user_dic[addr] in da.alive_user:
                    # 如果当前的客户端已经登录,再次登录的时候,把原来的用户踢出,再重新加入进去
                    da.alive_user.pop(user_dic[addr])
                da.alive_user[user_dic[addr]] = [session, user.id]
                da.mutex.release()
                back_dic = {flag: True, session: session, is_vip: user.is_vip, msg: login success}
                if user_dic[user_type] == user:
                    last_notice = user_interface.check_notice_by_count(1)
                    back_dic[last_notice] = last_notice
            else:
                back_dic = {flag: False, msg: password error}
        else:
            back_dic = {flag: False, msg: 登录类型不匹配}
    else:
        back_dic = {flag: False, msg: user do not exisit}
    common.send_back(back_dic, conn)


def register(user_dic, conn):
    ‘‘‘
    注册功能
    :param user_dic:
    :param conn:
    :return:
    ‘‘‘
    user = models.User.select_one(name=user_dic[name])
    if user:  # 用户存在
        back_dic = {flag: False, msg: user is exisit}
    else:
        user = models.User(name=user_dic[name], password=user_dic[password], user_type=user_dic[user_type])
        user.save()
        back_dic = {flag: True, msg: register success}

    common.send_back(back_dic, conn)
公共接口

user_interface.py

技术分享图片
# 注册(用手机号注册,密码用md5加密)
# 登录(登录后显示最新一条公告)
# 冲会员
# 查看视频(即将所有视频循环打印出来)
# 下载普通视频(非会员下载视频需要等30s广告,会员下载无需等待)
# 下载收费视频(非会员下载需要10元,会员下载需要5元)
# 查看观影记录(就是查看自己下载过的视频)
# 查看公告(包括历史公告)
from db import models
import os
from lib import common


@common.login_auth
def buy_member(user_dic, conn):
    ‘‘‘
    购买会员功能,直接将is_vip字段设为1
    :param user_dic:
    :param conn:
    :return:
    ‘‘‘
    user = models.User.select_one(id=user_dic[user_id])
    user.is_vip = 1
    user.update()
    back_dic = {flag: True, msg: buy success}
    common.send_back(back_dic, conn)


@common.login_auth
def get_movie_list(user_dic, conn):
    ‘‘‘
    获取视频列表:取出全部视频,过滤掉删除的视频,根据前台传来的查询条件,把电影放到列表里
    :param user_dic:
    :param conn:
    :return:
    ‘‘‘
    back_dic = {}
    movie_list = models.Movie.select_all()
    back_movie_list = []
    if movie_list:  # 不为空,继续查询,为空直接返回false
        for movie in movie_list:
            if not movie.is_delete:
                # 拼成一个列表[电影名字,收费/免费电影id]
                if user_dic[movie_type] == all:
                    # 全部
                    back_movie_list.append([movie.name, 免费 if movie.is_free else 收费, movie.id])
                elif user_dic[movie_type] == free:
                    # 免费电影
                    if movie.is_free:  # 免费的才往列表里放
                        back_movie_list.append([movie.name, 免费, movie.id])
                else:
                    # 收费电影
                    if not movie.is_free:  # 收费的才往列表里放
                        back_movie_list.append([movie.name, 收费, movie.id])

        if back_movie_list:
            back_dic = {flag: True, movie_list: back_movie_list}
        else:
            back_dic = {flag: False, msg: 暂无可查看影片}
    else:
        back_dic = {flag: False, msg: 暂无影片}
    common.send_back(back_dic, conn)


@common.login_auth
def download_movie(user_dic, conn):
    movie = models.Movie.select_one(id=user_dic[movie_id])
    if not movie:  # 电影不存在,返回false
        back_dic = {flag: False, msg: 该电影不存在}
        common.send_back(back_dic, conn)
        return
    user = models.User.select_one(id=user_dic[user_id])
    send_back_dic = {flag: True}
    if user_dic[movie_type] == free:  # 下载免费电影,非会员需要等待;下载收费电影,不需要等待了直接下
        if user.is_vip:
            send_back_dic[wait_time] = 0
        else:
            send_back_dic[wait_time] = 30

    send_back_dic[filename] = movie.name
    send_back_dic[filesize] = os.path.getsize(movie.path)
    # 把下载记录保存到记录表中
    down_record = models.DownloadRecord(user_id=user_dic[user_id], movie_id=movie.id)
    down_record.save()
    common.send_back(send_back_dic, conn)
    with open(movie.path, rb)as f:
        for line in f:
            conn.send(line)


@common.login_auth
def check_notice(user_dic, conn):
    ‘‘‘
    查看公告功能
    :param user_dic:
    :param conn:
    :return:
    ‘‘‘
    # 直接调用通过条数查询的接口,传入None表示全查
    notice_list = check_notice_by_count(count=None)
    if notice_list:
        back_dic={flag: True, notice_list: notice_list}
    else:
        back_dic={flag: False, msg: 暂无公告}

    common.send_back(back_dic, conn)


def check_notice_by_count(count=None):
    ‘‘‘
    查看功能的方法,供内部调用
    count 为None,查全部,为1 查一条
    :param count:
    :return:
    ‘‘‘
    notice_list = models.Notice.select_all()
    back_notice_list = []
    if notice_list:  # 不为空,继续查询,为空直接返回false
        if not count:
            for notice in notice_list:
                back_notice_list.append({notice.name: notice.content})
        else:  # 查一条
            notice_list=sorted(notice_list,key=lambda notice:notice.create_time)
            last_row=len(notice_list)-1
            back_notice_list.append({notice_list[last_row].name: notice_list[last_row].content})
        return back_notice_list
    else:
        return False


@common.login_auth
def check_download_record(user_dic, conn):
    ‘‘‘
    查看下载记录:
    先通过user_id到DownloadRecord表中查到下载的每一条记录,
    通过每一条记录中的电影id再去电影表查询电影,取出名字,返回
    :param user_dic:
    :return:
    ‘‘‘
    download_record = models.DownloadRecord.select_all(user_id=user_dic[user_id])
    if not download_record:
        back_dic = {flag: False, msg: 暂无观影记录}
        common.send_back(back_dic, conn)
    else:
        download_list = []
        for record in download_record:
            movie = models.Movie.select_one(id=record.movie_id)
            download_list.append(movie.name)
        back_dic = {flag: True, msg: buy success, download_list: download_list}
        common.send_back(back_dic, conn)
用户接口

common.py

技术分享图片
import hashlib
import os
import time
import json
import struct

def login_auth(func):
    def wrapper(*args, **kwargs):
        from server import use_data as mu
        for value in mu.alive_user.values():
            if value[0] == args[0][session]:
                args[0][user_id] = value[1]
                break
        if not args[0].get(user_id, None):
            send_back({flag: False, msg: 您没有登录}, args[1])
        else:
            return func(*args, **kwargs)

    return wrapper


def get_uuid(name):
    md = hashlib.md5()
    md.update(name.encode(utf-8))
    md.update(str(time.clock()).encode(utf-8))
    return md.hexdigest()


def get_time():
    now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    return now_time


def get_colck_time():
    return str(time.clock())


def get_bigfile_md5(file_path):
    if os.path.exists(file_path):
        md = hashlib.md5()
        filesize = os.path.getsize(file_path)
        file_list = [0, filesize // 3, (filesize // 3) * 2, filesize - 10]
        with open(file_path, rb) as f:
            for line in file_list:
                f.seek(line)
                md.update(f.read(10))
        return md.hexdigest()


def send_back(back_dic, conn):

    head_json_bytes = json.dumps(back_dic).encode(utf-8)
    conn.send(struct.pack(i, len(head_json_bytes)))  # 先发报头的长度
    conn.send(head_json_bytes)
公共方法

funcorm.py

技术分享图片
from orm import mysql_singleton


class Field(object):
    def __init__(self, name, column_type, primary_key, default):
        self.name = name  # 列名
        self.column_type = column_type  # 数据类型
        self.primary_key = primary_key  # 是否为主键
        self.default = default  # 默认值


class StringField(Field):
    def __init__(self, name=None, column_type=varchar(100), primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


class IntegerField(Field):
    def __init__(self, name=None, primary_key=False, default=0):
        super().__init__(name, int, primary_key, default)


class ModelMetaclass(type):
    def __new__(cls, name, bases, attrs):
        if name == "Model":
            return type.__new__(cls, name, bases, attrs)
        table_name = attrs.get(table_name, None)
        if not table_name:
            raise TypeError(没有表名)
        primary_key = None  # 查找primary_key字段

        # 保存列类型的对象
        mappings = dict()
        for k, v in attrs.items():
            # 是列名的就保存下来
            if isinstance(v, Field):
                mappings[k] = v
                if v.primary_key:
                    # 找到主键:
                    if primary_key:
                        raise TypeError(主键重复: %s % k)
                    primary_key = k

        for k in mappings.keys():
            attrs.pop(k)
        if not primary_key:
            raise TypeError(没有主键)

        # 给cls增加一些字段:
        attrs[mapping] = mappings
        attrs[primary_key] = primary_key
        attrs[table_name] = table_name
        return type.__new__(cls, name, bases, attrs)


class Model(dict, metaclass=ModelMetaclass):
    def __init__(self, **kw):
        super(Model, self).__init__(**kw)

    def __getattr__(self, key):  # .访问属性触发
        try:
            return self[key]
        except KeyError:
            raise AttributeError(没有属性:%s % key)

    def __setattr__(self, key, value):
        self[key] = value

    @classmethod
    def select_all(cls, **kwargs):
        ms = mysql_singleton.Mysql().singleton()
        if kwargs:  # 当有参数传入的时候
            key = list(kwargs.keys())[0]
            value = kwargs[key]
            sql = "select * from %s where %s=?" % (cls.table_name, key)
            sql = sql.replace(?, %s)
            re = ms.select(sql, value)
        else:  # 当无参传入的时候查询所有
            sql = "select * from %s" % cls.table_name
            re = ms.select(sql)
        return [cls(**r) for r in re]

    @classmethod
    def select_one(cls, **kwargs):
        # 此处只支持单一条件查询
        key = list(kwargs.keys())[0]
        value = kwargs[key]
        ms = mysql_singleton.Mysql().singleton()
        sql = "select * from %s where %s=?" % (cls.table_name, key)

        sql = sql.replace(?, %s)
        re = ms.select(sql, value)
        if re:
            return cls(**re[0])
        else:
            return None

    def save(self):
        ms = mysql_singleton.Mysql().singleton()
        fields = []
        params = []
        args = []
        for k, v in self.mapping.items():
            fields.append(v.name)
            params.append(?)
            args.append(getattr(self, k, v.default))
        sql = "insert into %s (%s) values (%s)" % (self.table_name, ,.join(fields), ,.join(params))
        sql = sql.replace(?, %s)
        ms.execute(sql, args)

    def update(self):
        ms = mysql_singleton.Mysql().singleton()
        fields = []
        args = []
        pr = None
        for k, v in self.mapping.items():
            if v.primary_key:
                pr = getattr(self, k, v.default)
            else:
                fields.append(v.name + =?)
                args.append(getattr(self, k, v.default))
        sql = "update %s set %s where %s = %s" % (
            self.table_name, , .join(fields), self.primary_key, pr)

        sql = sql.replace(?, %s)
        print(sql)
        ms.execute(sql, args)
单例版orm

mysql_singleton.py

技术分享图片
from conf import setting
import pymysql


class Mysql:
    __instance = None
    def __init__(self):
        self.conn = pymysql.connect(host=setting.host,
                                    user=setting.user,
                                    password=setting.password,
                                    database=setting.database,
                                    charset=setting.charset,
                                    autocommit=setting.autocommit)
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def close_db(self):
        self.conn.close()

    def select(self, sql, args=None):
        self.cursor.execute(sql, args)
        rs = self.cursor.fetchall()
        return rs

    def execute(self, sql, args):
        try:
            self.cursor.execute(sql, args)
            affected = self.cursor.rowcount
            # self.conn.commit()
        except BaseException as e:
            print(e)
        return affected

    @classmethod
    def singleton(cls):
        if not cls.__instance:
            cls.__instance = cls()
        return cls.__instance
数据库链接类

db_pool.py

技术分享图片
import pymysql
from conf import setting
from DBUtils.PooledDB import PooledDB

POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=6,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,
    # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。
    ping=0,
    # ping MySQL服务端,检查是否服务可用。
    host=setting.host,
    port=setting.port,
    user=setting.user,
    password=setting.password,
    database=setting.database,
    charset=setting.charset,
    autocommit=setting.autocommit
)
数据库连接池

funcorm_pool.py

技术分享图片
from ormpool import mysql_pool


class Field(object):
    def __init__(self, name, column_type, primary_key, default):
        self.name = name  # 列名
        self.column_type = column_type  # 数据类型
        self.primary_key = primary_key  # 是否为主键
        self.default = default  # 默认值


class StringField(Field):
    def __init__(self, name=None, ddl=varchar(100), primary_key=False, default=None):
        super().__init__(name, ddl, primary_key, default)


class IntegerField(Field):
    def __init__(self, name=None, primary_key=False, default=0):
        super().__init__(name, int, primary_key, default)


class ModelMetaclass(type):
    def __new__(cls, name, bases, attrs):
        if name == "Model":
            return type.__new__(cls, name, bases, attrs)
        table_name = attrs.get(table_name, None)
        if not table_name:
            raise TypeError(没有表名)
        primary_key = None  # 查找primary_key字段

        # 保存列类型的对象
        mappings = dict()
        for k, v in attrs.items():
            # 是列名的就保存下来
            if isinstance(v, Field):
                mappings[k] = v
                if v.primary_key:
                    # 找到主键:
                    if primary_key:
                        raise TypeError(主键重复: %s % k)
                    primary_key = k

        for k in mappings.keys():
            attrs.pop(k)
        if not primary_key:
            raise TypeError(没有主键)

        # 给cls增加一些字段:
        attrs[mapping] = mappings
        attrs[primary_key] = primary_key
        attrs[table_name] = table_name
        return type.__new__(cls, name, bases, attrs)


class Model(dict, metaclass=ModelMetaclass):
    def __init__(self, **kw):
        super(Model, self).__init__(**kw)

    def __getattr__(self, key):  # .访问属性触发
        try:
            return self[key]
        except KeyError:
            raise AttributeError(没有属性:%s % key)

    def __setattr__(self, key, value):
        self[key] = value

    @classmethod
    def select_all(cls, **kwargs):
        ms = mysql_pool.MysqlPool()
        if kwargs:  # 当有参数传入的时候
            key = list(kwargs.keys())[0]
            value = kwargs[key]
            sql = "select * from %s where %s=?" % (cls.table_name, key)
            sql = sql.replace(?, %s)
            re = ms.select(sql, value)
        else:  # 当无参传入的时候查询所有
            sql = "select * from %s" % cls.table_name
            re = ms.select(sql)
        return [cls(**r) for r in re]

    @classmethod
    def select_one(cls, **kwargs):
        # 此处只支持单一条件查询
        key = list(kwargs.keys())[0]
        value = kwargs[key]
        ms = mysql_pool.MysqlPool()
        sql = "select * from %s where %s=?" % (cls.table_name, key)

        sql = sql.replace(?, %s)
        re = ms.select(sql, value)
        if re:
            return cls(**re[0])
        else:
            return None

    def save(self):
        ms = mysql_pool.MysqlPool()
        fields = []
        params = []
        args = []
        for k, v in self.mapping.items():
            fields.append(v.name)
            params.append(?)
            args.append(getattr(self, k, v.default))
        sql = "insert into %s (%s) values (%s)" % (self.table_name, ,.join(fields), ,.join(params))
        sql = sql.replace(?, %s)
        ms.execute(sql, args)

    def update(self):
        ms = mysql_pool.MysqlPool()
        fields = []
        args = []
        pr = None
        for k, v in self.mapping.items():
            if v.primary_key:
                pr = getattr(self, k, None)
            else:
                fields.append(v.name + =?)
                args.append(getattr(self, k, v.default))
        sql = "update %s set %s where %s = %s" % (
            self.table_name, , .join(fields), self.primary_key, pr)

        sql = sql.replace(?, %s)
        print(sql)
        ms.execute(sql, args)
连接池版orm

mysql_pool.py

技术分享图片
import pymysql
from ormpool import db_pool
from threading import current_thread


class MysqlPool:
    def __init__(self):
        self.conn = db_pool.POOL.connection()
        # print(db_pool.POOL)
        # print(current_thread().getName(), 拿到连接, self.conn)
        # print(current_thread().getName(), 池子里目前有, db_pool.POOL._idle_cache, \\r\\n)
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def close_db(self):
        self.cursor.close()
        self.conn.close()

    def select(self, sql, args=None):
        self.cursor.execute(sql, args)
        rs = self.cursor.fetchall()
        return rs

    def execute(self, sql, args):

        try:
            self.cursor.execute(sql, args)
            affected = self.cursor.rowcount
            # self.conn.commit()
        except BaseException as e:
            print(e)
        finally:
            self.close_db()
        return affected
池版数据库链接

tcpServer.py

技术分享图片
import json
import socket
import struct
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from threading import current_thread

from conf import setting
from interface import common_interface, admin_interface, user_interface
from server import use_data

server_pool = ThreadPoolExecutor(10)
mutex = Lock()
use_data.mutex = mutex
dispatch_dic = {
    login: common_interface.login,
    register: common_interface.register,
    upload: admin_interface.upload_movie,
    delete_movie: admin_interface.delete_movie,
    download_movie: user_interface.download_movie,
    upload: admin_interface.upload_movie,
    release_notice: admin_interface.release_notice,
    buy_member: user_interface.buy_member,
    get_movie_list: user_interface.get_movie_list,
    check_notice: user_interface.check_notice,
    check_download_record: user_interface.check_download_record,
    check_movie: admin_interface.check_movie
}


def working(conn, addr):
    print(current_thread().getName())
    while True:
        try:
            head_struct = conn.recv(4)
            if not head_struct: break
            head_len = struct.unpack(i, head_struct)[0]
            head_json = conn.recv(head_len).decode(utf-8)
            head_dic = json.loads(head_json)
            # 分发之前,先判断是不是伪造
            head_dic[addr] = str(addr)

            dispatch(head_dic, conn)

        except Exception as e:
            print(错误信息:, e)
            conn.close()
            # 把服务器保存的用户信息清掉
            mutex.acquire()
            if str(addr) in use_data.alive_user:
                use_data.alive_user.pop(str(addr))
            # print(***********end*************%s%len(login_user_data.alive_user))
            mutex.release()

            print(客户端:%s :断开链接 % str(addr))
            break


def dispatch(head_dic, conn):
    if head_dic[type] not in dispatch_dic:
        back_dic = {flag: False, msg: 请求不存在}
        send_back(back_dic, conn)
    else:
        dispatch_dic[head_dic[type]](head_dic, conn)


def send_back(back_dic, conn):
    head_json_bytes = json.dumps(back_dic).encode(utf-8)
    conn.send(struct.pack(i, len(head_json_bytes)))  # 先发报头的长度
    conn.send(head_json_bytes)


def server_run():
    socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socket_server.bind(setting.server_address)
    socket_server.listen(5)

    while True:
        conn, addr = socket_server.accept()
        print(客户端:%s 链接成功 % str(addr))
        server_pool.submit(working, conn, addr)

    socket_server.close()
服务端核心功能

use_data.py

技术分享图片
alive_user = {}
mutex = None
用户登录信息,互斥锁

start.py

技术分享图片
import os, sys

path = os.path.dirname(__file__)
sys.path.append(path)
from server import tcpServer

if __name__ == __main__:
    tcpServer.server_run()
启动文件

 

以上是关于服务端解析的主要内容,如果未能解决你的问题,请参考以下文章

Binder源码解析(从客户端到服务端代码流程)

Binder源码解析(从客户端到服务端代码流程)

Tomcat根据JSP生成Servlet机制解析

Motan在服务provider端用于处理request的线程池

ABB Server(服务端)代码解析

android 在服务器端生成json格式数据,在客户端怎么解析