Rpc到底是个啥
Posted James的黑板报
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rpc到底是个啥相关的知识,希望对你有一定的参考价值。
什么是Rpc服务
Rpc全称为Remote Procedure Call,中文名为
远程过程调用
,Rpc是一种计算机通信协议,他允许像调用本地服务一样调用远程服务,且随着互联网应用的量级越来越大,单台计算机的计算能力有限,需要借助集群的算力来实现,在分布式系统中,各个系统中的服务可以通过Rpc来进行互相调用,我们现在学习的许多分布式组件(hdfs,kafka)等底层都是通过Rpc来进行机器之间的互相通信的
Rpc的原理
在Rpc框架中主要有三个角色的存在: Registry
,Provider
,Consumer
,分别是服务注册中心
,服务提供方
,服务消费方
Rpc的调用过程
在上图示例中,我们可以看到整个Rpc服务的调用过程,调用过程如下
下面我们简单实现一个Rpc例子。
服务间远程调用实现整数求和
首先看看,实现这个逻辑,我们需要规划好哪些呢?
1.各个服务之间的职责2.服务之间通信与传输模型如何实现
现在已经明确了整个过程中需要三个服务:Registry,Provider,Consumer,我们先去实现Provider的逻辑
作为服务提供者,我们需要提供一个整数相加的方法暴露给其他服务,在这里我们定义一个接口,新建一个java工程,在src下新建Provider/Methods包,新建一个接口MethodInterface.java
package Provider.Methods;
/**
* 服务提供者接口
*/
public interface MethodInterface {
int GetSum(int a,int b);
}
接下来去实现这个方法,同级目录下新建一个接口实现类MethodInterfaceImpl.java
package Provider.Methods;
/**
* 服务提供者接口实现方法
**/
public class MethodInterfaceImpl implements MethodInterface {
@Override
public int GetSum(int a, int b) {
return a + b;
}
}
package Registry;
import java.io.IOException;
/**
* 服务注册中心接口
*/
public interface ServerRegistry {
//启动注册中心
public void start() throws IOException;
//停止注册中心
public void stop();
//注册一个方法类
public void register(Class serviceInterface,Class Impl);
}
接下来去实现这个注册中心的方法,以下高能预警
package Registry;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 服务注册中心实现类
**/
public class ServerRegistryImpl implements ServerRegistry {
// private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static ExecutorService executor = Executors.newCachedThreadPool();
private static final HashMap<String,Class> serviceRegistry = new HashMap<String,Class>();
private static int port;
public ServerRegistryImpl(int port) {
this.port = port;
}
@Override
public void stop() {
executor.shutdown();
}
@Override
public void start() throws IOException {
ServerSocket server = new ServerSocket();
server.bind(new InetSocketAddress(port));
System.out.println("当前已经注册的服务");
for (Map.Entry<String, Class> entry : serviceRegistry.entrySet()) {
String key = entry.getKey();
String value = entry.getValue().getName();
System.out.println("key=" + key + " value=" + value);
}
try {
while (true) {
// 循环监听客户端的tcp连接,接到tcp连接后直接封装为task,由线程池去执行
executor.execute(new ServiceTask(server.accept()));
}
} finally {
server.close();
executor.shutdown();
}
}
@Override
public void register(Class serviceInterface, Class Impl) {
// 服务列表注册到注册中心的hash表里
serviceRegistry.put(serviceInterface.getName(),Impl);
}
// 循环执行服务任务
public static class ServiceTask implements Runnable {
Socket client = null;
public ServiceTask(Socket client) {
this.client = client;
}
@Override
public void run() {
// todo 这两个类的方法作用
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
// 将客户端发送的码流反序列化为对象,反射调用服务实现者,获取执行结果
input = new ObjectInputStream(client.getInputStream());
String serviceName = input.readUTF();
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
// 获取注册中心的实体类
Class serviceClass = serviceRegistry.get(serviceName);
if (serviceClass == null) {
throw new ClassNotFoundException(serviceName + " not found");
}
Method method = serviceClass.getMethod(methodName,parameterTypes);
Object result = method.invoke(serviceClass.newInstance(),arguments);
// 将执行结果反序列化,通过socket发送给客户端
output = new ObjectOutputStream(client.getOutputStream());
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (output != null) {
try {
output.close();
}catch (IOException e) {
e.printStackTrace();
}
}
if (input != null) {
try {
input.close();
}catch (IOException e) {
e.printStackTrace();
}
}
if (client != null) {
try {
client.close();
}catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
让我们把目光再回到服务提供者,现在注册中心已经搭建好,可以去注册我们的整数相加的方法了,Provider下新建一个ProviderServer.java文件
package Provider;
import Provider.Methods.MethodInterface;
import Provider.Methods.MethodInterfaceImpl;
import Registry.ServerRegistryImpl;
import java.io.IOException;
/**
* 服务提供者注册方法
**/
public class ProviderServer {
public static void main(String[] args) throws IOException {
ServerRegistryImpl serverRegistry = new ServerRegistryImpl(8088);
// 注册一个接口方法,key是接口名,value是接口实现类
serverRegistry.register(MethodInterface.class, MethodInterfaceImpl.class);
serverRegistry.start();
}
}
现在我们可以启动这个server,可以看到提供者的方法已经注册上去了
现在我们去考虑服务消费者端的设计,这里我们要使用一种基于动态代理的设计策略去实现,在src下新建一个Consumer包,在其下新建一个RPCServer.java文件
package Consumer;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
* 客户端远程代理对象
**/
public class RPCServer<T> {
public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
// 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectOutputStream output = null;
ObjectInputStream input = null;
try {
// 2.创建Socket客户端,根据指定地址连接远程服务提供者
socket = new Socket();
socket.connect(addr);
// 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
output = new ObjectOutputStream(socket.getOutputStream());
output.writeUTF(serviceInterface.getName());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
// 4.同步阻塞等待服务器返回应答,获取应答后返回
input = new ObjectInputStream(socket.getInputStream());
return input.readObject();
} finally {
if (socket != null) {
socket.close();
}
if (output != null) {
output.close();
}
if (input != null) {
input.close();
}
}
}
});
}
}
接下来去实现我们的服务消费者服务端,同样在Consumer下新建一个ConsumerServer.java文件
package Consumer;
import Provider.Methods.MethodInterface;
import java.net.InetSocketAddress;
/**
* 服务消费者
**/
public class ConsumeServer {
public static void main(String[] args) {
// 远程调用接口方法
MethodInterface service = RPCServer.getRemoteProxyObj(MethodInterface.class,new InetSocketAddress("localhost",8088));
System.out.println(service.GetSum(1,2));
}
}
现在我们执行这个服务消费者的main方法,可以看到,通过Rpc代理简单的封装,我们可以像调用本地方法一样调用远程服务提供者的服务
这就是本文的所有内容,以上,欢迎大家在后台留言讨论哟
以上是关于Rpc到底是个啥的主要内容,如果未能解决你的问题,请参考以下文章