13.RPC的socket实现(阻塞式)与netty实现(非阻塞式)
Posted PacosonSWJTU
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了13.RPC的socket实现(阻塞式)与netty实现(非阻塞式)相关的知识,希望对你有一定的参考价值。
【README】
- 1.本文总结了 RPC 的概念,包括定义,RPC实现,及其优缺点;
- 2.本文po出了RPC的简单代码实现,总结自 B站《netty-尚硅谷》;
- 3.本文部分内容总结自 :What Is Remote Procedure Call (RPC)? Definition from SearchAppArchitecture
- 4.本文还po出了 RPC 与 RMI 的区别;
- 5.基于socket的rpc是阻塞式的,而基于netty的rpc是非阻塞式 ;
【1】RPC
【1.1】RPC定义
1)RPC定义: Remote Procedure Call, 远程过程调用协议;
远程过程调用是一种软件通讯协议,基于该协议,一个程序可以通过网络调用另一台计算机上的服务而无需关心细节,就像调用本地系统的服务一样。
【补充】RMI:Remote Method Invocation,远程方法调用
- RMI 能够让jvm上的客户端像调用本地方法一样,调用远程jvm服务器上对象的方法;
- 显然, RMI是 RPC 的 java版本的实现 ;
2)常见的 RPC 框架有:
- 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift, Spring 旗下的 Spring Cloud。
【1.2】RPC的实现
1)当触发远程过程调用,调用环境会被挂起,过程参数会通过网络传送到过程执行环境(过程参数可以理解为是类名,方法名,方法参数,方法参数类型等)。
2)当服务器过程执行完成,结果会被回传到调用环境;只要过程调用返回则客户端立即恢复执行;
3)RPC调用过程中,会经过如下步骤:
- 步骤1:客户端调用客户端存根(代理);该调用是本地过程调用,并把参数压入栈;
- 步骤2:客户端代理封装过程参数到报文,并发送报文;把过程参数打包称为 参数编组;
- 步骤3:客户端本地操作系统发送报文到远程服务器;
- 步骤4:服务器操作系统把请求数据包送入 服务器存根(代理);
- 步骤5:服务器代理把参数解包,这个过程称为 参数解组;
- 步骤6:当服务器过程调用完成,把结果返回给服务器代理;服务器代理把返回值封装到报文;服务器代理接着把报文传递给传输层;
- 步骤7:服务器传输层发送响应报文到客户端传输层,接着回传给客户端代理;
- 步骤8:客户端代理解组返回的参数,返回到调用者的执行点;
4)RPC 调用流程图
【图解】 在RPC中:
- 1)客户端称为服务消费者;服务端称为服务提供者;
- 2)编码是把对象转为字节数组(字节码),以便网络传输,即序列化;
- 3)解码是把字节数组(字节)转为对象,以便业务逻辑处理,即反序列化,
- 4)综上,RPC需要用到动态代理, 序列化与反序列化技术等;
【1.3】RPC的优缺点
1)优点:
- 在高级语言中,有助于客户端通过传统过程调用与服务器通讯;
- 能够被用于分布式环境,远程调用就像本地调用一样;
- 支持面向进程与面向线程模型;
- 对用户隐藏了内部报文传输机制;
- 仅需极少的工作即可重写和重新开发代码;
- 删除了许多协议层以提高性能;
2)缺点:
- 客户端与服务器各自使用不同执行环境,使用资源非常复杂,如文件。因此 RPC系统不适合传输大量数据;
- RPC极易失败,因为它涉及到通讯系统,另一外机器和另一个处理过程;
- RPC没有统一的标准,可以通过多种方式实现;
- RPC 只是基于交互的,因此,它对硬件架构没有提供任何灵活性;
【2】RPC的Socket实现(阻塞式)
部分代码转自: 简单RPC之Socket实现_归田的博客-CSDN博客_rpc socket
0)目录结构:
【图解】RPC的实现代码包含3部分:
1)公共协议:
- 约定的报文格式(封装了过程调用参数,包括全限定类名,方法名,参数类型数组,参数值数组),以便于服务端(服务提供方)根据反射创建对应类与调用方法;
- 约定的过程名(如方法名):把方法名封装到接口;
2)客户端(服务消费方):
- 通过代理模式创建服务的代理对象;调用方法,实际上调用的是代理对象方法,代理对象方法底层封装报文通过socket请求服务器获取调用结果;(客户端在整个调用过程是阻塞的,直到结果返回)
3)服务端(服务提供方):
- 根据报文中的 类名通过反射创建服务实例,根据方法名,参数类型,参数列表通过反射调用方法,并将结果回写到socket;
注意(报文的序列化与反序列化):
- 客户端把报文对象写出到 socket 前需要序列化为字节数组;服务器从socket读入字节数组后,需要反序列化为报文对象;
【2.1】协议报文与过程调用方法定义:
报文:
/**
* @Description 报文(注意:要可序列化)
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月12日
*/
public class MyRpcMessage implements Externalizable
/** 全限定类名 */
private String className;
/** 方法名 */
private String methodName;
/** 参数类型clazz数组 */
private Class<?>[] parameterTypes;
/** 参数值 */
private Object[] parameterValues;
/**
* @description 私有构造器
* @author xiao tang
* @date 2022/9/12
*/
public MyRpcMessage()
/**
* @description 创建生成器
* @return 生成器
* @author xiao tang
* @date 2022/9/12
*/
public static Builder builder()
Builder builder = new Builder();
return builder;
public String getClassName()
return className;
public void setClassName(String className)
this.className = className;
public String getMethodName()
return methodName;
public void setMethodName(String methodName)
this.methodName = methodName;
public Class<?>[] getParameterTypes()
return parameterTypes;
public void setParameterTypes(Class<?>[] parameterTypes)
this.parameterTypes = parameterTypes;
public Object[] getParameterValues()
return parameterValues;
public void setParameterValues(Object[] parameterValues)
this.parameterValues = parameterValues;
@Override
public void writeExternal(ObjectOutput out) throws IOException
out.writeUTF(className);
out.writeUTF(methodName);
out.writeObject(parameterTypes);
out.writeObject(parameterValues);
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
this.className = in.readUTF();
this.methodName = in.readUTF();
this.parameterTypes = (Class<?>[]) in.readObject();
this.parameterValues = (Object[]) in.readObject();
/**
* @description 生成器
* @author xiao tang
* @date 2022/9/12
*/
public static class Builder
/** rpc报文对象 */
private MyRpcMessage myRpcMessage;
/**
* @description 私有构造器
* @author xiao tang
* @date 2022/9/12
*/
private Builder()
this.myRpcMessage = new MyRpcMessage();
/**
* @description 设置全限定类名
* @return 生成器
* @author xiao tang
* @date 2022/9/12
*/
public Builder className(String className)
this.myRpcMessage.setClassName(className);
return this;
/**
* @description 设置方法名
* @return 生成器
* @author xiao tang
* @date 2022/9/12
*/
public Builder methodName(String methodName)
this.myRpcMessage.setMethodName(methodName);
return this;
/**
* @description 设置参数类型数组
* @return 生成器
* @author xiao tang
* @date 2022/9/12
*/
public Builder parameterTypes(Class<?>[] parameterTypes)
this.myRpcMessage.setParameterTypes(parameterTypes);
return this;
/**
* @description 设置参数值数组
* @return 生成器
* @author xiao tang
* @date 2022/9/12
*/
public Builder parameterValues(Object[] parameterValues)
this.myRpcMessage.setParameterValues(parameterValues);
return this;
/**
* @description 获得rpc报文对象
* @return rpc报文对象
* @author xiao tang
* @date 2022/9/12
*/
public MyRpcMessage build()
return myRpcMessage;
方法接口定义:
/**
* @Description 过程调用接口
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月11日
*/
public interface IHelloRpc
String hello(String name);
【2.2】服务器:
/**
* @Description rpc服务器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月11日
*/
public class MyRpcSocketServer
/** 线程池 */
private static ExecutorService THREAD_POOL = Executors.newCachedThreadPool();
public static void main(String[] args)
try
startServer0(8089);
catch (IOException e)
e.printStackTrace();
finally
THREAD_POOL.shutdown();
/**
* @description 开启服务器
* @param port 端口
* @author xiao tang
* @date 2022/9/11
*/
private static void startServer0(int port) throws IOException
ServerSocket serverSocket = new ServerSocket(port);
System.out.println("服务器启动成功.");
while(true)
try
// 阻塞直到有客户端请求
Socket socket = serverSocket.accept();
// 处理器请求
THREAD_POOL.execute(new BusiTask(socket));
catch (Exception e)
e.printStackTrace();
System.out.println("抛出异常");
throw e;
/**
* @description 业务作业,Runnable子类
* @author xiao tang
* @date 2022/9/11
*/
static class BusiTask implements Runnable
private Socket socket;
public BusiTask(Socket socket)
this.socket = socket;
@Override
public void run()
// 获得字节输入输出流
try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream outputMsg = new ObjectOutputStream(socket.getOutputStream());)
// 获取过程参数(报文),包括类名,方法名,方法参数类型,方法参数列表
MyRpcMessage inputMsg = (MyRpcMessage)inputStream.readObject();
// 通过反射创建对象
Class<?> clazz = Class.forName(inputMsg.getClassName());
Object procedureObj = clazz.newInstance();
// // 通过反射创建方法
Method method = clazz.getDeclaredMethod(inputMsg.getMethodName(), inputMsg.getParameterTypes());
// // 通过反射调用方法
Object result = method.invoke(procedureObj, inputMsg.getParameterValues());
// 把结果回写
outputMsg.writeObject(result);
catch (Exception e)
e.printStackTrace();
finally
try
socket.close();
catch (IOException e)
e.printStackTrace();
过程调用接口实现类:
/**
* @Description 过程调用接口实现
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月11日
*/
public class IHelloRpcServerImpl implements IHelloRpc
@Override
public String hello(String name)
return "hello " + name + ", nice to meet you, i am a server named RPC.";
【2.3】客户端:
/**
* @Description RPC存根(代理)工厂
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月11日
*/
public class MyRpcProxyFactory
/**
* @description 私有构造器
* @author xiao tang
* @date 2022/9/11
*/
private MyRpcProxyFactory()
/**
* @description 创建代理对象
* @param targetClazz 目标对象clazz
* @param className 类名
* @return 代理对象
* @author xiao tang
* @date 2022/9/11
*/
public static <T> T newProxy(Class<T> targetClazz, String className)
return (T) Proxy.newProxyInstance(
targetClazz.getClassLoader()
, new Class<?>[]targetClazz
, (Object proxy, Method method, Object[] args) ->
Socket socket = new Socket("localhost",8089);
// 获取输出流
try(ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream()))
// 写出过程参数到服务器,包括类名,方法名,方法参数类型,方法参数列表
MyRpcMessage message = MyRpcMessage.builder()
.className(className)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.parameterValues(args)
.build();
outputStream.writeObject(message);
outputStream.flush();
// 获取输入流
Object result = inputStream.readObject();
if (result instanceof Throwable)
throw (Throwable)result;
return result;
finally
socket.close();
);
测试用例入口:
public class MyRpcSocketClient
public static void main(String[] args)
IHelloRpc helloRpc = MyRpcProxyFactory.newProxy(IHelloRpc.class, "com.my.netty.rpc.socket.server.IHelloRpcServerImpl");
String result = helloRpc.hello("成都");
System.out.println(result);
【2.4】运行效果:
hello 成都, nice to meet you, i am a server named RPC.
【3】RPC的netty 实现
1)背景: 基于socket的rpc实现时阻塞式的,吞吐量不高; 本文尝试使用netty 实现rpc ;
2)netyt优势:
- netty是非阻塞的,能够很好利用多核cpu的算力,增大系统吞吐量;
- netty底层采用零拷贝,报文通讯性能更优;
3)代码目录:
【图解】 代码还是分为3部分(常量不算):
1)公共协议:
- 把过程方法抽象为接口,客户端与服务器以该接口进行通讯;
2)客户端:
- 通过代理模式创建服务的代理对象;调用方法,实际上调用的是代理对象方法,代理对象方法底层封装报文(一个简单的字符串)通过 netty非阻塞式连接服务器,并把报文写出到channel,送入服务器(客户端在整个调用过程是非阻塞的);
- 当前线程睡眠直到服务端的响应报文回写到客户端;
3)服务端:
- 服务器在接收到客户端请求报文后,若协议报文头以 HelloServer#hello# 开头,则调用预设 HelloService.hello() 方法;
- 调用hello() 方法完成后,把结果回写到netty的channel通道;
- 补充:为了简化编程,基于netty的rpc实现没有使用反射来创建类和调用方法,而是写死 方法;当然了,像上文的socket版本的rpc一样,可以把类名,方法名,参数类型,参数值列表等过程参数封装到报文头,服务器可以根据上述报文头通过反射来创建类和调用方法;
【3.1】公共协议(约定)
1)公共接口;
/**
* @Description 公共接口,服务提供方,服务消费方都需要
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月11日
*/
public interface HelloService
String hello(String msg);
常量:
public class MyDubboConst
/** 协议头 */
public static final String PROTOCOL_HEAD = "HelloServer#hello#";
【3.2】 客户端
1)netty客户端代理:
/**
* @Description 客户端-服务消费者
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月11日
*/
public class MyDubboNettyClient
// 创建线程池
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// 客户端处理器
private static MyDubboNettyClientHandler clientHandler;
private int counter = 0;
// 编写方法,使用代理模式,获取代理对象
public Object getBean(final Class<?> serviceClass, final String protocolHead)
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]serviceClass, (Object proxy, Method method, Object[] args)->
System.out.println("Object proxy, Method method, Object[] args 进入了,第 " + (++counter) + " 次进入");
// 下面的代码, 客户端每调用一次hello,就会进入到该代码
if (clientHandler == null)
initClientHandler(); // 初始化客户端处理器
// 设置要发送给服务器的信息
// protocolHead 协议头部
// args[0] 就是客户端调用 hello的参数
System.out.println("clientHandler.setParameter");
clientHandler.setParameter(protocolHead + args[0]);
System.out.println("executorService.submit");
return executorService.submit(clientHandler).get();
);
// 初始化客户端处理器
private static void initClientHandler()
clientHandler = new MyDubboNettyClientHandler();
// 创建 EventLoopGroup
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NiosocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
);
// 启动客户端
ChannelFuture channelFuture = null;
try
channelFuture = bootstrap.connect("127.0.0.1", 8089).sync();
catch (InterruptedException e)
e.printStackTrace();
System.out.println("客户端启动成功");
2)netty客户端业务逻辑处理器
/**
* @Description dubbo客户端处理器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月11日
*/
public class MyDubboNettyClientHandler extends ChannelInboundHandlerAdapter implements Callable
private ChannelHandlerContext context; // 上下文
private String result; // 返回结果
private String parameter; // 客户端调用方法时,传入的参数
// 与服务器连接创建成功后,就会被调用 (第1个被调用 )
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
System.out.println("MyDubboNettyClientHandler.channelActive");
context = ctx; // 因为在其他方法也会用到这个 context
// 收到服务器的数据后,就会调用方法
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
System.out.println("MyDubboNettyClientHandler.channelRead");
result = msg.toString();
notify(); // 唤醒等待的线程
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
System.out.println("MyDubboNettyClientHandler.exceptionCaught");
ctx.close();
// 被代理对象调用, 发送数据给服务器 -> 然后 wait -> 等待被唤醒 -> 返回结果 (第3个被调用)
@Override
public synchronized Object call() throws Exception
System.out.println("发送数据到服务器, parameter = " + parameter);
context.writeAndFlush(parameter); // 发送数据给服务器
System.out.println("发送数据到服务成功");
wait(); // 等待直到服务器返回结果,服务器返回后调用 channelRead 方法来唤醒;
System.out.println("after wait().");
return result; // 服务提供方返回的结果
// 第2个被调用,设置参数
public void setParameter(String parameter)
System.out.println("setParameter");
this.parameter = parameter;
3)客户端启动引导程序:
public class MyDubboClientBootstrap
public static void main(String[] args)
// 创建访问消费者-netty客户端
MyDubboNettyClient customer = new MyDubboNettyClient();
// 创建代理对象
HelloService helloService = (HelloService)customer.getBean(HelloService.class, MyDubboConst.PROTOCOL_HEAD);
// 通过代理对象调用服务提供者的服务
String result = helloService.hello("hello服务器");
System.out.println("服务器响应结果 = " + result);
【3.3】服务器
1)过程调用(方法调用)的具体实现
/**
* @Description 服务提供方实现接口
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月11日
*/
public class HelloServiceImpl implements HelloService
private int counter;
// 当有消费方调用该方法时,就返回一个字符串
@Override
public String hello(String msg)
System.out.println("收到客户端消息 = " + msg);
// 根据msg 返回不同结果
if (msg != null)
return "你好客户端,我收到你的消息 ["+msg+"]这是第" + (++counter) + "次收到消息";
else
return "你好客户端,我收到的消息为空" ;
2)netty服务器
/**
* @Description 基于netty的 Dubbo 服务器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月11日
*/
public class MyDubboNettyServer
public static void startServer(String hostName, int port) throws InterruptedException
startServer0(hostName, port);
private static void startServer0(String hostName, int port) throws InterruptedException
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new MyDubboNettyServerHandler()) ; // 自定义业务处理器
);
// 启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
System.out.println("服务提供方启动成功,开始提供服务");
// 监听关闭
channelFuture.channel().closeFuture().sync();
finally
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
3)netty服务器业务处理器
/**
* @Description 基于netty的 Dubbo 服务器处理器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月11日
*/
public class MyDubboNettyServerHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
// 获取客户端发送的消息 并 调用公共接口服务
System.out.println("接收的msg = " + msg);
// 客户端在调用服务器的api时,我们需要定义一个协议
// 如 msg 以 "HelloServer#hello#" 开头,才调用api
String msgString = msg.toString();
if (msgString.startsWith(MyDubboConst.PROTOCOL_HEAD))
String result = new HelloServiceImpl().hello(msgString.substring(msgString.lastIndexOf("#")+1));
ctx.writeAndFlush(result);
else
ctx.writeAndFlush("协议头非法");
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
ctx.close();
4)服务器启动引导程序:
public class MyDubboServerBootstrap
public static void main(String[] args) throws InterruptedException
// 代码代填
MyDubboNettyServer.startServer("127.0.0.1", 8089);
【3.4】运行效果:
1)服务器日志:
服务提供方启动成功,开始提供服务
[DEBUG] Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
[DEBUG] Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
[DEBUG] Recycler - -Dio.netty.recycler.linkCapacity: 16
[DEBUG] Recycler - -Dio.netty.recycler.ratio: 8
[DEBUG] AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
[DEBUG] ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@e62edfd
接收的msg = HelloServer#hello#hello服务器
收到客户端消息 = hello服务器
2)客户端日志:
客户端启动成功
clientHandler.setParameter
MyDubboNettyClientHandler.channelActive
setParameter
executorService.submit
发送数据到服务器, parameter = HelloServer#hello#hello服务器
[DEBUG] Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
[DEBUG] Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
[DEBUG] Recycler - -Dio.netty.recycler.linkCapacity: 16
[DEBUG] Recycler - -Dio.netty.recycler.ratio: 8
发送数据到服务成功
[DEBUG] AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
[DEBUG] ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@89e890f
MyDubboNettyClientHandler.channelRead
after wait().
服务器响应结果 = 你好客户端,我收到你的消息 [hello服务器]这是第1次收到消息
以上是关于13.RPC的socket实现(阻塞式)与netty实现(非阻塞式)的主要内容,如果未能解决你的问题,请参考以下文章