聊一聊 gRPC 的四种通信模式
Posted _江南一点雨
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊一聊 gRPC 的四种通信模式相关的知识,希望对你有一定的参考价值。
温馨提示:本文需要结合上一篇 gRPC 文章一起食用,否则可能看不懂。
前面一篇文章松哥和大家聊了 gRPC 的基本用法,今天我们再来稍微深入一点点,来看下 gRPC 中四种不同的通信模式。
gRPC 中四种不同的通信模式分别是:
- 一元 RPC
- 服务端流 RPC
- 客户端流 RPC
- 双向流 RPC
接下来松哥就通过四个完整的案例,来分别和向伙伴们演示这四种不同的通信模式。
1. 准备工作
关于 gRPC 的基础知识我们就不啰嗦了,咱们直接来看我今天的 proto 文件,如下:
这次我新建了一个名为 book.proto 的文件,这里主要定义了一些图书相关的方法,如下:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.javaboy.grpc.demo";
option java_outer_classname = "BookServiceProto";
import "google/protobuf/wrappers.proto";
package book;
service BookService
rpc addBook(Book) returns (google.protobuf.StringValue);
rpc getBook(google.protobuf.StringValue) returns (Book);
rpc searchBooks(google.protobuf.StringValue) returns (stream Book);
rpc updateBooks(stream Book) returns (google.protobuf.StringValue);
rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet);
message Book
string id = 1;
repeated string tags = 2;
string name = 3;
float price = 4;
string author = 5;
message BookSet
string id = 1;
repeated Book bookList = 3;
这个文件中,有一些内容我们在上篇文章中都讲过了,讲过的我就不再重复了,我说一些上篇文章没有涉及到的东西:
- 由于我们在这个文件中,引用了 Google 提供的 StringValue(
google.protobuf.StringValue
),所以这个文件上面我们首先用 import 导入相关的文件,导入之后,才可以使用。 - 在方法参数和返回值中出现的 stream,就表示这个方法的参数或者返回值是流的形式(其实就是数据可以多次传输)。
- message 中出现了一个上篇文章没有的关键字 repeated,这个表示这个字段可以重复,可以简单理解为这就是我们 Java 中的数组。
好了,和上篇文章相比,本文主要就是这几个地方不一样。
proto 文件写好之后,按照上篇文章介绍的方法进行编译,生成对应的代码,这里就不再重复了。
2. 一元 RPC
一元 RPC 是一种比较简单的 RPC 模式,其实说白了我们上篇文章和大家介绍的就是一种一元 RPC,也就是客户端发起一个请求,服务端给出一个响应,然后请求结束。
上面我们定义的五个方法中,addBook 和 getBook 都算是一种一元 RPC。
2.1 addBook
先来看 addBook 方法,这个方法的逻辑很简单,我们提前在服务端准备一个 Map 用来保存 Book,addBook 调用的时候,就把 book 对象存入到 Map 中,并且将 book 的 ID 返回,大家就这样一件事,来看看服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase
private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl()
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
@Override
public void addBook(Book request, StreamObserver<StringValue> responseObserver)
bookMap.put(request.getId(), request);
responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build());
responseObserver.onCompleted();
看过上篇文章的小伙伴,我觉得这段代码应该很好理解。
客户端调用方式如下:
public class BookServiceClient
public static void main(String[] args) throws InterruptedException
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
addBook(stub);
private static void addBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException
CountDownLatch countDownLatch = new CountDownLatch(1);
stub.addBook(Book.newBuilder().setPrice(99).setId("100").setName("java").setAuthor("javaboy").build(), new StreamObserver<StringValue>()
@Override
public void onNext(StringValue stringValue)
System.out.println("stringValue.getValue() = " + stringValue.getValue());
@Override
public void onError(Throwable throwable)
@Override
public void onCompleted()
countDownLatch.countDown();
System.out.println("添加完毕");
);
countDownLatch.await();
这里我使用了 CountDownLatch 来实现线程等待,等服务端给出响应之后,客户端再结束。这里在回调的 onNext 方法中,我们就可以拿到服务端的返回值。
2.2 getBook
getBook 跟上面的 addBook 类似,先来看服务端代码,如下:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase
private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl()
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
@Override
public void getBook(StringValue request, StreamObserver<Book> responseObserver)
String id = request.getValue();
Book book = bookMap.get(id);
if (book != null)
responseObserver.onNext(book);
responseObserver.onCompleted();
else
responseObserver.onCompleted();
这个 getBook 就是根据客户端传来的 id,从 Map 中查询到一个 Book 并返回。
客户端调用代码如下:
public class BookServiceClient
public static void main(String[] args) throws InterruptedException
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
getBook(stub);
private static void getBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException
CountDownLatch countDownLatch = new CountDownLatch(1);
stub.getBook(StringValue.newBuilder().setValue("2").build(), new StreamObserver<Book>()
@Override
public void onNext(Book book)
System.out.println("book = " + book);
@Override
public void onError(Throwable throwable)
@Override
public void onCompleted()
countDownLatch.countDown();
System.out.println("查询完毕");
);
countDownLatch.await();
小伙伴们大概也能看出来,addBook 和 getBook 基本上操作套路是一模一样的。
3. 服务端流 RPC
前面的一元 RPC,客户端发起一个请求,服务端给出一个响应,请求就结束了。服务端流则是客户端发起一个请求,服务端给一个响应序列,这个响应序列组成一个流。
上面我们给出的 searchBook 就是这样一个例子,searchBook 是传递图书的 tags 参数,然后在服务端查询哪些书的 tags 满足条件,将满足条件的书全部都返回去。
我们来看下服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase
private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl()
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
@Override
public void searchBooks(StringValue request, StreamObserver<Book> responseObserver)
Set<String> keySet = bookMap.keySet();
String tags = request.getValue();
for (String key : keySet)
Book book = bookMap.get(key);
int tagsCount = book.getTagsCount();
for (int i = 0; i < tagsCount; i++)
String t = book.getTags(i);
if (t.equals(tags))
responseObserver.onNext(book);
break;
responseObserver.onCompleted();
小伙伴们看下,这段 Java 代码应该很好理解:
- 首先从 request 中提取客户端传来的 tags 参数。
- 遍历 bookMap,查看每一本书的 tags 是否等于客户端传来的 tags,如果相等,说明添加匹配,则通过
responseObserver.onNext(book);
将这本书写回到客户端。 - 等所有操作都完成后,执行
responseObserver.onCompleted();
,表示服务端的响应序列结束了,这样客户端也就知道请求结束了。
我们来看看客户端的代码,如下:
public class BookServiceClient
public static void main(String[] args) throws InterruptedException
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
searchBook(stub);
private static void searchBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException
CountDownLatch countDownLatch = new CountDownLatch(1);
stub.searchBooks(StringValue.newBuilder().setValue("明清小说").build(), new StreamObserver<Book>()
@Override
public void onNext(Book book)
System.out.println(book);
@Override
public void onError(Throwable throwable)
@Override
public void onCompleted()
countDownLatch.countDown();
System.out.println("查询完毕!");
);
countDownLatch.await();
客户端的代码好理解,搜索的关键字是 明清小说
,每当服务端返回一次数据的时候,客户端回调的 onNext 方法就会被触发一次,当服务端之行了 responseObserver.onCompleted();
之后,客户端的 onCompleted 方法也会被触发。
这个就是服务端流,客户端发起一个请求,服务端通过 onNext 可以多次写回数据。
4. 客户端流 RPC
客户端流则是客户端发起多个请求,服务端只给出一个响应。
上面的 updateBooks 就是一个客户端流的案例,客户端想要修改图书,可以发起多个请求修改多本书,服务端则收集多次修改的结果,将之汇总然后一次性返回给客户端。
我们先来看看服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase
private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl()
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags(聊一聊 gRPC 中的拦截器