flask-web ——RPC实际项目业务简析

Posted 胖虎是只mao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flask-web ——RPC实际项目业务简析相关的知识,希望对你有一定的参考价值。

一、需求背景

在首页中 获取特定用户的推荐文章列表 需要web系统推荐系统配合

  • 有web系统告知 推荐系统 用户id是谁
  • 推荐系统 根据用户id 决定 推荐的文章id
  • web系统 根据推荐的文章id 查询文章数据,返回给客户端

RPC业务实现

RPC接口分析

调用请求

  • channel_id
  • user_id
  • article_num 推荐的文章数量 10
  • timestamp 时间戳 明确 跟推荐系统索要历史推荐数据还是最新的推荐结果
    • 如果timestamp传递的是最新的当前时间,则表明索要新的推荐数据
    • 如果传递的是历史的时间,则索要历史推荐

调用返回值

[article_id 文章id [1,2,3,4,5.6] 埋点

 曝光参数  exposure 
 请求上一页历史推荐的时间戳 time_stamp
[
   
    {
        article_id
           {
               click: 'click param'
               collect: 'collct param'
               share: 
               read: 
           }
    }
    {
        article_id
           [
               click: 'click param'
               collect: 'collct param'
               share: 
               read: 
           ]
    }
]

使用IDL 接口定义语言 将上述接口写到文件中

  • gRPC -> IDL ProtoBuf protobuf proto

二、推荐系统接口定义

接口原型
接口名称: user_recommend

调用参数:

UserRequest:
    user_id       # 用户id
    channel_id    # 频道id
    article_num   # 推荐的文章数量
    time_stamp    # 推荐的时间戳

返回数据:

ArticleResponse:
    expousre         # 曝光埋点数据
    time_stamp       # 推荐的时间戳
    recommends:      # 推荐结果
        article_id   # 文章id
        track:         # 关于文章的埋点数据
            click    # 用户点击行为的埋点参数
            collect  # 用户收藏的埋点参数
            share    # 用户分享的埋点参数
            read     # 用户进入文章详情的埋点参数
使用Protobuf 定义的接口如下:

使用protobuf定义的接口文件通常以proto作为文件后缀名

toutiao-backend/common/rpc目录下新建reco.proto文件

syntax = "proto3";

// 使用message定义数据类型
message UserRequest {
    string user_id=1;
    int32 channel_id=2;
    int32 article_num=3;
    int64 time_stamp=4;
}

message Track {
    string click=1;
    string collect=2;
    string share=3;
    string read=4;
}

message Article {
    int64 article_id=1;
    Track track=2;
}

message ArticleResponse {
    string exposure=1;
    int64 time_stamp=2;
    repeated Article recommends=3;
}

// 使用service 定义一组服务
service UserRecommend {
	// 使用rcp 定义被调用的方法(函数)
    rpc user_recommend(UserRequest) returns(ArticleResponse) {}
    // rpc simpla_recommend() returns () {}
}
代码生成

安装protobuf编译器和grpc

pip install grpcio-tools

编译生成代码

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. reco.proto
  • -I表示搜索proto文件中被导入文件的目录
  • --python_out表示保存生成Python文件的目录,生成的文件中包含接口定义中的数据类型
  • --grpc_python_out表示保存生成Python文件的目录,生成的文件中包含接口定义中的服务类型

toutiao-backend/common/rpc目录下执行上述命令,会自动生成如下两个rpc调用辅助代码模块:

  • reco_pb2.py 保存根据接口定义文件中的数据类型生成的python类
  • reco_pb2_grpc.py 保存根据接口定义文件中的服务方法类型生成的python调用RPC方法

三、服务端代码demo

为了方便看到效果,我们编写补全服务端代码。

注意:此处实际推荐的代码在后续推荐系统课程中会涉及到

toutiao-backend/common/rpc目录下新建server.py文件

import reco_pb2
import reco_pb2_grpc
import grpc
from concurrent.futures import ThreadPoolExecutor
import time


# rpc接口定义中服务对应成Python的类
# 首先补全被调用的函数代码
class UserRecommendService(reco_pb2_grpc.UserRecommendServicer):
	    """
    通过子类继承重写的方式
    """
    # 在接口定义的同名方法中补全,被调用时应该执行的逻辑
    def user_recommend(self, request, context):
    	        """
        在接口中定义的用户推荐方法
        :param request:  调用时的请求参数对象  UserRequest
        :param context:  通过此对象可以设置调用返回的异常信息
        :return:
        """
        # 获取调用的参数
        # request是调用的请求数据对象
        user_id = request.user_id
        channel_id = request.channel_id
        article_num = request.article_num
        time_stamp = request.time_stamp
        
		# 决定调用返回数据
        response = reco_pb2.ArticleResponse()
        response.exposure = 'exposure param'
        response.time_stamp = round(time.time()*1000)
        recommends = []
        for i in range(article_num):
            article = reco_pb2.Article()
            article.track.click = 'click param {}'.format(i+1)
            article.track.collect = 'collect param {}'.format(i+1)
            article.track.share = 'share param {}'.format(i+1)
            article.track.read = 'read param {}'.format(i+1)
            article.article_id = i+1
            recommends.append(article)
         # 注意 对于列表类型的赋值使用extend
        response.recommends.extend(recommends)

        # 最终要返回一个调用结果
        return response

# 创建rpc的服务器
def serve():
    """
    rpc服务端启动方法
    """
    # 创建一个rpc服务器
    server = grpc.server(ThreadPoolExecutor(max_workers=10))

    # 向服务器中添加被调用的服务方法
    reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendService(), server)

    # 微服务器绑定ip地址和端口
    server.add_insecure_port('127.0.0.1:8888')

    # 启动rpc服务,开启服务器运行, start()方法是非阻塞方法
    server.start()

    # start()不会阻塞,此处需要加上循环睡眠 防止程序退出
    while True:
        time.sleep(10)


if __name__ == '__main__':
    serve()

四、编写客户端

在toutiao-backend/common/rpc目录下新建client.py

import grpc
import reco_pb2
import reco_pb2_grpc
import time


def feed_articles(stub):
    # 构建rpc调用的调用参数
    user_request = reco_pb2.UserRequest()
    user_request.user_id = '1'
    user_request.channel_id = 1
    user_request.article_num = 10
    user_request.time_stamp = round(time.time()*1000)

    # 通过stub进行方法调用,并接收调用返回值
        # ret -> ArticleResponse 对象
    ret = stub.user_recommend(user_request)
    print('ret={}'.format(ret))

def run():
    """
    rpc客户端调用的方法
    用with的优点是 断开服务器不需要close,中断with语句就行
    """
    # 使用with语句连接rpc服务器
    # 构建连接rpc服务器的对象
    with grpc.insecure_channel('127.0.0.1:8888') as channel:
        # 创建调用rpc远端服务的辅助对象stub
        stub = reco_pb2_grpc.UserRecommendStub(channel)
        # 通过stub进行rpc调用
        feed_articles(stub)

if __name__ == '__main__':
    run()

五、首页新闻推荐接口编写

toutiao-backend/toutiao/resources/news/article.py中编写

from rpc import reco_pb2, reco_pb2_grpc

class ArticleListResource(Resource):
    """
    获取推荐文章列表数据
    """
    def _feed_articles(self, channel_id, timestamp, feed_count):
        """
        获取推荐文章
        :param channel_id: 频道id
        :param feed_count: 推荐数量
        :param timestamp: 时间戳
        :return: [{article_id, trace_params}, ...], timestamp
        """
        user_request = reco_pb2.UserRequest()
        user_request.user_id = g.user_id or 'annoy'
        user_request.channel_id = channel_id
        user_request.article_num = feed_count
        user_request.time_stamp = round(time.time() * 1000)

        stub = reco_pb2_grpc.UserRecommendStub(current_app.rpc_reco)
               # ret -> ArticleResponse 对象
        ret = stub.user_recommend(user_request)
        return ret.recommends, ret.time_stamp

    def get(self):
        """
        获取文章列表
        """
        qs_parser = RequestParser()
        qs_parser.add_argument('channel_id', type=parser.channel_id, required=True, location='args')
        qs_parser.add_argument('timestamp', type=inputs.positive, required=True, location='args')
        args = qs_parser.parse_args()
        channel_id = args.channel_id
        timestamp = args.timestamp
        per_page = constants.DEFAULT_ARTICLE_PER_PAGE_MIN
        try:
            feed_time = time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(time.time()))
        except Exception:
            return {'message': 'timestamp param error'}, 400

        results = []

        # 获取推荐文章列表
        feeds, pre_timestamp = self._feed_articles(channel_id, timestamp, per_page)

        # 查询文章
        for feed in feeds:
            article = cache_article.ArticleInfoCache(feed.article_id).get()
            if article:
                article['pubdate'] = feed_time
                article['trace'] = {
                    'click': feed.track.click,
                    'collect': feed.track.collect,
                    'share': feed.track.share,
                    'read': feed.track.read
                }
                results.append(article)

        return {'pre_timestamp': pre_timestamp, 'results': results}

以上是关于flask-web ——RPC实际项目业务简析的主要内容,如果未能解决你的问题,请参考以下文章

flask-web Redis缓存实际项目中的应用

flask-web 搜索系统项目实际应用suggest查询实现联想提示自动补全的实现

RPC高性能框架总结3.NIO示例代码编写和简析

Hbase多租户机制简析

flask-web APScheduler 定时任务以及实际应用

Lucene内核索引实现方式简析