java 实现grpc服务间调用工程
Posted BBinChina
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 实现grpc服务间调用工程相关的知识,希望对你有一定的参考价值。
学习目标:
使用grpc协议,实现服务间通讯
学习内容:
1、rpc协议有哪些
2、grpc
3、构建rpc服务
rpc协议有哪些
RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务。
为微服务时代,rpc是常用的技术方案,区别于单体服务内的本地调用,rpc需要解决的是不同进程间的调用,往往会是跨机器调用(区别于同一机器下通道等IPC解决方案),跨机器主要解决的是网络通讯。
那么跨机器的网络通讯方式主要通过TCP/IP,而rpc更多关注的是应用层的协议。常用的RPC通讯协议有json(http)、二进制(grpc、dubbo)。
json的话便于解析,但需要占用带宽。
二进制的话需要特殊的解析过程,带宽占用小,但又依赖协议解析性能,解析过程也叫:序列化、反序列化
学习使用gRpc
grpc是谷歌出品的二进制通讯框架,既然是二进制,那么就需要协议解析层了, 在构建grpc服务器时,我们需要引入grpc 组件。
grpc服务层依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dfs</groupId>
<artifactId>dfs-rpc</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>dfs-rpc</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourcecEncoding>UTF-8</project.build.sourcecEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
</project>
gRpc工具
1、需要使用到 protoc进程,该进程主要用于生成协议序列化跟反序列化
2、生成java通讯层代码,需要引入grpc的java插件
我们采用的protoc为3.15.5,protoc-gen-grpc-java插件为1.9.1版本。
注意:版本需要跟服务引入的依赖一直
以上protoc进程可以到我上传的资源库里下载
构建服务步骤
1、编写proto
构建grpc服务的第一步是先编写我们的proto用于生成grpc代码,可以看到我这里有两个proto
NameNodeRpcModel.proto
该协议内容定义了两个请求的request跟respon 消息体,用于传输序列化
syntax = "proto3";
package com.dfs.namenode.rpc;
option java_multiple_files = true;
option java_package = "com.dfs.namenode.rpc.model";
option java_outer_classname = "NameNodeRpcModel";
message RegisterRequest {
string ip = 1;
string hostname = 2;
}
message RegisterResponse {
int32 status = 1;
}
message HeartbeatRequest {
string ip = 1;
string hostname = 2;
}
message HeartbeatResponse {
int32 status = 1;
}
NameNodeRpcServer.proto
该协议内容定义了请求的api
syntax = "proto3";
package com.dfs.namenode.rpc;
option java_multiple_files = true;
option java_package = "com.dfs.namenode.rpc.service";
option java_outer_classname = "NameNodeServer";
import "NameNodeRpcModel.proto";
service NameNodeService {
rpc Register(RegisterRequest) returns (RegisterResponse);
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}
2、生成grpc代码
protoc.exe --java_out=./ *.proto
protoc.exe --plugin=protoc-gen-grpc-java=protoc-gen-grpc-java-0.13.2-windows-x86_64.exe --grpc-java_out=./ *.proto
在protoc的进程目录下执行以上代码后,会生成 protoc协议里定义的java_package
将com包内容导入dfs-rpc工程,可以生成单独的组件,用于被其他服务调用,再次提醒注意pom引入的依赖版本,需要跟我们使用的生成器版本一致。
3、服务提供方 namenode, 服务消费方(调用方)datanode
namenode需要实现业务服务,在namenode引入rpc依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dfs</groupId>
<artifactId>dfs-namenode</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>dfs-rpc</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourcecEncoding>UTF-8</project.build.sourcecEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.dfs</groupId>
<artifactId>dfs-rpc</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
构建实现rpc业务层实现
package com.dfs.namenode.server;
import com.dfs.namenode.rpc.model.HeartbeatRequest;
import com.dfs.namenode.rpc.model.HeartbeatResponse;
import com.dfs.namenode.rpc.model.RegisterRequest;
import com.dfs.namenode.rpc.model.RegisterResponse;
import com.dfs.namenode.rpc.service.*;
import io.grpc.stub.StreamObserver;
public class NameNodeServiceImpl implements NameNodeServiceGrpc.NameNodeService{
public static final Integer STATUS_SUCCESS = 1;
public static final Integer STATUS_FAILURE = 2;
private FSNameSystem fsNameSystem;
private DataNodeManager dataNodeManager;
public NameNodeServiceImpl(FSNameSystem fsNameSystem, DataNodeManager dataNodeManager) {
this.fsNameSystem = fsNameSystem;
this.dataNodeManager = dataNodeManager;
}
@Override
public void register(RegisterRequest request, StreamObserver<RegisterResponse> responseObserver) {
dataNodeManager.Register(request.getIp(), request.getHostname());
RegisterResponse response = RegisterResponse.newBuilder().setStatus(STATUS_SUCCESS).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@Override
public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
dataNodeManager.Heartbeat(request.getIp(), request.getHostname());
HeartbeatResponse response = HeartbeatResponse.newBuilder().setStatus(STATUS_SUCCESS).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
构建的实现需要注入到gRpc的通讯框架中,用于当有gRpc请求时路由到对应的业务服务。
package com.dfs.namenode.server;
import com.dfs.namenode.rpc.service.NameNodeServiceGrpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
/**
* 对外提供rpc接口
*/
public class NameNodeRpcServer {
private static final int DEFAULT_PORT = 50070;
private Server server = null;
private FSNameSystem fsNameSystem;
private DataNodeManager dataNodeManager;
public NameNodeRpcServer(FSNameSystem fsNameSystem, DataNodeManager dataNodeManager) {
this.fsNameSystem = fsNameSystem;
this.dataNodeManager = dataNodeManager;
}
/**
* 注册数据节点
*
* @param ip
* @param hostname
* @return
* @throws Exception
*/
public Boolean Register(String ip, String hostname) throws Exception {
return dataNodeManager.Register(ip, hostname);
}
/**
* 创建目录
*
* @param path
* @return
* @throws Exception
*/
public Boolean Mkdir(String path) throws Exception {
return this.fsNameSystem.mkdir(path);
}
public Boolean Heartbeat(String ip, String hostname) throws Exception {
return dataNodeManager.Heartbeat(ip, hostname);
}
/**
* 启动grpc服务
*/
public void Start() throws IOException {
//监听,接受请求
server = ServerBuilder.forPort(DEFAULT_PORT).addService(NameNodeServiceGrpc.bindService(new NameNodeServiceImpl(fsNameSystem, dataNodeManager))).build().start();
}
}
在NameNodeRpcServer 中使用grp的ServerBuilder将处理服务注入,同时绑定监听的端口。
4、调用方dataNode
在DataNode工程内使用NameNodeService的代理类,用于对NameNode进行Rpc请求的封装
package com.dfs.datanode.server;
import com.dfs.namenode.rpc.service.NameNodeServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
public class NameNodeService {
private static final String NAMENODE_HOSTNAME = "localhost";
private static final Integer NAMENODE_PORT = 50070;
private NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;
public NameNodeService() {
ManagedChannel channel = NettyChannelBuilder.forAddress(NAMENODE_HOSTNAME, NAMENODE_PORT).negotiationType(NegotiationType.PLAINTEXT).build();
this.namenode = NameNodeServiceGrpc.newBlockingStub(channel);
}
}
这里使用NettyChannelBuilder创建一个对NameNode的请求通道。
总结:
使用grpc较于复杂的情况在于rpc层的代码生成,以及任何的调用方都需要引入服务提供方的jar包,而不像http的方式直接发送json数据。
以上是关于java 实现grpc服务间调用工程的主要内容,如果未能解决你的问题,请参考以下文章