带你手写基于 Spring 的可插拔式 RPC 框架通信协议模块
Posted paulwang92115
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了带你手写基于 Spring 的可插拔式 RPC 框架通信协议模块相关的知识,希望对你有一定的参考价值。
在写代码之前我们先要想清楚几个问题。
- 我们的框架到底要实现什么功能?
我们要实现一个远程调用的 RPC 协议。 - 最终实现效果是什么样的?
我们能像调用本地服务一样调用远程的服务。 - 怎样实现上面的效果?
前面几章已经给大家说了,使用动态代理,在客户端生成接口代理类使用,在代理类的 invoke 方法里面将方法参数等信息组装成 request 发给服务端,服务端需要起一个服务器一直等待接收这种消息,接收之后使用反射调
用对应接口的实现类。
首先我们需要实现底层的通信的服务端和客户端,可以有一下几种实现:
基于 Socket 的客户端和服务端(同步阻塞式,不推荐),大家可以当作一个编程练习,整个和系统没有进行整合,纯粹练习使用。
基于 Socket 的服务端。
启动一个阻塞式的 socket server,加入一个线程池实现伪异步。public class SocketServer private static SocketServer INSTANCE = new SocketServer(); private SocketServer(); public static SocketServer getInstance() return INSTANCE; //没有核心线程数量控制的线程池,最大线程数是 Integer 的最大值,多线程实现伪异步 ExecutorService executorService = Executors.newCachedThreadPool(); /** * 发布服务,bio 模型 * @param service * @param port */ public void publiser(int port) try (ServerSocket serverSocket = new ServerSocket(port);) while (true) Socket socket = serverSocket.accept();//接收请求 executorService.execute(new SocketHandler(socket)); catch (IOException e) e.printStackTrace();
对应的 hanlder,使用反射调用对应的服务,并通过 sokcet 写回结果。
public class SocketHandler implements Runnable private Socket socket; public SocketHandler(Socket socket) this.socket = socket; @Override public void run() try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());) Object o = inputStream.readObject(); //readObject 是 java 反序列化的过程 System.out.println(o); Object result = invoke((RpcRequest) o); //写回结果 outputStream.writeObject(result); outputStream.flush(); catch (IOException e) e.printStackTrace(); catch (ClassNotFoundException e) e.printStackTrace(); private Object invoke(RpcRequest invocation) //根据方法名和参数类型在 service 里获取方法 try String interFaceName = invocation.getInterfaceName(); Class impClass = Class.forName(invocation.getImpl()); Method method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes()); String result = (String)method.invoke(impClass.newInstance(),invocation.getParams()); return result; catch (NoSuchMethodException e) e.printStackTrace(); catch (IllegalAccessException e) e.printStackTrace(); catch (InvocationTargetException e) e.printStackTrace(); catch (InstantiationException e) e.printStackTrace(); catch (ClassNotFoundException e) e.printStackTrace(); return null;
在看客户端,拼装参数,发送给 socket 服务端。
通过上面的代码相信大家已经明白了这个流程了,就是一个客户端与服务端通信的过程,将需要调用的方法的参数传到服务端,服务端通过反射完成调用,最后返回结果给客户端。public class SocketClient private static SocketClient INSTANCE = new SocketClient(); private SocketClient(); public static SocketClient getInstance() return INSTANCE; private Socket newSocket(String host, Integer port) System.out.println("创建一个新的 socket 连接"); try Socket socket = new Socket(host, port); return socket; catch (IOException e) System.out.println("建立连接失败"); e.printStackTrace(); return null; public Object sendRequest(String host, Integer port,RpcRequest rpcRequest) Socket socket = newSocket(host,port); try ( ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());) outputStream.writeObject(rpcRequest); outputStream.flush(); Object result = inputStream.readObject(); inputStream.close(); outputStream.close(); return result; catch (Exception e) e.printStackTrace(); return null;
下面正式开始。基于 Http 请求的客户端和基于 Tomcat 的服务端。
基于 Tomcat 的服务端,单例模式,只有一个启动服务的 start 方法,监听到的请求通过 DispatcherServlet 处理。public class HttpServer private static HttpServer INSTANCE = new HttpServer(); private HttpServer() public static HttpServer getInstance() return INSTANCE; /** * * servlet 容器,tomcat * @param hostname * @param port */ public void start(String hostname,Integer port) Tomcat tomcat = new Tomcat(); Server server = tomcat.getServer(); Service service = server.findService("Tomcat"); Connector connector = new Connector(); connector.setPort(port); Engine engine = new StandardEngine(); engine.setDefaultHost(hostname); Host host = new StandardHost(); host.setName(hostname); String contextPath = ""; Context context = new StandardContext(); context.setPath(contextPath); context.addLifecycleListener(new Tomcat.FixContextListener()); //声明周期监听器 host.addChild(context); engine.addChild(host); service.setContainer(engine); service.addConnector(connector); tomcat.addServlet(contextPath,"dispatcher", new DispatcherServlet()); context.addServletMappingDecoded("/*","dispatcher"); try tomcat.start(); tomcat.getServer().await(); catch (LifecycleException e) e.printStackTrace();
下面来看请求分发器 DispatcherServlet 的实现,将请求派发给 HttpServletHandler 实现。
/** * tomcat 是 servlet 容器,写一个 servlet * */ public class DispatcherServlet extends HttpServlet @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException new HttpServletHandler().handler(req,resp);
HttpServletHandler 的实现其实就是解析 request,通过反射调用最后返回结果。
public class HttpServletHandler public void handler(HttpServletRequest req, HttpServletResponse resp) try(InputStream inputStream = req.getInputStream(); OutputStream outputStream =resp.getOutputStream();) ObjectInputStream ois = new ObjectInputStream(inputStream); RpcRequest invocation = (RpcRequest) ois.readObject(); // 从注册中心根据接口,找接口的实现类 String interFaceName = invocation.getInterfaceName(); Class impClass = Class.forName(invocation.getImpl()); Method method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes()); Object result = method.invoke(impClass.newInstance(),invocation.getParams()); RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setResponseId(invocation.getRequestId()); rpcResponse.setData(result); IOUtils.write(toByteArray(rpcResponse),outputStream); catch (IOException e) e.printStackTrace(); catch (ClassNotFoundException e) e.printStackTrace(); catch (NoSuchMethodException e) e.printStackTrace(); catch (IllegalAccessException e) e.printStackTrace(); catch (InvocationTargetException e) e.printStackTrace(); catch (InstantiationException e) e.printStackTrace(); public byte[] toByteArray (Object obj) byte[] bytes = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); try ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); oos.flush(); bytes = bos.toByteArray (); oos.close(); bos.close(); catch (IOException ex) ex.printStackTrace(); return bytes;
最后来看客户端的实现,通过 post 方法发送数据,最后解析服务端返回的结果。
public class HttpClient private static HttpClient INSTANCE = new HttpClient(); private HttpClient() public static HttpClient getInstance() return INSTANCE; public Object post(String hostname, Integer port, RpcRequest invocation) try URL url = new URL("http",hostname,port,"/"); HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection(); httpURLConnection.setRequestMethod("POST"); httpURLConnection.setDoOutput(true); OutputStream outputStream = httpURLConnection.getOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(outputStream); oos.writeObject(invocation); oos.flush(); oos.close(); InputStream inputStream = httpURLConnection.getInputStream(); RpcResponse rpcResponse = (RpcResponse)toObject(IOUtils.toByteArray(inputStream)); return rpcResponse.getData(); catch (MalformedURLException e) e.printStackTrace(); catch (IOException e) e.printStackTrace(); return null; public Object toObject (byte[] bytes) Object obj = null; try ByteArrayInputStream bis = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream (bis); obj = ois.readObject(); ois.close(); bis.close(); catch (IOException ex) ex.printStackTrace(); catch (ClassNotFoundException ex) ex.printStackTrace(); return obj;
Netty 模型的客户端和服务端。
基于 Netty 的服务端,里面的编码器和解码器是我们自己实现的,大家可以先用我注释掉的那部分,等我们写到编码解码器的时候再替换。public class NettyServer private static NettyServer INSTANCE = new NettyServer(); private static Executor executor = Executors.newCachedThreadPool(); private final static int MESSAGE_LENGTH = 4; private NettyServer(); public static NettyServer getInstance() return INSTANCE; private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize()); public static void submit(Runnable t) executor.execute(t); public void start(String host, Integer port) EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try final ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup) .channel(NioserverSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() @Override protected void initChannel(SocketChannel arg0) throws Exception ChannelPipeline pipeline = arg0.pipeline(); //ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder //的初始化参数即为super(maxObjectSize, 0, 4, 0, 4); // pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, NettyServer.MESSAGE_LENGTH, 0, NettyServer.MESSAGE_LENGTH)); //利用LengthFieldPrepender回填补充ObjectDecoder消息报文头 // pipeline.addLast(new LengthFieldPrepender(NettyServer.MESSAGE_LENGTH)); // pipeline.addLast(new ObjectEncoder()); //考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可 // pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); //注册解码器NettyDecoderHandler pipeline.addLast(new NettyDecoderHandler(RpcRequest.class, serializeType)); //注册编码器NettyEncoderHandler pipeline.addLast(new NettyEncoderHandler(serializeType)); pipeline.addLast("handler", new NettyServerHandler()); ); Channel channel = bootstrap.bind(host, port).sync().channel(); System.out.println("Server start listen at " + port); catch(Exception e) bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
服务端对应的 handler,netty 都是这种 handler 模式,handler 里面也是将这个接收的 request 放入线程池中处理。
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> private ChannelHandlerContext context; @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception System.out.println("server channelRead..."); System.out.println(ctx.channel().remoteAddress() + "->server:" + rpcRequest.toString()); InvokeTask it = new InvokeTask(rpcRequest,ctx); NettyServer.submit(it); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception this.context = ctx;
给出 InvokeTask 的对应实现。
public class InvokeTask implements Runnable private RpcRequest invocation; private ChannelHandlerContext ctx; public InvokeTask(RpcRequest invocation,ChannelHandlerContext ctx) super(); this.invocation = invocation; this.ctx = ctx; @Override public void run() // 从注册中心根据接口,找接口的实现类 String interFaceName = invocation.getInterfaceName(); Class impClass = null; try impClass = Class.forName(invocation.getImpl()); catch (ClassNotFoundException e) e.printStackTrace(); Method method; Object result = null; try method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes()); //这块考虑实现类,是不是应该在 spring 里面拿 result = method.invoke(impClass.newInstance(),invocation.getParams()); catch (Exception e) e.printStackTrace(); RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setResponseId(invocation.getRequestId()); rpcResponse.setData(result); ctx.writeAndFlush(rpcResponse).addListener(new ChannelFutureListener() public void operationComplete(ChannelFuture channelFuture) throws Exception System.out.println("RPC Server Send message-id respone:" + invocation.getRequestId()); );
再来看客户端,客户端有两种实现,一种是不能复用 handler(可以立即为 connection)的模式,这种模式并发不太高,另一种是能够复用 handler 的 handlerPool 模式。
不能复用的模式。
public class NettyClient private static NettyClient INSTANCE = new NettyClient(); private final static int parallel = Runtime.getRuntime().availableProcessors() * 2; private NettyClient(); public static NettyClient getInstance() return INSTANCE; private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize()); public void start(String host,Integer port) Bootstrap bootstrap = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(parallel); try bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() @Override protected void initChannel(SocketChannel arg0) throws Exception ChannelPipeline pipeline = arg0.pipeline(); //ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder //的初始化参数即为super(maxObjectSize, 0, 4, 0, 4); // pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); //利用LengthFieldPrepender回填补充ObjectDecoder消息报文头 // pipeline.addLast(new LengthFieldPrepender(4)); // pipeline.addLast(new ObjectEncoder()); //考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可 // pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); //注册Netty编码器 System.out.println("11111111:"+serializeType.getSerializeType()); pipeline.addLast(new NettyEncoderHandler(serializeType)); //注册Netty解码器 pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType)); pipeline.addLast("handler", new NettyClientHandler()); ); ChannelFuture future = bootstrap.connect(host,port).sync(); catch(Exception e) group.shutdownGracefully();
在看可复用的模式,固定 handler 数量,目前框架中使用的是可复用模式,上面的不可复用的没用上,为了给大家理解,没有删除。
public class NettyChannelPoolFactory //初始化Netty Channel阻塞队列的长度,该值为可配置信息 private static final int channelConnectSize = 10; //Key为服务提供者地址,value为Netty Channel阻塞队列 private static final Map<URL, ArrayBlockingQueue<Channel>> channelPoolMap = new ConcurrentHashMap<>(); private static NettyChannelPoolFactory INSTANCE = new NettyChannelPoolFactory(); private NettyChannelPoolFactory(); public static NettyChannelPoolFactory getInstance() return INSTANCE; private List<ServiceProvider> serviceMetaDataList = new ArrayList<>(); //根据配置文件里面需要调用的接口信息来初始化 channel public void initNettyChannelPoolFactory(Map<String, List<ServiceProvider>> providerMap) //将服务提供者信息存入serviceMetaDataList列表 Collection<List<ServiceProvider>> collectionServiceMetaDataList = providerMap.values(); for (List<ServiceProvider> serviceMetaDataModels : collectionServiceMetaDataList) if (CollectionUtils.isEmpty(serviceMetaDataModels)) continue; serviceMetaDataList.addAll(serviceMetaDataModels); //获取服务提供者地址列表 Set<URL> set = new HashSet<>(); for (ServiceProvider serviceMetaData : serviceMetaDataList) String serviceIp = serviceMetaData.getIp(); int servicePort = serviceMetaData.getPort(); URL url = new URL(serviceIp,servicePort); set.add(url); for(URL url:set) //为每个 ip端口 建立多个 channel,并且放入阻塞队列中 int channelSize = 0; while(channelSize < channelConnectSize) Channel channel = null; while(channel == null) channel = registerChannel(url); channelSize ++; ArrayBlockingQueue<Channel> queue = channelPoolMap.get(url); if(queue == null) queue = new ArrayBlockingQueue<Channel>(channelConnectSize); channelPoolMap.put(url, queue); queue.offer(channel); public Channel registerChannel(URL url) final SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize()); Bootstrap bootstrap = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(10); try bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() @Override protected void initChannel(SocketChannel arg0) throws Exception ChannelPipeline pipeline = arg0.pipeline(); //ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder //的初始化参数即为super(maxObjectSize, 0, 4, 0, 4); // pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); //利用LengthFieldPrepender回填补充ObjectDecoder消息报文头 // pipeline.addLast(new LengthFieldPrepender(4)); // pipeline.addLast(new ObjectEncoder()); //考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可 // pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); pipeline.addLast(new NettyEncoderHandler(serializeType)); //注册Netty解码器 pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType)); pipeline.addLast("handler", new NettyClientHandler()); ); ChannelFuture future = bootstrap.connect(url.getHost(),url.getPort()).sync(); Channel channel = future.channel(); //等待Netty服务端链路建立通知信号 final CountDownLatch connectedLatch = new CountDownLatch(1); final List<Boolean> isSuccess = new ArrayList<>(1); future.addListener(new ChannelFutureListener() @Override public void operationComplete(ChannelFuture future) throws Exception if(future.isSuccess()) isSuccess.add(true); else isSuccess.add(false); connectedLatch.countDown(); ); connectedLatch.await(); if(isSuccess.get(0)) return channel; catch(Exception e) group.shutdownGracefully(); e.printStackTrace(); return null; //根据 url 获取阻塞队列 public ArrayBlockingQueue<Channel> acqiure(URL url) System.out.println(channelPoolMap.toString()); return channelPoolMap.get(url); //channel 使用完毕后进行回收 public void release(ArrayBlockingQueue<Channel> queue, Channel channel, URL url) if(queue == null) return; //需要检查 channel 是否可用,如果不可用,重新注册一个放入阻塞队列中 if(channel == null || !channel.isActive() || !channel.isOpen()|| !channel.isWritable()) if (channel != null) channel.deregister().syncUninterruptibly().awaitUninterruptibly(); channel.closeFuture().syncUninterruptibly().awaitUninterruptibly(); Channel c = null; while(c == null) c = registerChannel(url); queue.offer(c); return; queue.offer(channel);
给出对应的 handler 实现,在 channelread0 里面读取 server 端返回的信息,因为 netty 是异步的,所以需要 MessageCallBack 来实现我们的同步调用。
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> private ChannelHandlerContext context; @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception cause.printStackTrace(); ctx.close(); @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception System.out.println("停止时间是:"+new Date()); System.out.println("HeartBeatClientHandler channelInactive"); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception this.context = ctx; System.out.println("激活时间是:"+ctx.channel().id()); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception // String res = (String)msg; //RpcResponse rpcResponse = (RpcResponse)msg; String responseId = rpcResponse.getResponseId(); MessageCallBack callBack = ResponseHolder.getInstance().mapCallBack.get(responseId); if(callBack != null) ResponseHolder.getInstance().mapCallBack.remove(responseId); callBack.over(rpcResponse);
MessageCallBack 的实现。
public class MessageCallBack private RpcRequest rpcRequest; private RpcResponse rpcResponse; private Lock lock = new ReentrantLock(); private Condition finish = lock.newCondition(); public MessageCallBack(RpcRequest request) this.rpcRequest = request; public Object start() throws InterruptedException try lock.lock(); //设定一下超时时间,rpc服务器太久没有相应的话,就默认返回空吧。 finish.await(10*1000, TimeUnit.MILLISECONDS); if (this.rpcResponse != null) return this.rpcResponse.getData(); else return null; finally lock.unlock(); public void over(RpcResponse reponse) try lock.lock(); this.rpcResponse = reponse; finish.signal(); finally lock.unlock();
既然是可插拔式框架,那么底层协议一定要是可选择的,所以我们定义一个顶层接口来支持我们选择协议。
start 方法是启动服务端,send 方法是客户端发送数据。public interface Procotol void start(URL url); Object send(URL url, RpcRequest invocation);
对应的三个协议的接口实现。
Netty 的实现public class DubboProcotol implements Procotol @Override public void start(URL url) NettyServer nettyServer = NettyServer.getInstance(); nettyServer.start(url.getHost(),url.getPort()); @Override public Object send(URL url, RpcRequest invocation) ArrayBlockingQueue<Channel> queue = NettyChannelPoolFactory.getInstance().acqiure(url); Channel channel = null; try channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS); if(channel == null || !channel.isActive() || !channel.isOpen()|| !channel.isWritable()) channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS); if(channel == null) channel = NettyChannelPoolFactory.getInstance().registerChannel(url); //将本次调用的信息写入Netty通道,发起异步调用 ChannelFuture channelFuture = channel.writeAndFlush(invocation); channelFuture.syncUninterruptibly(); MessageCallBack callback = new MessageCallBack(invocation); ResponseHolder.getInstance().mapCallBack.put(invocation.getRequestId(), callback); try return callback.start(); catch (InterruptedException e) e.printStackTrace(); return null; catch (InterruptedException e1) e1.printStackTrace(); finally System.out.println("release:"+channel.id()); NettyChannelPoolFactory.getInstance().release(queue, channel, url); return null;
http 的实现
public class HttpProcotol implements Procotol @Override public void start(URL url) HttpServer httpServer = HttpServer.getInstance(); httpServer.start(url.getHost(),url.getPort()); @Override public Object send(URL url, RpcRequest invocation) HttpClient httpClient = HttpClient.getInstance(); return httpClient.post(url.getHost(),url.getPort(),invocation);
Socket 的实现
public class SocketProcotol implements Procotol @Override public void start(URL url) SocketServer socketServer = SocketServer.getInstance(); socketServer.publiser(url.getPort()); @Override public Object send(URL url, RpcRequest invocation) SocketClient socketClient = SocketClient.getInstance(); return socketClient.sendRequest(url.getHost(),url.getPort(),invocation);
这样一个可选择协议的模型就实现了,我们可已通过这个模块选择协议,并且与服务端通信。
以上是关于带你手写基于 Spring 的可插拔式 RPC 框架通信协议模块的主要内容,如果未能解决你的问题,请参考以下文章
带你手写基于 Spring 的可插拔式 RPC 框架通信协议模块