python-flask框架&mock接口开发
Posted tour8
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python-flask框架&mock接口开发相关的知识,希望对你有一定的参考价值。
1、开发一个get接口
1 import flask 2 import json 3 """ 4 flask轻量级的web开发框架 5 """ 6 7 # 初始化一个服务 8 server = flask.Flask(__name__) 9 10 11 @server.route(\'/api/payment\') # 装饰器:一个接口 12 def payment(): 13 data = {"code":301,"msg":"支付处理中","amount":30000} 14 return json.dumps(data,ensure_ascii=False) 15 16 # http://localhost:9999/api/payment 17 # http://127.0.0.1:9999/api/payment 18 server.run(port=9999,debug=True)
结果:
2、post接口
1 import flask 2 import json 3 """ 4 flask轻量级的web开发框架 5 """ 6 7 # 初始化一个服务 8 server = flask.Flask(__name__) 9 10 11 @server.route(\'/api/payment\') # 装饰器:一个get接口 12 def payment(): 13 data = {"code":301,"msg":"支付处理中","amount":30000} 14 return json.dumps(data,ensure_ascii=False) 15 16 17 @server.route(\'/api/login\',methods=[\'post\']) # 装饰器:一个post接口 18 def login(): 19 data = {"code":0,"msg":"登录成功"} 20 return json.dumps(data,ensure_ascii=False) 21 22 # http://localhost:9999/api/payment 23 # http://127.0.0.1:9999/api/payment 24 server.run(port=9999,debug=True)
结果:
3、启动服务,非本地也可以访问
1 # 启动后,别人可以访问 2 # http://ip:9999/api/payment 3 server.run(host=\'0.0.0.0\',port=9999,debug=True)
运行后,手机访问(同一个局域网):
4、实例1
import flask import json,pymysql """ flask轻量级的web开发框架 """ # 初始化一个服务 server = flask.Flask(__name__) def op_mysql(sql): connect = pymysql.connect(host=\'xxx\', port=3306, db=\'xxx\', password=\'xxx\', charset=\'utf8\', autocommit=True, user=\'jxz\' ) cursor = connect.cursor(pymysql.cursors.DictCursor) try: cursor.execute(sql) except Exception as e: result = {\'error\':\'sql错误\'} else: result = cursor.fetchall() finally: cursor.close() connect.close() return result @server.route(\'/api/payment\') # 装饰器:一个get接口 def payment(): status = flask.request.values.get(\'status\') if status == \'success\': data = {"code": 0, "msg": "支付成功", "amount": 30000} elif status == \'process\': data = {"code":305,"msg":"支付处理中"} elif status == \'fail\': data = {"code": -1, "msg": "支付失败"} else: data = {"code": 400, "msg": "支付状态错误"} return json.dumps(data,ensure_ascii=False) @server.route(\'/api/login\',methods=[\'post\']) # 装饰器:一个post接口 def login(): data = {"code":0,"msg":"登录成功"} return json.dumps(data,ensure_ascii=False) @server.route(\'/api/account/data\') def account_data(): #传入参数 account_name = flask.request.values.get(\'account\') # 获取json格式的参考 # args = flask.request.json.get(\'xxx\') if account_name: result = op_mysql(\'select * from gtm_account where account="%s";\'% account_name) else: result = op_mysql(\'select * from gtm_account;\') data = {"code":0,"msg":"success","data":result} return data # # 启动服务:启动后只能本地访问 # # http://localhost:9999/api/payment # # http://127.0.0.1:9999/api/payment # server.run(port=9999,debug=True) # 启动服务:启动后别人可以访问 # http://ip:9999/api/payment # debug=True表示调试模式,改了之后立即生效,不需要重新启动服务 server.run(host=\'0.0.0.0\',port=9999,debug=True)
结果1:
结果2:
结果3:
5、实例2(操作数据库)
1 import flask 2 import json,pymysql,hashlib,redis,time 3 4 # 初始化一个服务 5 server = flask.Flask(__name__) 6 7 8 def op_mysql(sql,one_tag=False): 9 connect = pymysql.connect(host=\'xxx\', 10 port=3306, 11 db=\'xxx\', 12 password=\'xxx\', 13 charset=\'utf8\', 14 autocommit=True, 15 user=\'jxz\' 16 ) 17 cursor = connect.cursor(pymysql.cursors.DictCursor) 18 try: 19 cursor.execute(sql) 20 except Exception as e: 21 return \'\' 22 else: 23 if one_tag: 24 result = cursor.fetchone() 25 else: 26 result = cursor.fetchall() 27 finally: 28 cursor.close() 29 connect.close() 30 return result 31 32 33 # 加密 34 # 加盐 35 def md5(s,salt=\'lzh\'): 36 s = str(s)+salt 37 m = hashlib.md5(s.encode()) 38 return m.hexdigest() 39 40 41 # 操作redis 42 def op_redis(key,value=None,expire=60*60*2): 43 r = redis.Redis(host=\'xxx\',password=\'xxx\',decode_responses=True) 44 if value: 45 r.set(key,value,expire) 46 else: 47 return r.get(key) 48 49 50 51 def check_username(name): 52 sql = \'select * from app_myuser where username="%s";\' % name 53 result = op_mysql(sql) 54 if result: 55 return True 56 else: 57 return False 58 59 60 # 注册 61 @server.route(\'/api/register\',methods=[\'post\']) 62 def register(): 63 # 获取参数 64 username = flask.request.values.get(\'username\') 65 pwd = flask.request.values.get(\'pwd\') 66 cpwd = flask.request.values.get(\'cpwd\') 67 68 if username and pwd and cpwd: 69 if check_username(username): 70 data = {"error_code":401,"msg":"用户已存在"} 71 elif pwd != cpwd: 72 data = {"error_code":400,"msg":"两次输入的密码不正确"} 73 else: 74 pwd = md5(pwd) 75 sql = \'insert into app_myuser(username,passwd) values ("%s","%s");\' % (username,pwd) 76 op_mysql(sql) 77 data = {"error_code": 0, "msg": "注册成功"} 78 else: 79 data = {"error_code":400,"msg":"必填参数不能为空"} 80 return json.dumps(data) 81 82 83 # 登录 84 @server.route(\'/api/login\',methods=[\'post\']) # 装饰器:一个post接口 85 def login(): 86 # 获取参数 87 username = flask.request.values.get(\'username\') 88 pwd = flask.request.values.get(\'pwd\') 89 90 if username and pwd: 91 sql = \'select * from app_myuser where username="%s";\' % username 92 result = op_mysql(sql,True) 93 if result: 94 if md5(pwd) == result.get(\'passwd\'): 95 96 token = md5(username+str(time.time())) 97 info = {\'username\':username,\'id\':result.get(id)} 98 op_redis(token,json.dumps(info)) 99 100 data = {"code": 0, "msg": "登录成功","token":token} 101 else: 102 data = {"code": 403, "msg": "账号/密码错误"} 103 else: 104 data = {"code": 401, "msg": "用户不存在"} 105 else: 106 data = {"code": 400, "msg": "必填参数不能为空"} 107 108 return json.dumps(data,ensure_ascii=False) 109 110 111 # 支付 112 @server.route(\'/api/payment\') # 装饰器:一个get接口 113 def payment(): 114 token = flask.request.values.get(\'token\') 115 amount= flask.request.values.get(\'amount\') 116 117 # todo:check_amount 118 if token: 119 result = op_redis(token) 120 if result: 121 amount = float(amount) 122 result = json.loads(result) 123 userId = result.get(\'id\') 124 sql = \'update app_myuser set balance -= %s where id=%s\' % (amount,userId) 125 op_mysql(sql) 126 data = {"code": 0, "msg": "支付成功"} 127 else: 128 data = {"code": 401, "msg": "未登录"} 129 else: 130 data = {"code": 401, "msg": "未登录"} 131 132 return json.dumps(data,ensure_ascii=False) 133 134 # 启动服务 135 server.run(host=\'0.0.0.0\',port=9999,debug=True)
6、加密
1 import hashlib 2 3 s = \'123456\' 4 # md5加密是不可逆的 5 m = hashlib.md5(s.encode()) # bytes类型 6 # m = hashlib.sha256(s.encode()) 7 result = m.hexdigest() 8 print(result) 9 print(len(result))
以上是关于python-flask框架&mock接口开发的主要内容,如果未能解决你的问题,请参考以下文章
python-flask 框架使用 flask_mongoengine
前端测试框架Jest系列教程 -- Mock Functions