flask-web ——RPC实际项目业务简析
Posted 胖虎是只mao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flask-web ——RPC实际项目业务简析相关的知识,希望对你有一定的参考价值。
一、需求背景
在首页中 获取特定用户的推荐文章列表 需要web系统和推荐系统配合
- 有web系统告知 推荐系统 用户id是谁
- 推荐系统 根据用户id 决定 推荐的文章id
- web系统 根据推荐的文章id 查询文章数据,返回给客户端
RPC业务实现
RPC接口分析
调用请求
- channel_id
- user_id
- article_num 推荐的文章数量 10
- timestamp 时间戳 明确 跟推荐系统索要历史推荐数据还是最新的推荐结果
- 如果timestamp传递的是最新的当前时间,则表明索要新的推荐数据
- 如果传递的是历史的时间,则索要历史推荐
调用返回值
[article_id 文章id [1,2,3,4,5.6] 埋点
曝光参数 exposure
请求上一页历史推荐的时间戳 time_stamp
[
{
article_id
{
click: 'click param'
collect: 'collct param'
share:
read:
}
}
{
article_id
[
click: 'click param'
collect: 'collct param'
share:
read:
]
}
]
使用IDL 接口定义语言 将上述接口写到文件中
- gRPC -> IDL ProtoBuf protobuf proto
二、推荐系统接口定义
接口原型
接口名称: user_recommend
调用参数:
UserRequest:
user_id # 用户id
channel_id # 频道id
article_num # 推荐的文章数量
time_stamp # 推荐的时间戳
返回数据:
ArticleResponse:
expousre # 曝光埋点数据
time_stamp # 推荐的时间戳
recommends: # 推荐结果
article_id # 文章id
track: # 关于文章的埋点数据
click # 用户点击行为的埋点参数
collect # 用户收藏的埋点参数
share # 用户分享的埋点参数
read # 用户进入文章详情的埋点参数
使用Protobuf 定义的接口如下:
使用protobuf定义的接口文件通常以proto作为文件后缀名
在toutiao-backend/common/rpc
目录下新建reco.proto文件
syntax = "proto3";
// 使用message定义数据类型
message UserRequest {
string user_id=1;
int32 channel_id=2;
int32 article_num=3;
int64 time_stamp=4;
}
message Track {
string click=1;
string collect=2;
string share=3;
string read=4;
}
message Article {
int64 article_id=1;
Track track=2;
}
message ArticleResponse {
string exposure=1;
int64 time_stamp=2;
repeated Article recommends=3;
}
// 使用service 定义一组服务
service UserRecommend {
// 使用rcp 定义被调用的方法(函数)
rpc user_recommend(UserRequest) returns(ArticleResponse) {}
// rpc simpla_recommend() returns () {}
}
代码生成
安装protobuf
编译器和grpc
库
pip install grpcio-tools
编译生成代码
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. reco.proto
- -I表示搜索proto文件中被导入文件的目录
--python_out
表示保存生成Python文件的目录,生成的文件中包含接口定义中的数据类型--grpc_python_out
表示保存生成Python文件的目录,生成的文件中包含接口定义中的服务类型
在toutiao-backend/common/rpc
目录下执行上述命令,会自动生成如下两个rpc调用辅助代码模块:
reco_pb2.py
保存根据接口定义文件中的数据类型生成的python类reco_pb2_grpc.py
保存根据接口定义文件中的服务方法类型生成的python调用RPC方法
三、服务端代码demo
为了方便看到效果,我们编写补全服务端代码。
注意:此处实际推荐的代码在后续推荐系统课程中会涉及到
在toutiao-backend/common/rpc
目录下新建server.py
文件
import reco_pb2
import reco_pb2_grpc
import grpc
from concurrent.futures import ThreadPoolExecutor
import time
# rpc接口定义中服务对应成Python的类
# 首先补全被调用的函数代码
class UserRecommendService(reco_pb2_grpc.UserRecommendServicer):
"""
通过子类继承重写的方式
"""
# 在接口定义的同名方法中补全,被调用时应该执行的逻辑
def user_recommend(self, request, context):
"""
在接口中定义的用户推荐方法
:param request: 调用时的请求参数对象 UserRequest
:param context: 通过此对象可以设置调用返回的异常信息
:return:
"""
# 获取调用的参数
# request是调用的请求数据对象
user_id = request.user_id
channel_id = request.channel_id
article_num = request.article_num
time_stamp = request.time_stamp
# 决定调用返回数据
response = reco_pb2.ArticleResponse()
response.exposure = 'exposure param'
response.time_stamp = round(time.time()*1000)
recommends = []
for i in range(article_num):
article = reco_pb2.Article()
article.track.click = 'click param {}'.format(i+1)
article.track.collect = 'collect param {}'.format(i+1)
article.track.share = 'share param {}'.format(i+1)
article.track.read = 'read param {}'.format(i+1)
article.article_id = i+1
recommends.append(article)
# 注意 对于列表类型的赋值使用extend
response.recommends.extend(recommends)
# 最终要返回一个调用结果
return response
# 创建rpc的服务器
def serve():
"""
rpc服务端启动方法
"""
# 创建一个rpc服务器
server = grpc.server(ThreadPoolExecutor(max_workers=10))
# 向服务器中添加被调用的服务方法
reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendService(), server)
# 微服务器绑定ip地址和端口
server.add_insecure_port('127.0.0.1:8888')
# 启动rpc服务,开启服务器运行, start()方法是非阻塞方法
server.start()
# start()不会阻塞,此处需要加上循环睡眠 防止程序退出
while True:
time.sleep(10)
if __name__ == '__main__':
serve()
四、编写客户端
在toutiao-backend/common/rpc目录下新建client.py
import grpc
import reco_pb2
import reco_pb2_grpc
import time
def feed_articles(stub):
# 构建rpc调用的调用参数
user_request = reco_pb2.UserRequest()
user_request.user_id = '1'
user_request.channel_id = 1
user_request.article_num = 10
user_request.time_stamp = round(time.time()*1000)
# 通过stub进行方法调用,并接收调用返回值
# ret -> ArticleResponse 对象
ret = stub.user_recommend(user_request)
print('ret={}'.format(ret))
def run():
"""
rpc客户端调用的方法
用with的优点是 断开服务器不需要close,中断with语句就行
"""
# 使用with语句连接rpc服务器
# 构建连接rpc服务器的对象
with grpc.insecure_channel('127.0.0.1:8888') as channel:
# 创建调用rpc远端服务的辅助对象stub
stub = reco_pb2_grpc.UserRecommendStub(channel)
# 通过stub进行rpc调用
feed_articles(stub)
if __name__ == '__main__':
run()
五、首页新闻推荐接口编写
在toutiao-backend/toutiao/resources/news/article.py
中编写
from rpc import reco_pb2, reco_pb2_grpc
class ArticleListResource(Resource):
"""
获取推荐文章列表数据
"""
def _feed_articles(self, channel_id, timestamp, feed_count):
"""
获取推荐文章
:param channel_id: 频道id
:param feed_count: 推荐数量
:param timestamp: 时间戳
:return: [{article_id, trace_params}, ...], timestamp
"""
user_request = reco_pb2.UserRequest()
user_request.user_id = g.user_id or 'annoy'
user_request.channel_id = channel_id
user_request.article_num = feed_count
user_request.time_stamp = round(time.time() * 1000)
stub = reco_pb2_grpc.UserRecommendStub(current_app.rpc_reco)
# ret -> ArticleResponse 对象
ret = stub.user_recommend(user_request)
return ret.recommends, ret.time_stamp
def get(self):
"""
获取文章列表
"""
qs_parser = RequestParser()
qs_parser.add_argument('channel_id', type=parser.channel_id, required=True, location='args')
qs_parser.add_argument('timestamp', type=inputs.positive, required=True, location='args')
args = qs_parser.parse_args()
channel_id = args.channel_id
timestamp = args.timestamp
per_page = constants.DEFAULT_ARTICLE_PER_PAGE_MIN
try:
feed_time = time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(time.time()))
except Exception:
return {'message': 'timestamp param error'}, 400
results = []
# 获取推荐文章列表
feeds, pre_timestamp = self._feed_articles(channel_id, timestamp, per_page)
# 查询文章
for feed in feeds:
article = cache_article.ArticleInfoCache(feed.article_id).get()
if article:
article['pubdate'] = feed_time
article['trace'] = {
'click': feed.track.click,
'collect': feed.track.collect,
'share': feed.track.share,
'read': feed.track.read
}
results.append(article)
return {'pre_timestamp': pre_timestamp, 'results': results}
以上是关于flask-web ——RPC实际项目业务简析的主要内容,如果未能解决你的问题,请参考以下文章
flask-web 搜索系统项目实际应用suggest查询实现联想提示自动补全的实现