import flask, json
from tools import op_mysql
server = flask.Flask(__name__)
@server.route(‘/get_user‘, methods=[‘get‘, ‘post‘]) # 定义接口访问的路径和请求方式
def get_all_user():
sql = ‘select * from bt_stu;‘
res = op_mysql(host=‘211.149.218.16‘, user=‘jxz‘, passwd=‘123456‘, db=‘jxz‘, sql=sql)
response = json.dumps(res, ensure_ascii=False) # 把list转成json串
return response # 只能返回字符串
@server.route(‘/add_user‘, methods=[‘post‘])
def add_user():
user_id = flask.request.values.get(‘id‘)
username = flask.request.values.get(‘u‘)
if user_id and username:
sql = "insert into stu values (‘%s‘,‘%s‘);" % (user_id, username)
res = op_mysql(host=‘211.149.218.16‘, user=‘jxz‘, passwd=‘123456‘, db=‘jxz‘, sql=sql)
response = {‘code‘:308,‘msg‘:‘添加成功‘}
else:
response = {‘code‘:503,‘msg‘:‘必填參數未填!‘}
return json.dumps(response,ensure_ascii=False)
server.run(port=8081, debug=True) # 启动接口,定义端口,debug=True指在修改接口后不需重启
import pymysql, redis
def op_mysql(host, user, passwd, db, sql, port=3306, charset=‘utf8‘):
conn = pymysql.connect(host=host, user=user,
password=passwd,
port=port, db=db,
charset=charset)
cur = conn.cursor(cursor=pymysql.cursors.DictCursor)
cur.execute(sql)
sql_start = sql[:6].upper() # 取sql前6个字符串判断sql语句类型
if sql_start == ‘SELECT‘:
res = cur.fetchall()
else:
conn.commit()
res = ‘OK‘
cur.close()
conn.close()
return res
def op_redis(host, user, passwd, k, v=None, port=6379, db=0):
r = redis.Redis(host=host, password=passwd, port=port, db=db)
if v:
r.set(k, v)
res = ‘OK‘
else:
res = r.get(k)
if res: # 判断是否get到数据
res = r.get(k).decode()
else:
res = None
return res
if __name__ == ‘__main__‘: # 其他文件调用时,不会执行
sql = ‘select * from bt_stu;‘
sql2 = "update bt_stu set class = ‘天蝎座‘ where id = 599"
res = op_mysql(host=‘211.149.218.16‘, user=‘jxz‘, passwd=‘123456‘, db=‘jxz‘, sql=sql)
print(res)