Python Module — grpcio gRPC 远程调用示例程序

Posted 范桂飓

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python Module — grpcio gRPC 远程调用示例程序相关的知识,希望对你有一定的参考价值。

目录

文章目录

Python gRPC demo

实现 2 个微服务,调用链关系如下图所示:

  1. Marketplace(图书集市):一个简单的 Web 程序,向用户展示所有图书列表。
  2. Recommendations(图书推荐):一个微服务程序,向用户展示推荐的图书列表。

项目结构:

py3_grpc_demo
├── docker-compose.yml
├── marketplace
│   ├── Dockerfile
│   ├── marketplace.py
│   ├── recommendations_pb2_grpc.py
│   ├── recommendations_pb2.py
│   ├── requirements.txt
│   └── templates
│       └── homepage.html
├── protobufs
│   └── recommendations.proto
└── recommendations
    ├── Dockerfile
    ├── recommendations_pb2_grpc.py
    ├── recommendations_pb2.py
    ├── recommendations.py
    └── requirements.txt

1、使用 Protocol Buffers 来定义 gRPC API

  • py3_grpc_demo/protobufs/recommendations.proto
syntax = "proto3";

enum BookCategory 
    MYSTERY = 0;
    SCIENCE_FICTION = 1;
    SELF_HELP = 2;


message BookRecommendation 
    int32 id = 1;
    string title = 2;


message RecommendationRequest 
    int32 user_id = 1;
    BookCategory category = 2;
    int32 max_results = 3;


message RecommendationResponse 
    repeated BookRecommendation recommendations = 1;


service Recommendations 
    rpc Recommend (RecommendationRequest) returns (RecommendationResponse);

  • gRPC API Service:Recommendations
  • gRPC API Request Message:RecommendationRequest
  • gRPC API Response Message:RecommendationResponse

2、实现 Recommendations 微服务

安装依赖

  • py3_grpc_demo/recommendations/requirements.txt
grpc-interceptor ~= 0.12.0
grpcio-tools ~= 1.30
pytest ~= 5.4
  • 安装
$ cd py3_grpc_demo/recommendations
$ pip3 install -r requirements.txt

基于 .proto 文件生成 gRPC 代码

$ cd py3_grpc_demo/recommendations

$ python3 -m grpc_tools.protoc -I ../protobufs --python_out=. --grpc_python_out=. ../protobufs/recommendations.proto

$ ll
...
-rw-r--r--. 1 root root 2531 128 02:34 recommendations_pb2_grpc.py
-rw-r--r--. 1 root root 8345 128 02:34 recommendations_pb2.py

自动生成的 2 个文件,包含了与 gRPC API 通讯所需要的 Python 类型和函数。注意这 2 个文件都是不可被编辑:

  1. gRPC 消息传输数据类型定义文件:recommendations_pb2_grpc.py
  2. gRPC C/S 基本通讯框架定义文件:recommendations_pb2.py

指令行模拟 gRPC Client

recommendations_pb2.py 文件的可读性很差,通常我们不需要关心这个文件的内容,根据 .proto 文件就能理解 gRPC Client 端所需要的操作内容。

通过从 recommendations_pb2 Module 导入 “请求消息传输数据类型:RecommendationRequest”,就可以构建 Client 需要的 gRPC Request 了。

>>> from recommendations_pb2 import BookCategory, RecommendationRequest
>>> request = RecommendationRequest(
...     user_id=1, category=BookCategory.SCIENCE_FICTION, max_results=3
... )
>>>
>>> request.category
1

值得注意的是,虽然基于 .proto 文件生成的代码里面包含了 API 输入数据的校验,不会接受错误的输入数据类型。使用 Protocol buffers,使得 gRPC 就像是调用本地函数一样简明,但底层实际上是一个网络调用。

但是 proto3 版本中,.proto message 中定义的所有字段是可选的,如果其中某些字段未提供实参,则会填充默认值,例如:数值类型则默认为 0,字符类型默认为 None。如下所示:

>>> request = RecommendationRequest(
...     user_id=1, category=BookCategory.SCIENCE_FICTION)
>>>
>>> request.max_results
0

也就是说,开发者依旧需要自己来校验输入函数的实参列表是否正确的设置了所有数值。

当 Client 基于 recommendations_pb2 Module 构建好了 gRPC Request Msg 之后,就可以再基于 recommendations_pb2_grpc Module 来发送这个 Request 了。

gRPC 会为 .proto 文件中定义的每个 service 定义一个 Stub(存根),例如:RecommendationsStub。这些 Stub 类实现了 Recommend,用于 Send gRPC Request 和 Receive gRPC Response,并且 Request 和 Response 的数据结构在 .proto 文件的 service 定义中也明确了。

>>> import grpc
>>> from recommendations_pb2_grpc import RecommendationsStub
>>> from recommendations_pb2 import BookCategory, RecommendationRequest

>>> channel = grpc.insecure_channel("localhost:50051")
>>> client = RecommendationsStub(channel)
>>> request = RecommendationRequest(
...     user_id=1, category=BookCategory.SCIENCE_FICTION, max_results=3
... )
>>> client.Recommend(request)
Traceback (most recent call last):
...
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
...

上述例子可知,RecommendationsStub 接收一个 gRPC Channel 实例作为实参,并且这个 Channel 会帮的一个 Endpoint(IP:Port)。上述错误的意思是,没有找到一个运行在 localhost:50051(50051 是 gRPC 的标准端口)之上的 gRPC Server。

实现 gRPC Server

Server 端的实现,首先需要一个 gRPC Server 对象,由 gRPC 库来提供。然后还需要将任意个 gRPC Services 对象注册到 gRPC Server 中。示例中的 RecommendationsServicer 根据 .proto 文件中的 service Recommendations 自动生成,在 recommendations_pb2_grpc 模块中被定义,代表一个具体的 API Service。

  • py3_grpc_demo/recommendations/recommendations.py
from concurrent import futures
import random

import grpc
# grpc_interceptor 是 Python 实现的 gRPC 拦截器库。
from grpc_interceptor import ServerInterceptor
from grpc_interceptor import ExceptionToStatusInterceptor

from recommendations_pb2 import (
    BookCategory,
    BookRecommendation,
    RecommendationResponse,
)
import recommendations_pb2_grpc

# 构造测试数据,在实际的微服务项目中,这些数据应该被存放在数据库中。
books_by_category = 
    BookCategory.MYSTERY: [
        BookRecommendation(id=1, title="The Maltese Falcon"),
        BookRecommendation(id=2, title="Murder on the Orient Express"),
        BookRecommendation(id=3, title="The Hound of the Baskervilles"),
    ],
    BookCategory.SCIENCE_FICTION: [
        BookRecommendation(
            id=4, title="The Hitchhiker's Guide to the Galaxy"
        ),
        BookRecommendation(id=5, title="Ender's Game"),
        BookRecommendation(id=6, title="The Dune Chronicles"),
    ],
    BookCategory.SELF_HELP: [
        BookRecommendation(
            id=7, title="The 7 Habits of Highly Effective People"
        ),
        BookRecommendation(
            id=8, title="How to Win Friends and Influence People"
        ),
        BookRecommendation(id=9, title="Man's Search for Meaning"),
    ],



# 定义微服务接口访问拦截器,当存在之前未捕获的异常时,会调用 log_error 方法。
class ErrorLogger(ServerInterceptor):

    def intercept(self, method, request, context, method_name):
        try:
            return method(request, context)
        except Exception as e:
            self.log_error(e)
            raise

    def log_error(self, err):
        # do something...
        pass


# 微服务实现
class RecommendationService(recommendations_pb2_grpc.RecommendationsServicer):

    # 重载父类的 Recommend 方法。
    def Recommend(self, request, context):
        if request.category not in books_by_category:
            context.abort(grpc.StatusCode.NOT_FOUND, "Category not found")

        books_for_category = books_by_category[request.category]
        num_results = min(request.max_results, len(books_for_category))
        books_to_recommend = random.sample(
            books_for_category, num_results
        )

        return RecommendationResponse(recommendations=books_to_recommend)


def serve():
    # 将拦截器添加到 Server,使得所有微服务的请求和返回都会经过拦截器,可以以此来统计有多少请求是错误的。
    # 通过调用 context.abort(),拦截器将为你捕获抛出的异常,无须自行实现。
    interceptors = [ExceptionToStatusInterceptor(), ErrorLogger()]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=2),
                         interceptors=interceptors)
    recommendations_pb2_grpc.add_RecommendationsServicer_to_server(
        RecommendationService(), server
    )
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    serve()

基于拦截器(Interceptors)的微服务可观测性

使用 gRPC 来构建微服务程序时,就会要求具备可观测性。你需要监控的项目包括:

  1. 每个微服务收到的请求。
  2. 错误请求,以及错误内容。
  3. 请求延时。
  4. 异常堆栈。
  5. 等。

在上述代码中,我们使用了 grpc-interceptor 来实现拦截器。而在某些场景下,我们可以使用 Service Mesh 来实现拦截器的功能,所有微服务的请求和返回都会经过 Service Mesh 的代理,从代理层面可以自动处理日志或者记录错误次数。为了获取更精确地错误输出,你的微服务应该要设置准确的状态码。

运行测试

  • 启动 gRPC Server:
$ cd py3_grpc_demo/recommendations
$ python recommendations.py
  • Client 发出请求:
>>> import grpc
>>> from recommendations_pb2 import BookCategory, RecommendationRequest
>>> from recommendations_pb2_grpc import RecommendationsStub

>>> channel = grpc.insecure_channel("localhost:50051")
>>> client = RecommendationsStub(channel)

>>> request = RecommendationRequest(
...    user_id=1, category=BookCategory.SCIENCE_FICTION, max_results=3)
>>> client.Recommend(request)
recommendations 
  id: 6
  title: "The Dune Chronicles"

recommendations 
  id: 5
  title: "Ender\\'s Game"

recommendations 
  id: 4
  title: "The Hitchhiker\\'s Guide to the Galaxy"

3、实现 Marketplace Web 程序

安装依赖

  • py3_grpc_demo/marketplace/requirements.txt
flask ~= 1.1
grpcio-tools ~= 1.30
Jinja2 ~= 2.11
pytest ~= 5.4
  • 安装
$ pip3 install -r requirements.txt

基于 .proto 文件生成 gRPC 代码

cd py3_grpc_demo/marketplace
python -m grpc_tools.protoc -I ../protobufs --python_out=. --grpc_python_out=. ../protobufs/recommendations.proto

实现 Marketplace Web Server 和 gRPC Client

$ vi py3_grpc_demo/marketplace/marketplace.py

import os

from flask import Flask, render_template
import grpc

from recommendations_pb2 import BookCategory, RecommendationRequest
from recommendations_pb2_grpc import RecommendationsStub


app = Flask(__name__)

# 通过环境变量来获取 Server IP 地址。
recommendations_host = os.getenv("RECOMMENDATIONS_HOST", "localhost")
recommendations_channel = grpc.insecure_channel(
    f"recommendations_host:50051"
)
recommendations_client = RecommendationsStub(recommendations_channel)


@app.route("/")
def render_homepage():
    recommendations_request = RecommendationRequest(
        user_id=1, category=BookCategory.MYSTERY, max_results=3
    )
    recommendations_response = \\
        recommendations_client.Recommend(recommendations_request)
    return render_template(
        "homepage.html",
        recommendations=recommendations_response.recommendations,
    )
vi py3_grpc_demo/marketplace/templates/homepage.html

<!-- homepage.html -->
<!doctype html>
<html lang="en">
<head>
    <title>Online Books For You</title>
</head>
<body>
    <h1>Mystery books you may like</h1>
    <ul>
    % for book in recommendations %
        <li> book.title </li>
    % endfor %
    </ul>
</body>

运行测试

  • 启动:Marketplace REST API Server
FLASK_APP=marketplace.py flask run -h 0.0.0.0
  • 访问:http://server_ip:5000

4、容器化部署

Dockerfile

  • py3_grpc_demo/recommendations/Dockerfile
FROM python

RUN mkdir /service
COPY protobufs/ /service/protobufs/
COPY recommendations/ /service/recommendations/
WORKDIR /service/recommendations
RUN python -m pip install --upgrade pip
RUN python -m pip install -r requirements.txt
RUN python -m grpc_tools.protoc -I ../protobufs --python_out=. \\
           --grpc_python_out=. ../protobufs/recommendations.proto

EXPOSE 50051
ENTRYPOINT [ "python", "recommendations.py" ]
  • py3_grpc_demo/marketplace/Dockerfile
FROM python

RUN mkdir /service
COPY protobufs/ /service/protobufs/
COPY marketplace/ /service/marketplace/
WORKDIR /service/marketplace
RUN python -m pip install --upgrade pip
RUN python -m pip install -r requirements.txt
RUN python -m grpc_tools.protoc -I ../protobufs --python_out=. \\
           --grpc_python_out=. ../protobufs/recommendations.proto

EXPOSE 5000
ENV FLASK_APP=marketplace.py
ENTRYPOINT [ "flask", "run", "--host=0.0.0.0"]
  • py3_grpc_demo/docker-compose.yml
version: "3.3"

services:

    marketplace:
        build:
            context: .
            dockerfile: marketplace/Dockerfile
        environment:
            RECOMMENDATIONS_HOST: recommendations
        image: marketplace
        networks:
            - microservices
        ports:
            - 5000:5000

    recommendations:
        build:
            context: .
            dockerfile: recommendations/Dockerfile
        image: recommendations
        networks:
            - microservices

networks:
    microservices:

运行测试

docker-compose up

参考文档

https://sunqi.site/posts/python-microservices-grpc-6/

以上是关于Python Module — grpcio gRPC 远程调用示例程序的主要内容,如果未能解决你的问题,请参考以下文章

Python 协程 greenlet

用greenlet实现Python中的并发

Python 协程

在Python中使用gRPC的方法示例h

python 协程之Greenlet

Bigquery 存储 API 多处理段错误