java 实现grpc服务间调用工程

Posted BBinChina

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 实现grpc服务间调用工程相关的知识,希望对你有一定的参考价值。

学习目标:

使用grpc协议,实现服务间通讯

学习内容:

1、rpc协议有哪些
2、grpc
3、构建rpc服务

rpc协议有哪些

RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务。
为微服务时代,rpc是常用的技术方案,区别于单体服务内的本地调用,rpc需要解决的是不同进程间的调用,往往会是跨机器调用(区别于同一机器下通道等IPC解决方案),跨机器主要解决的是网络通讯。

那么跨机器的网络通讯方式主要通过TCP/IP,而rpc更多关注的是应用层的协议。常用的RPC通讯协议有json(http)、二进制(grpc、dubbo)。

json的话便于解析,但需要占用带宽。
二进制的话需要特殊的解析过程,带宽占用小,但又依赖协议解析性能,解析过程也叫:序列化、反序列化

学习使用gRpc

grpc是谷歌出品的二进制通讯框架,既然是二进制,那么就需要协议解析层了, 在构建grpc服务器时,我们需要引入grpc 组件。

grpc服务层依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dfs</groupId>
    <artifactId>dfs-rpc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>dfs-rpc</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourcecEncoding>UTF-8</project.build.sourcecEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-all</artifactId>
            <version>1.3.0</version>
        </dependency>
    </dependencies>


</project>

gRpc工具

1、需要使用到 protoc进程,该进程主要用于生成协议序列化跟反序列化
2、生成java通讯层代码,需要引入grpc的java插件

我们采用的protoc为3.15.5,protoc-gen-grpc-java插件为1.9.1版本。
注意:版本需要跟服务引入的依赖一直

以上protoc进程可以到我上传的资源库里下载

构建服务步骤

1、编写proto
构建grpc服务的第一步是先编写我们的proto用于生成grpc代码,可以看到我这里有两个proto

NameNodeRpcModel.proto
该协议内容定义了两个请求的request跟respon 消息体,用于传输序列化

syntax = "proto3";

package com.dfs.namenode.rpc;

option java_multiple_files = true;
option java_package = "com.dfs.namenode.rpc.model";
option java_outer_classname = "NameNodeRpcModel";

message RegisterRequest {
  string ip = 1;
  string hostname = 2;
}

message RegisterResponse {
  int32 status = 1;
}

message HeartbeatRequest {
  string ip = 1;
  string hostname = 2;
}

message HeartbeatResponse {
  int32 status = 1;
}

NameNodeRpcServer.proto
该协议内容定义了请求的api

syntax = "proto3";

package com.dfs.namenode.rpc;

option java_multiple_files = true;
option java_package = "com.dfs.namenode.rpc.service";
option java_outer_classname = "NameNodeServer";

import "NameNodeRpcModel.proto";

service NameNodeService {
  rpc Register(RegisterRequest) returns (RegisterResponse);
  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}

2、生成grpc代码

protoc.exe --java_out=./ *.proto

protoc.exe --plugin=protoc-gen-grpc-java=protoc-gen-grpc-java-0.13.2-windows-x86_64.exe --grpc-java_out=./ *.proto

在protoc的进程目录下执行以上代码后,会生成 protoc协议里定义的java_package


将com包内容导入dfs-rpc工程,可以生成单独的组件,用于被其他服务调用,再次提醒注意pom引入的依赖版本,需要跟我们使用的生成器版本一致。

3、服务提供方 namenode, 服务消费方(调用方)datanode

namenode需要实现业务服务,在namenode引入rpc依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dfs</groupId>
    <artifactId>dfs-namenode</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>dfs-rpc</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourcecEncoding>UTF-8</project.build.sourcecEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.dfs</groupId>
            <artifactId>dfs-rpc</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

构建实现rpc业务层实现

package com.dfs.namenode.server;

import com.dfs.namenode.rpc.model.HeartbeatRequest;
import com.dfs.namenode.rpc.model.HeartbeatResponse;
import com.dfs.namenode.rpc.model.RegisterRequest;
import com.dfs.namenode.rpc.model.RegisterResponse;
import com.dfs.namenode.rpc.service.*;
import io.grpc.stub.StreamObserver;

public class NameNodeServiceImpl implements NameNodeServiceGrpc.NameNodeService{
    public static final Integer STATUS_SUCCESS = 1;
    public static final Integer STATUS_FAILURE = 2;

    private FSNameSystem fsNameSystem;
    private DataNodeManager dataNodeManager;

    public NameNodeServiceImpl(FSNameSystem fsNameSystem, DataNodeManager dataNodeManager) {
        this.fsNameSystem = fsNameSystem;
        this.dataNodeManager = dataNodeManager;
    }

    @Override
    public void register(RegisterRequest request, StreamObserver<RegisterResponse> responseObserver) {
        dataNodeManager.Register(request.getIp(), request.getHostname());
        RegisterResponse response = RegisterResponse.newBuilder().setStatus(STATUS_SUCCESS).build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    @Override
    public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
        dataNodeManager.Heartbeat(request.getIp(), request.getHostname());

        HeartbeatResponse response = HeartbeatResponse.newBuilder().setStatus(STATUS_SUCCESS).build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}

构建的实现需要注入到gRpc的通讯框架中,用于当有gRpc请求时路由到对应的业务服务。

package com.dfs.namenode.server;

import com.dfs.namenode.rpc.service.NameNodeServiceGrpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

/**
 * 对外提供rpc接口
 */
public class NameNodeRpcServer {
    private static final int DEFAULT_PORT = 50070;

    private Server server = null;

    private FSNameSystem fsNameSystem;

    private DataNodeManager dataNodeManager;

    public NameNodeRpcServer(FSNameSystem fsNameSystem, DataNodeManager dataNodeManager) {
        this.fsNameSystem = fsNameSystem;
        this.dataNodeManager = dataNodeManager;
    }

    /**
     * 注册数据节点
     *
     * @param ip
     * @param hostname
     * @return
     * @throws Exception
     */
    public Boolean Register(String ip, String hostname) throws Exception {
        return dataNodeManager.Register(ip, hostname);
    }

    /**
     * 创建目录
     *
     * @param path
     * @return
     * @throws Exception
     */
    public Boolean Mkdir(String path) throws Exception {
        return this.fsNameSystem.mkdir(path);
    }

    public Boolean Heartbeat(String ip, String hostname) throws Exception {
        return dataNodeManager.Heartbeat(ip, hostname);
    }

    /**
     * 启动grpc服务
     */
    public void Start() throws IOException {
        //监听,接受请求
        server = ServerBuilder.forPort(DEFAULT_PORT).addService(NameNodeServiceGrpc.bindService(new NameNodeServiceImpl(fsNameSystem, dataNodeManager))).build().start();
    }
}

在NameNodeRpcServer 中使用grp的ServerBuilder将处理服务注入,同时绑定监听的端口。

4、调用方dataNode
在DataNode工程内使用NameNodeService的代理类,用于对NameNode进行Rpc请求的封装

package com.dfs.datanode.server;

import com.dfs.namenode.rpc.service.NameNodeServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

public class NameNodeService {

    private static final String NAMENODE_HOSTNAME = "localhost";
    private static final Integer NAMENODE_PORT = 50070;
    
    private NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;
    
    public NameNodeService() {
        ManagedChannel channel = NettyChannelBuilder.forAddress(NAMENODE_HOSTNAME, NAMENODE_PORT).negotiationType(NegotiationType.PLAINTEXT).build();
        this.namenode = NameNodeServiceGrpc.newBlockingStub(channel);
    }
}

这里使用NettyChannelBuilder创建一个对NameNode的请求通道。

总结:
使用grpc较于复杂的情况在于rpc层的代码生成,以及任何的调用方都需要引入服务提供方的jar包,而不像http的方式直接发送json数据。

以上是关于java 实现grpc服务间调用工程的主要内容,如果未能解决你的问题,请参考以下文章

java版gRPC实战之二:服务发布和调用

3.微服务--GRPC

Istio 微服务架构实现服务间gRPC通信

grpc框架源码分析

Visual Studio自动编译gRPC工程的设置

gRPC的Java实现