gRPC框架学习Note 1
Posted 智辉NOTE
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了gRPC框架学习Note 1相关的知识,希望对你有一定的参考价值。
1. RPC
1.1 RPC是什么?
从单机走向分布式,产生了很多分布式的通信方式:
最古老也是最有效,并且永不过时的,TCP/UDP的二级制传输。事实上,所有的通信方式归根到底都是TCP/UDP(用户数据报协议).
CORBA(common Object request Broker Architecture),比较古老而复杂的协议,支持面向对象的通信协议;
Web Service(SOA SOAP RDDI WSDL...) 基于http+xml的标准化Web API
SOA: 面向服务的架构,xml大型,传输成本大
RestFul(Representational State Transfer):回归简单化本源的Web API的事实标准,http+json
RMI: Remote Method Invocation(Java 内部的分布式通信协议)
JMS Java Message Service:Java EE中的消息框架标准,为很多MQ所支持
RPC(Remote Procedure Call):远程进程(方法)调用,这只是一个统称,重点在于方法调用(不支持对象的概念)。具体实现甚至可以用RMI RestFul等去实现,但是一般不用,因为RMI不能跨语言,而RestFul效率太低。
多用于服务器集群间的通信,因此常使用更加高效、短小精悍的传输模式以提高效率(thrift,Hessian,protobuf)
RCP只是一个概念,不是一个框架,也不是一个协议,这个概念的实现有好多种。
从单机到分布式-->产生了很多分布式通信协议,有广域网也有局域网上的-->最基本:二进制数据传输(高电平、低电平,5V和0V信号) TCP/IP;
RPC就是一种概念,有各种各样的实现。
1.2 RPC 01 最原始的方式
UserServiceImpl.java
import common.IUserService;
import common.User;
/**
* author : chenguanghui
* date : 2021-05-04
* time : 13:20
* filename : null.java
* description:
*/
public class UserServiceImpl implements IUserService {
/*
最核心的业务逻辑在这里实现,并且返回需要的数据
*/
public User findUserById(Integer id) {
User user = new User(id, "");
return user;
}
}
Server.java
import com.sun.org.apache.bcel.internal.generic.NEW;
import common.User;
import sun.reflect.generics.scope.Scope;
import javax.sql.rowset.Predicate;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
/**
* author : chenguanghui
* date : 2021-05-04
* time : 13:20
* filename : null.java
* description:
*/
public class Server {
public static boolean running = true;
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8888);
while (running){
Socket accept = serverSocket.accept();
process(accept);
accept.close();
}
serverSocket.close();
}
public static void process(Socket s) throws IOException {
InputStream inputStream = s.getInputStream();
OutputStream outputStream = s.getOutputStream();
DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
int id = dataInputStream.readInt();
UserServiceImpl userService = new UserServiceImpl();
User user = userService.findUserById(id);
// 把属性写出去
dataOutputStream.writeInt(user.getId());
dataOutputStream.writeUTF(user.getName());
dataOutputStream.flush();
}
}
client.java
import common.User;
import sun.reflect.generics.scope.Scope;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
/**
* author : chenguanghui
* date : 2021-05-04
* time : 13:20
* filename : null.java
* description:
*/
public class Client {
public static void main(String[] args) throws Exception{
Socket socket = new Socket("127.0.0.1", 8888);
// 转化成0101的二进制格式传输
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
dataOutputStream.writeInt(123);
socket.getOutputStream().write(byteArrayOutputStream.toByteArray());
socket.getOutputStream().flush();
DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
int id = dataInputStream.readInt();
String name = dataInputStream.readUTF();
User user = new User(id, name);
System.out.println(user);
dataOutputStream.close();
socket.close();
}
}
比较麻烦,需要了解传输对象的一切细节才可以把对象传的过去,拿的回来。
传输过程和业务逻辑代码混在一起,不断有新的对象,product等等加入,每个对象的属性也在不断变动。
1.3 RPC 02
client把网络部分单独出来,用一个代理stub来完成
client.java
public class Client {
public static void main(String[] args) throws Exception{
// 把网络传输的部分拿出去
// 给定义一个代理处理方式
Stub stub = new Stub();
System.out.println(stub.findUserById(123));
}
}
stub.java
在原始的方式中,client端的代理叫stub,服务端的代理叫skeleton,这个类就是一个代理,帮忙屏蔽了一些细节。
import common.User;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
/**
* author : chenguanghui
* date : 2021-05-04
* time : 14:48
* filename : null.java
* description:
*/
public class Stub {
// 其实是一个代理类
// 在原始的方式中,client端的代理叫stub,服务端的代理叫skeleton
public User findUserById(Integer id) throws Exception{
Socket socket = new Socket("127.0.0.1", 8888);
// 转化成0101的二进制格式传输
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
dataOutputStream.writeInt(id);
socket.getOutputStream().write(byteArrayOutputStream.toByteArray());
socket.getOutputStream().flush();
DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
int id1 = dataInputStream.readInt();
String name = dataInputStream.readUTF();
User user = new User(id1, name);
dataOutputStream.close();
socket.close();
return user;
}
}
但是stub只能返回一个方法,一个类,因此功能太不完善,太不灵活了。
1.4 RPC 03
代理模式里面的动态代理,是最难的。
client.java
需要哪个接口,返回改接口的实现类;
import common.IUserService;
/**
* author : chenguanghui
* date : 2021-05-04
* time : 14:48
* filename : null.java
* description:
*/
public class Client {
public static void main(String[] args) throws Exception{
// 把网络传输的部分拿出去
// 给定义一个代理处理方式
// 代理返回这样一个类
// 有一个类实现了UserService的接口, 接口的实现类,一定实现了findUserById这个方法
// 动态产生
IUserService service = Stub.getStub();
System.out.println(service.findUserById(123));
}
}
Stub.java
import common.IUserService;
import common.User;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
/**
* author : chenguanghui
* date : 2021-05-04
* time : 15:06
* filename : null.java
* description:
*/
public class Stub {
// 这种方法解决了方法增加的问题
public static IUserService getStub(){
InvocationHandler h = new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = new Socket("127.0.0.1", 8888);
// 转化成0101的二进制格式传输
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
dataOutputStream.writeInt(123);
socket.getOutputStream().write(byteArrayOutputStream.toByteArray());
socket.getOutputStream().flush();
DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
int id1 = dataInputStream.readInt();
String name = dataInputStream.readUTF();
User user = new User(id1, name);
dataOutputStream.close();
socket.close();
return user;
}
};
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[] {IUserService.class}, h);
return (IUserService)o;
}
}
IUservice ---实现类---动态产生---通过动态代理动态产生---如何通过动态代理产生
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[] {IUserService.class}, h);
// 代理类的对象
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[] {IUserService.class}, h);
System.out.println(o.getClass().getName());
System.out.println(o.getClass().getInterfaces()[0]);
// 输出
com.sun.proxy.$Proxy0
interface common.IUserService
User{id=123, name=''}
缺陷:
无论调用任何方法,都写了123进去。
1.5 RPC 04(对同一个接口任意方法的支持)
调用接口中的其他方法时,需要传递不同的参数。
server.java
public class Server {
public static boolean running = true;
public static void main(String[] args) throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
ServerSocket serverSocket = new ServerSocket(8888);
while (running){
Socket accept = serverSocket.accept();
process(accept);
accept.close();
}
serverSocket.close();
}
public static void process(Socket s) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
InputStream inputStream = s.getInputStream();
OutputStream outputStream = s.getOutputStream();
ObjectInputStream dataInputStream = new ObjectInputStream(inputStream);
ObjectOutputStream dataOutputStream = new ObjectOutputStream(outputStream);
String methodName = dataInputStream.readUTF();
Class[] parameterTypes = (Class[])dataInputStream.readObject();
Object[] args = (Object[])dataInputStream.readObject();
// 通过反射的方式找到这个方法
UserServiceImpl userService = new UserServiceImpl();
Method method = userService.getClass().getMethod(methodName, parameterTypes)
User user = (User) method.invoke(userService, args);
// 把属性写出去
dataOutputStream.writeInt(user.getId());
dataOutputStream.writeUTF(user.getName());
dataOutputStream.flush();
}
}
stub.java
// 可以提供同一个接口中对很多方法的支持
public class Stub {
public static IUserService getStub(){
// 无论任何方法,用通用的版本实现
InvocationHandler h = new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = new Socket("127.0.0.1", 8888);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
String stringName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 通过网络发送方法名到服务器端
objectOutputStream.writeUTF(stringName);
// 可能有重载,讲方法类型也写过去
objectOutputStream.writeObject(parameterTypes);
objectOutputStream.writeObject(args);
objectOutputStream.flush();
// 处于监听状态
DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
int id1 = dataInputStream.readInt();
String name = dataInputStream.readUTF();
User user = new User(id1, name);
objectOutputStream.close();
socket.close();
return user;
}
};
// 代理类的对象
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[] {IUserService.class}, h);
System.out.println(o.getClass().getName());
System.out.println(o.getClass().getInterfaces()[0]);
return (IUserService)o;
}
}
缺点:没办法实现对很多个接口随意方法的支持
1.6 RPC 05(返回Object封装, 支持任意类型)
server.java
直接将对象返回
public class Server {
public static boolean running = true;
public static void main(String[] args) throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
ServerSocket serverSocket = new ServerSocket(8888);
while (running){
Socket accept = serverSocket.accept();
process(accept);
accept.close();
}
serverSocket.close();
}
public static void process(Socket s) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
InputStream inputStream = s.getInputStream();
OutputStream outputStream = s.getOutputStream();
ObjectInputStream dataInputStream = new ObjectInputStream(inputStream);
ObjectOutputStream dataOutputStream = new ObjectOutputStream(outputStream);
String methodName = dataInputStream.readUTF();
Class[] parameterTypes = (Class[])dataInputStream.readObject();
Object[] args = (Object[])dataInputStream.readObject();
// 通过反射的方式找到这个方法
UserServiceImpl userService = new UserServiceImpl();
Method method = userService.getClass().getMethod(methodName, parameterTypes);
User user = (User) method.invoke(userService, args);
// 把属性写出去
dataOutputStream.writeObject(user);
dataOutputStream.flush();
}
}
stub.java
public class Stub {
public static IUserService getStub(){
// 无论任何方法,用通用的版本实现
InvocationHandler h = new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = new Socket("127.0.0.1", 8888);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
String stringName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 通过网络发送方法名到服务器端
objectOutputStream.writeUTF(stringName);
// 可能有重载,讲方法类型也写过去
objectOutputStream.writeObject(parameterTypes);
objectOutputStream.writeObject(args);
objectOutputStream.flush();
// 处于监听状态, 接受服务器返回的对象
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
User user = (User) objectInputStream.readObject();
objectOutputStream.close();
socket.close();
return user;
}
};
// 代理类的对象
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[] {IUserService.class}, h);
System.out.println(o.getClass().getName());
System.out.println(o.getClass().getInterfaces()[0]);
return (IUserService)o;
}
}
1.7 RPC 06(生成更多类型的代理)
client端只能拿IUService这样一个接口实现类,其他的不能拿;
通过Stub.getStub拿到任意类型的对象。
首先考虑,系统向外提供什么样的接口,站在用户的角度。
server.java
public class Server {
public static boolean running = true;
public static void main(String[] args) throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
ServerSocket serverSocket = new ServerSocket(8888);
while (running){
Socket accept = serverSocket.accept();
process(accept);
accept.close();
}
serverSocket.close();
}
public static void process(Socket s) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException {
InputStream inputStream = s.getInputStream();
OutputStream outputStream = s.getOutputStream();
ObjectInputStream dataInputStream = new ObjectInputStream(inputStream);
ObjectOutputStream dataOutputStream = new ObjectOutputStream(outputStream);
String clazzName = dataInputStream.readUTF();
String methodName = dataInputStream.readUTF();
Class[] parameterTypes = (Class[])dataInputStream.readObject();
Object[] args = (Object[])dataInputStream.readObject();
Class clazz = null;
// 从服务注册列表中找到具体的类
clazz = rpc06.UserServiceImpl.class;
Method method = clazz.getClass().getMethod(methodName, parameterTypes);
Object user = (Object) method.invoke(clazz.newInstance(), args);
// 把属性写出去
dataOutputStream.writeObject(user);
dataOutputStream.flush();
}
}
client.java
public class Client {
public static void main(String[] args) throws Exception{
// 有一个类实现了UserService的接口, 接口的实现类,一定实现了findUserById这个方法
// 传递任意的对象的类
IUserService service = (IUserService) Stub.getStub(IUserService.class);
System.out.println(service.findUserById(123));
}
}
stub.java
public class Stub {
public static Object getStub(final Class clazz){
// 无论任何方法,用通用的版本实现
InvocationHandler h = new InvocationHandler() {
// 实现了这个动态代理类的对象
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = new Socket("127.0.0.1", 8888);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
// 把接口的名字也要一起传过去
String className = clazz.getName();
String stringName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 通过网络发送方法名到服务器端
objectOutputStream.writeUTF(stringName);
// 可能有重载,讲方法类型也写过去
objectOutputStream.writeObject(parameterTypes);
objectOutputStream.writeObject(args);
objectOutputStream.flush();
// 处于监听状态
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
User user = (User) objectInputStream.readObject();
objectOutputStream.close();
socket.close();
return user;
}
};
// 代理类的对象
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[] {IUserService.class}, h);
System.out.println(o.getClass().getName());
System.out.println(o.getClass().getInterfaces()[0]);
return (IUserService)o;
}
}
1.8 RPC 07 添加其他接口的其他方法
client.java
使用不同的接口来实现;
ublic class Client {
public static void main(String[] args) throws Exception{
// 有一个类实现了UserService的接口, 接口的实现类,一定实现了findUserById这个方法
// 传递任意的对象的类
IProductService service = (IProductService) Stub.getStub(IProductService.class);
System.out.println(service.findProductById(123));
}
}
这就是RPC.
通过网络传输,一段需要转成二进制。另一端需要把二进制解析成我要处理的对象。(序列化和反序列化)。目前用的序列化方式是java自带的Serialisable,只支持java语言,而且效率很低,序列化后的二进制数据还特别长。
☆RPC序列化框架
RPC必须进行序列化。
序列化的一些协议,用在rpc的序列化层面。
java.io.Serializable
Hessian
google protobuf
facebook thrift
kyro
fst
json 序列化框架
Jackson
google Gson
Ali FastJson
xmlrpc(xstream)
...
1.9 Hessian序列化方式
manven 依赖
<dependencies>
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.38</version>
</dependency>
</dependencies>
序列化方式:
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import common.User;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
/**
* author : chenguanghui
* date : 2021-05-04
* time : 19:38
* filename : null.java
* description:
*/
public class HelloHessian {
public static void main(String[] args) throws Exception {
User user = new User(1, "zhangdan");
byte[] bytes = serialize(user);
System.out.println(bytes.length);
}
public static byte[] serialize(Object o) throws Exception{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Hessian2Output output = new Hessian2Output(baos);
output.writeObject(o);
output.flush();
byte[] bytes = baos.toByteArray();
baos.close();
output.close();
return bytes;
}
public static Object deserialize(byte[] bytes) throws Exception{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Hessian2Input input = new Hessian2Input(byteArrayInputStream);
Object o = input.readObject();
byteArrayInputStream.close();
input.close();
return o;
}
}
相同的对象,使用hessian进行序列化比使用jdk进行序列化小的多,并且很快。
☆RPC通信协议
前面的数据传输只是用了tcp/ip协议,也可以用http2.x协议进行传输。
不同的网络协议,在实际使用中,根据需求进行定制。
http
http2.0(grpc)
TCP
同步/异步 阻塞/非阻塞
Web Service
在https上传,就相当于加密了
1.10 使用Hessian作为序列化方式
HessionUtils.py
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
/**
* author : chenguanghui
* date : 2021-05-04
* time : 20:41
* filename : null.java
* description:
*/
public class HessianUtils {
public static byte[] serialize(Object o) throws Exception{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Hessian2Output output = new Hessian2Output(baos);
output.writeObject(o);
output.flush();
byte[] bytes = baos.toByteArray();
baos.close();
output.close();
return bytes;
}
public static Object deserialize(byte[] bytes) throws Exception{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Hessian2Input input = new Hessian2Input(byteArrayInputStream);
Object o = input.readObject();
byteArrayInputStream.close();
input.close();
return o;
}
}
2. 以gRPC为例了解RPC用法
2.1 基本概念
将远程调用和本地调用,在调用过程上进行统一化。
grpc的传输使用了http协议。
grpc和thrift都是rpc的一种实现方式;
netty作为网络传输的框架。netty--->NIO--->syscall 系统调用;
2.2 gRPC
public void add(a.b.c.AddRequest request,
io.grpc.stub.StreamObserver<a.b.c.AddReply> responseObserver) {
asyncUnimplementedUnaryCall(getAddMethod(), responseObserver);
}
形参和返回值都作为对象传入。
In gRPC, a client application can directly call a method on a server application on a different machine as if it were a local object, making it easier for you to create distributed applications and services. As in many RPC systems, gRPC is based around the idea of defining a service, specifying the methods that can be called remotely with their parameters and return types.On the server side, the server implements this interface and runs a gRPC server to handle client calls. On the client side, the client has a stub (referred to as just a client in some languages) that provides the same methods as the server.
3 gRPC的系统调用过程
官网:https://www.grpc.io/docs/what-is-grpc/introduction/
message Person {
string name = 1;
int32 id = 2;
bool has_ponycopter = 3;
}
// The greeter service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
3.1 gRPC是怎么实现方法调用的
3.1.1 分析pb生成的对应文件
service AddService{
rpc add(AddRequest) returns (AddReply){}
}
编译就会生成一个AddServiceGrpc.java文件
public final class AddServiceGrpc {
private AddServiceGrpc() {}
public static final String SERVICE_NAME = "grpc.AddService";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<a.b.c.AddRequest,
a.b.c.AddReply> getAddMethod;
public static io.grpc.MethodDescriptor<a.b.c.AddRequest,
a.b.c.AddReply> getAddMethod();
/**
* Creates a new async stub that supports all call types for the service
*/
public static AddServiceStub newStub(io.grpc.Channel channel);
/**
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
*/
public static AddServiceBlockingStub newBlockingStub(
io.grpc.Channel channel);
/**
* Creates a new ListenableFuture-style stub that supports unary calls on the service
*/
public static AddServiceFutureStub newFutureStub(
io.grpc.Channel channel);
public static abstract class AddServiceImplBase implements io.grpc.BindableService;
public static final class AddServiceBlockingStub extends io.grpc.stub.AbstractBlockingStub<AddServiceBlockingStub>;
public static final class AddServiceFutureStub extends io.grpc.stub.AbstractFutureStub<AddServiceFutureStub>;
private static final int METHODID_ADD = 0;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp>;
private static abstract class AddServiceBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier;
private static final class AddServiceFileDescriptorSupplier
extends AddServiceBaseDescriptorSupplier;
private static final class AddServiceMethodDescriptorSupplier
extends AddServiceBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier;
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
public static io.grpc.ServiceDescriptor getServiceDescriptor()
}
包含Add方法的实现基类:
public static abstract class AddServiceImplBase implements io.grpc.BindableService {
/**
*/
public void add(a.b.c.AddRequest request,
io.grpc.stub.StreamObserver<a.b.c.AddReply> responseObserver) {
asyncUnimplementedUnaryCall(getAddMethod(), responseObserver);
}
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getAddMethod(),
asyncUnaryCall(
new MethodHandlers<
a.b.c.AddRequest,
a.b.c.AddReply>(
this, METHODID_ADD)))
.build();
}
}
AddServer.java
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
/**
* author : chenguanghui
* date : 2021-05-04
* time : 22:33
* filename : null.java
* description:
*/
public class AddServer extends AddServiceGrpc.AddServiceImplBase{
public static void main(String[] args) throws IOException {
// 把接口暴露出来进行监听
// 如果端口被占用,就会抛出异常。
Server server = ServerBuilder.forPort(8799)
.addService(new AddServer())
.build();
server.start();
System.out.println("server started at 9999");
while (true){
}
}
public void add(a.b.c.AddRequest request,
io.grpc.stub.StreamObserver<a.b.c.AddReply> responseObserver) {
int a = request.getA();
int b = request.getB();
int res = myAdd(a, b);
responseObserver.onNext(AddReply.newBuilder().setRes(res).build());
responseObserver.onCompleted();
}
private int myAdd(int a, int b){
return a + b;
}
}
AddClient.java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
/**
* author : chenguanghui
* date : 2021-05-04
* time : 22:44
* filename : null.java
* description:
*/
public class AddClient {
ManagedChannel channel = null;
AddServiceGrpc.AddServiceBlockingStub stub;
public static void main(String[] args) {
int a = 1;
int b = 1;
AddClient addClient = new AddClient();
AddReply add = addClient.stub.add(AddRequest.newBuilder().setA(a).setB(b).build());
System.out.println(add.getRes());
}
public AddClient(){
channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8799)
.usePlaintext()
.build();
// 针对这个channel创建一个stub
stub = AddServiceGrpc.newBlockingStub(channel);
}
}
java_package (file option): 用于生成的Java类的包。如果.proto文件中没有给出明确的Java _ package选项,那么默认情况下将使用proto package (在.proto文件中使用“package”关键字指定)。然而,proto包通常不能成为好的Java包,因为proto包不应该以反向域名开始。如果不生成Java代码,则此选项无效。
option java_package = "com.example.foo";
java_multiple_files (file option): 导致顶层消息、枚举和服务在包级别定义,而不是在以.proto文件命名的外部类中定义。
option java_multiple_files = true;
java_outer_classname (file option): 要生成的最外面的Java类的类名(因此也是文件名)。如果.proto文件中未指定显式 java_outer_classname,则类名将通过转换来构造。原型文件名转换为ccamel-case(因此foo_bar.proto变成了FooBar.java)。如果不生成Java代码,则此选项无效。
option java_outer_classname = "Ponycopter";
client端调用了invoke方法
3.2 认证方式
SSL/TSL认证方式
基于google 的token认证方式
3.2.1 client-side SSL/TSL
// Create a default SSL ChannelCredentials object.
auto channel_creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
// Create a channel using the credentials created in the previous step.
auto channel = grpc::CreateChannel(server_name, channel_creds);
// Create a stub on the channel.
std::unique_ptr<Greeter::Stub> stub(Greeter::NewStub(channel));
// Make actual RPC calls on the stub.
grpc::Status s = stub->sayHello(&context, *request, response);
3.2.2 server-side
auto channel_creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
auto channel = grpc::CreateChannel("myservice.example.com", channel_creds);
std::unique_ptr<Greeter::Stub> stub(Greeter::NewStub(channel));
3.3 BenchMarking 性能测试
Using the API
Starting from a service definition in a .proto
file, gRPC provides protocol buffer compiler plugins that generate client- and server-side code. gRPC users typically call these APIs on the client side and implement the corresponding API on the server side.
On the server side, the server implements the methods declared by the service and runs a gRPC server to handle client calls. The gRPC infrastructure decodes incoming requests, executes service methods, and encodes service responses.
On the client side, the client has a local object known as stub (for some languages, the preferred term is client) that implements the same methods as the service. The client can then just call those methods on the local object, wrapping the parameters for the call in the appropriate protocol buffer message type - gRPC looks after sending the request(s) to the server and returning the server’s protocol buffer response(s).
Synchronous vs. asynchronous
Synchronous RPC calls that block until a response arrives from the server are the closest approximation to the abstraction of a procedure call that RPC aspires to. On the other hand, networks are inherently asynchronous and in many scenarios it’s useful to be able to start RPCs without blocking the current thread.
The gRPC programming API in most languages comes in both synchronous and asynchronous flavors. You can find out more in each language’s tutorial and reference documentation (complete reference docs are coming soon)
3.4 RPC life cycle
In this section, you’ll take a closer look at what happens when a gRPC client calls a gRPC server method. For complete implementation details, see the language-specific pages.
Unary RPC(一元形式)
First consider the simplest type of RPC where the client sends a single request and gets back a single response.
Once the client calls a stub method, the server is notified that the RPC has been invoked with the client’s metadata for this call, the method name, and the specified deadline if applicable.
The server can then either send back its own initial metadata (which must be sent before any response) straight away, or wait for the client’s request message. Which happens first, is application-specific.
Once the server has the client’s request message, it does whatever work is necessary to create and populate a response. The response is then returned (if successful) to the client together with status details (status code and optional status message) and optional trailing metadata.
If the response status is OK, then the client gets the response, which completes the call on the client side.
比较详细的三次握手、四次挥手的过程。
metadata包含于client请求server的 method name ,method parameterType, args等等 还有deadline(可以允许的延迟)。
当client请求Stub中的方法时,client 的metadata会invoke RPC ---> 通知Server端;
在接受请求之前,Server端使用自己的初始化metadata 回馈给client端;(确认连接建立)
接受请求并处理,状态码等;
如果请求status是OK, client端关闭连接。
Server streaming RPC
A server-streaming RPC is similar to a unary RPC, except that the server returns a stream of messages in response to a client’s request. After sending all its messages, the server’s status details (status code and optional status message) and optional trailing metadata are sent to the client. This completes processing on the server side. The client completes once it has all the server’s messages.
Client streaming RPC
A client-streaming RPC is similar to a unary RPC, except that the client sends a stream of messages to the server instead of a single message. The server responds with a single message (along with its status details and optional trailing metadata), typically but not necessarily after it has received all the client’s messages.
Bidirectional streaming RPC
In a bidirectional streaming RPC, the call is initiated by the client invoking the method and the server receiving the client metadata, method name, and deadline. The server can choose to send back its initial metadata or wait for the client to start streaming messages.
Client- and server-side stream processing is application specific. Since the two streams are independent, the client and server can read and write messages in any order. For example, a server can wait until it has received all of a client’s messages before writing its messages, or the server and client can play “ping-pong” – the server gets a request, then sends back a response, then the client sends another request based on the response, and so on.
Deadlines/Timeouts
gRPC allows clients to specify how long they are willing to wait for an RPC to complete before the RPC is terminated with a DEADLINE_EXCEEDED
error. On the server side, the server can query to see if a particular RPC has timed out, or how much time is left to complete the RPC.
Specifying a deadline or timeout is language specific: some language APIs work in terms of timeouts (durations of time), and some language APIs work in terms of a deadline (a fixed point in time) and may or may not have a default deadline.
RPC termination
In gRPC, both the client and server make independent and local determinations of the success of the call, and their conclusions may not match. This means that, for example, you could have an RPC that finishes successfully on the server side (“I have sent all my responses!") but fails on the client side (“The responses arrived after my deadline!"). It’s also possible for a server to decide to complete before a client has sent all its requests.
Cancelling an RPC
Either the client or the server can cancel an RPC at any time. A cancellation terminates the RPC immediately so that no further work is done.
Warning
Changes made before a cancellation are not rolled back.
Metadata(k-v数据结构形式)
Metadata is information about a particular RPC call (such as authentication details) in the form of a list of key-value pairs, where the keys are strings and the values are typically strings, but can be binary data. Metadata is opaque to gRPC itself - it lets the client provide information associated with the call to the server and vice versa.
Access to metadata is language dependent.
Channels
A gRPC channel provides a connection to a gRPC server on a specified host and port. It is used when creating a client stub. Clients can specify channel arguments to modify gRPC’s default behavior, such as switching message compression on or off. A channel has state, including connected
and idle
.
How gRPC deals with closing a channel is language dependent. Some languages also permit querying channel state.
3.5 proto编码过程
message:
message People{
bool male = 1;
int32 age = 2;
string address = 3;
}
生成的结果:
[1000 1 10000 101000 11010 1010 100011 .....]
3.6 grpclog-设计一款适合业务的日志库
以上是关于gRPC框架学习Note 1的主要内容,如果未能解决你的问题,请参考以下文章