Hand-Writing an RPC Framework, Chapter 2: Netty Communication
Posted by bugstack虫洞栈
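Thus, against one skilled in attack, the enemy does not know where to defend; against one skilled in defense, the enemy does not know where to attack.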
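Case Introduction
When implementing an RPC framework, we have to decide how to communicate over sockets. Ordinary socket communication resembles a QQ chat: a message is sent, and the reply may come back at any time. RPC communication, by contrast, feels more like an HTTP call: a response must come back within a bounded time, otherwise the call times out and is dropped.
Here we choose Netty as our socket framework and communicate in a future-based style.
Netty is an open-source Java framework originally provided by JBoss. It offers an asynchronous, event-driven network application framework and tools for quickly developing high-performance, highly reliable network servers and clients. In other words, Netty is an NIO-based client/server programming framework that lets you build a network application quickly and simply, for example a client and server that implement a particular protocol. Netty simplifies and streamlines network programming such as TCP and UDP socket services. "Quick" and "simple" need not come at the cost of maintainability or performance: Netty is a carefully designed project that draws on the experience of implementing many protocols (FTP, SMTP, HTTP, and various binary and text-based protocols). As a result, Netty manages to stay easy to develop with while still delivering performance, stability, and scalability.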
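To make the "reply within a bounded time" idea concrete before the Netty code, here is a minimal JDK-only sketch of the future-based pattern: each request gets an id, the caller blocks on a future registered under that id, and the receiving side completes the future when a reply with the same id arrives. The class `PendingCalls` and its method names are made up for this illustration and are not part of the project; the project's own version (SyncWrite, SyncWriteFuture, SyncWriteMap) follows in the code listing.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; PendingCalls is not part of the article's project.
final class PendingCalls {

    // requestId -> future that the calling thread blocks on
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a new pending call and returns its generated request id.
    String register() {
        String requestId = UUID.randomUUID().toString();
        pending.put(requestId, new CompletableFuture<>());
        return requestId;
    }

    // Invoked by the receiving side when a reply arrives; false if nobody is waiting.
    boolean complete(String requestId, String reply) {
        CompletableFuture<String> future = pending.get(requestId);
        return future != null && future.complete(reply);
    }

    // Waits for the reply; an empty Optional means the timeout elapsed first.
    Optional<String> await(String requestId, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(requestId);
        if (future == null) {
            return Optional.empty();
        }
        try {
            return Optional.ofNullable(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(requestId); // never leak entries, even on timeout
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();

        // A reply that arrives in time, delivered from another thread.
        String answered = calls.register();
        new Thread(() -> calls.complete(answered, "pong")).start();
        System.out.println(calls.await(answered, 1000));

        // A request that nobody ever answers: the caller gives up after 50 ms.
        String lost = calls.register();
        System.out.println(calls.await(lost, 50));
        System.out.println(calls.pendingCount());
    }
}
```

The entry is removed in a finally block so that a timed-out request does not stay in the map forever; the same concern applies to any map of pending requests.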
Environment
1. JDK 1.8.0
2. IntelliJ IDEA Community Edition 2018.3.1 x64
Code Example
itstack-demo-rpc-02
└── src
    ├── main
    │   └── java
    │       └── org.itstack.demo.rpc.network
    │           ├── client
    │           │   ├── ClientSocket.java
    │           │   └── MyClientHandler.java
    │           ├── codec
    │           │   ├── RpcDecoder.java
    │           │   └── RpcEncoder.java
    │           ├── future
    │           │   ├── SyncWrite.java
    │           │   ├── SyncWriteFuture.java
    │           │   ├── SyncWriteMap.java
    │           │   └── WriteFuture.java
    │           ├── msg
    │           │   ├── Request.java
    │           │   └── Response.java
    │           ├── server
    │           │   ├── MyServerHandler.java
    │           │   └── ServerSocket.java
    │           └── util
    │               └── SerializationUtil.java
    └── test
        └── java
            └── org.itstack.demo.test
                ├── client
                │   └── StartClient.java
                └── server
                    └── StartServer.java
ClientSocket.java
package org.itstack.demo.rpc.network.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ClientSocket implements Runnable {

    // Written by the client thread and read by the caller's thread, hence volatile
    private volatile ChannelFuture future;

    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // The client decodes Response objects and encodes Request objects
                    ch.pipeline().addLast(
                            new RpcDecoder(Response.class),
                            new RpcEncoder(Request.class),
                            new MyClientHandler());
                }
            });
            // Connect to the local server and wait until the connection is established
            ChannelFuture f = b.connect("127.0.0.1", 7397).sync();
            this.future = f;
            // Block until the channel is closed
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public ChannelFuture getFuture() {
        return future;
    }

    public void setFuture(ChannelFuture future) {
        this.future = future;
    }
}
MyClientHandler.java
package org.itstack.demo.rpc.network.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.itstack.demo.rpc.network.future.SyncWriteFuture;
import org.itstack.demo.rpc.network.future.SyncWriteMap;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response msg = (Response) obj;
        String requestId = msg.getRequestId();
        // Look up the caller waiting for this requestId and hand it the response
        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
        if (future != null) {
            future.setResponse(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}
RpcDecoder.java
package org.itstack.demo.rpc.network.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;

import java.util.List;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class RpcDecoder extends ByteToMessageDecoder {

    private final Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // Wait until the 4-byte length header has arrived
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        // Body incomplete: rewind to the header and wait for more bytes
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, genericClass));
    }

}
RpcEncoder.java
package org.itstack.demo.rpc.network.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    private final Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {
        // Only messages of the configured type are encoded
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serialize(in);
            // Frame layout: 4-byte length header followed by the serialized body
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }

}
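The encoder and decoder above agree on a simple frame layout: a 4-byte length header followed by the serialized body, with the decoder rewinding whenever the body has not fully arrived. The following JDK-only sketch reproduces that framing logic with `java.nio.ByteBuffer` so it can be run without Netty. The class `LengthPrefixFraming` is hypothetical, and the negative-length check is an addition of this sketch, not something the listing above does.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only illustration of the 4-byte length-prefix framing.
final class LengthPrefixFraming {

    // Prepends a 4-byte big-endian length header to the payload.
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Extracts every complete frame from buf (in read mode);
    // a trailing partial frame is left unread for the next call.
    static List<byte[]> decode(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // body not fully arrived, rewind to the header
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);
            frames.add(body);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = encode("rpc".getBytes(StandardCharsets.UTF_8));

        // Simulate a TCP read that delivers frame a plus only the first 5 bytes of frame b.
        ByteBuffer buf = ByteBuffer.allocate(a.length + 5);
        buf.put(a).put(b, 0, 5).flip();

        List<byte[]> frames = decode(buf);
        System.out.println(frames.size() + " frame(s), " + buf.remaining() + " byte(s) left");
        // prints "1 frame(s), 5 byte(s) left"
    }
}
```

Because TCP is a byte stream, a single read may contain half a message or several messages; the mark/reset step is what makes the decoder safe against both cases.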
SyncWrite.java
package org.itstack.demo.rpc.network.future;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncWrite {

    public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {

        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (request == null) {
            throw new NullPointerException("request");
        }
        if (timeout <= 0) {
            throw new IllegalArgumentException("timeout <= 0");
        }

        String requestId = UUID.randomUUID().toString();
        request.setRequestId(requestId);

        // Pass the timeout along so that isTimeout() has a meaningful deadline
        WriteFuture<Response> future = new SyncWriteFuture(requestId, timeout);
        SyncWriteMap.syncKey.put(requestId, future);
        try {
            return doWriteAndSync(channel, request, timeout, future);
        } finally {
            // Always drop the entry, including on timeout or write failure
            SyncWriteMap.syncKey.remove(requestId);
        }
    }

    private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {

        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                writeFuture.setWriteResult(future.isSuccess());
                writeFuture.setCause(future.cause());
                // Remove the entry if the write failed
                if (!writeFuture.isWriteSuccess()) {
                    SyncWriteMap.syncKey.remove(writeFuture.requestId());
                }
            }
        });

        Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
        if (response == null) {
            if (!writeFuture.isWriteSuccess()) {
                // The write itself failed
                throw new Exception(writeFuture.cause());
            }
            // The write succeeded but no response arrived in time
            throw new TimeoutException();
        }
        return response;
    }

}
SyncWriteFuture.java
package org.itstack.demo.rpc.network.future;

import org.itstack.demo.rpc.network.msg.Response;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncWriteFuture implements WriteFuture<Response> {

    // Released once the response arrives
    private final CountDownLatch latch = new CountDownLatch(1);
    private final long begin = System.currentTimeMillis();
    private long timeout;
    private volatile Response response;
    private final String requestId;
    // Set by the Netty I/O thread and read by the calling thread
    private volatile boolean writeResult;
    private volatile Throwable cause;
    private boolean isTimeout = false;

    public SyncWriteFuture(String requestId) {
        this.requestId = requestId;
    }

    public SyncWriteFuture(String requestId, long timeout) {
        this.requestId = requestId;
        this.timeout = timeout;
        writeResult = true;
        isTimeout = false;
    }

    public Throwable cause() {
        return cause;
    }

    public void setCause(Throwable cause) {
        this.cause = cause;
    }

    public boolean isWriteSuccess() {
        return writeResult;
    }

    public void setWriteResult(boolean result) {
        this.writeResult = result;
    }

    public String requestId() {
        return requestId;
    }

    public Response response() {
        return response;
    }

    public void setResponse(Response response) {
        this.response = response;
        latch.countDown();
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return true;
    }

    public boolean isCancelled() {
        return false;
    }

    public boolean isDone() {
        // Done once the response has been set
        return latch.getCount() == 0;
    }

    public Response get() throws InterruptedException, ExecutionException {
        // await(), not Object.wait(): wait() would throw IllegalMonitorStateException here
        latch.await();
        return response;
    }

    public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Returns null if no response arrived within the timeout
        if (latch.await(timeout, unit)) {
            return response;
        }
        return null;
    }

    public boolean isTimeout() {
        if (isTimeout) {
            return isTimeout;
        }
        return System.currentTimeMillis() - begin > timeout;
    }
}
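The core of SyncWriteFuture is a `CountDownLatch` with a count of one: the I/O thread counts it down when the response arrives, and the caller blocks in `await` until then or until the timeout elapses. A stripped-down, JDK-only sketch of that mechanism is shown below. `LatchResult` is a hypothetical class written for this illustration; unlike the listing above, it swallows interruption and returns null in that case, purely to keep the example short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, stripped-down cousin of SyncWriteFuture, for illustration only.
final class LatchResult<T> {

    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T value;

    // Publishes the value and releases every thread blocked in get().
    void set(T value) {
        this.value = value;
        latch.countDown();
    }

    boolean isDone() {
        return latch.getCount() == 0;
    }

    // Returns the value, or null if the timeout elapsed (or the thread was interrupted) first.
    T get(long timeout, TimeUnit unit) {
        try {
            // await(...) is the latch API. Object.wait() needs the caller to hold the
            // object's monitor and would throw IllegalMonitorStateException here.
            return latch.await(timeout, unit) ? value : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        LatchResult<String> result = new LatchResult<>();

        // Nothing has been set yet, so this gives up after 50 ms.
        System.out.println(result.get(50, TimeUnit.MILLISECONDS));

        // Another thread delivers the value; the second get() returns it.
        new Thread(() -> result.set("response")).start();
        System.out.println(result.get(1, TimeUnit.SECONDS));
        System.out.println(result.isDone());
    }
}
```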
SyncWriteMap.java
package org.itstack.demo.rpc.network.future;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SyncWriteMap {

    // requestId -> future of the caller waiting for that response
    public static final Map<String, WriteFuture> syncKey = new ConcurrentHashMap<String, WriteFuture>();

}
WriteFuture.java
package org.itstack.demo.rpc.network.future;

import org.itstack.demo.rpc.network.msg.Response;

import java.util.concurrent.Future;

public interface WriteFuture<T> extends Future<T> {

    Throwable cause();

    void setCause(Throwable cause);

    boolean isWriteSuccess();

    void setWriteResult(boolean result);

    String requestId();

    T response();

    void setResponse(Response response);

    boolean isTimeout();

}
Request.java
package org.itstack.demo.rpc.network.msg;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class Request {

    private String requestId;
    private Object result;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

}
Response.java
package org.itstack.demo.rpc.network.msg;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class Response {

    private String requestId;
    private String param;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getParam() {
        return param;
    }

    public void setParam(String param) {
        this.param = param;
    }

}
MyServerHandler.java
package org.itstack.demo.rpc.network.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        Request msg = (Request) obj;
        // Reply, echoing the requestId so the client can match the response to its request
        Response response = new Response();
        response.setRequestId(msg.getRequestId());
        // Message text: "request succeeded, please accept and process the returned result"
        response.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理。");
        ctx.writeAndFlush(response);
        // Release the inbound message
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

}
ServerSocket.java
package org.itstack.demo.rpc.network.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ServerSocket implements Runnable {

    private ChannelFuture f;

    @Override
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            // The server decodes Request objects and encodes Response objects
                            ch.pipeline().addLast(
                                    new RpcDecoder(Request.class),
                                    new RpcEncoder(Response.class),
                                    new MyServerHandler());
                        }
                    });

            // Bind the port; assign the field instead of shadowing it with a local variable
            f = b.bind(7397).sync();
            // Block until the server channel is closed
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}
SerializationUtil.java
package org.itstack.demo.rpc.network.util;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by fuzhengwei1 on 2016/10/20.
 */
public class SerializationUtil {

    private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    private static final Objenesis objenesis = new ObjenesisStd();

    private SerializationUtil() {

    }

    /**
     * Serialize (object -> byte array)
     *
     * @param obj the object
     * @return the byte array
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserialize (byte array -> object)
     *
     * @param data the serialized bytes
     * @param cls  the target class
     * @param <T>  the target type
     */
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            T message = objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    // Returns the cached schema for the class, creating it on first use
    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            cachedSchema.put(cls, schema);
        }
        return schema;
    }

}
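The getSchema method above caches one schema per class with a check-then-put on a ConcurrentHashMap. That is safe, but two threads that miss at the same moment may each build the value once. As a possible alternative, and only as a sketch, `ConcurrentHashMap.computeIfAbsent` runs the factory at most once per key. The generic `PerClassCache` class below is hypothetical and uses a plain `Function` in place of protostuff so that it runs on the JDK alone.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical illustration of a per-class cache; not part of the article's project.
final class PerClassCache<V> {

    private final Map<Class<?>, V> cache = new ConcurrentHashMap<>();
    private final Function<Class<?>, V> factory;

    PerClassCache(Function<Class<?>, V> factory) {
        this.factory = factory;
    }

    // Builds the value on first access and reuses it afterwards.
    V get(Class<?> cls) {
        return cache.computeIfAbsent(cls, factory);
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // The factory stands in for an expensive step such as building a schema.
        PerClassCache<String> cache = new PerClassCache<>(cls -> {
            created.incrementAndGet();
            return "schema-for-" + cls.getSimpleName();
        });

        System.out.println(cache.get(String.class));
        System.out.println(cache.get(String.class));
        System.out.println("factory calls: " + created.get());
    }
}
```

Whether the change is worth it depends on how costly the factory is; for a handful of message classes the original check-then-put is usually fine.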
StartClient.java
package org.itstack.demo.test.client;

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import org.itstack.demo.rpc.network.client.ClientSocket;
import org.itstack.demo.rpc.network.future.SyncWrite;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class StartClient {

    private static ChannelFuture future;

    public static void main(String[] args) {
        ClientSocket client = new ClientSocket();
        new Thread(client).start();

        while (true) {
            try {
                // Fetch the future; the client thread needs some time to connect
                if (null == future) {
                    future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }
                // Build the request ("查询用户信息" means "query user information")
                Request request = new Request();
                request.setResult("查询用户信息");
                SyncWrite s = new SyncWrite();
                // Send and wait up to 1000 ms for the response
                Response response = s.writeAndSync(future.channel(), request, 1000);
                // "调用结果" means "call result"
                System.out.println("调用结果:" + JSON.toJSON(response));
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}
StartServer.java
package org.itstack.demo.test.server;

import org.itstack.demo.rpc.network.server.ServerSocket;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class StartServer {

    public static void main(String[] args) {
        // "启动服务端开始" / "启动服务端完成": server startup begins / server startup complete
        System.out.println("启动服务端开始");
        new Thread(new ServerSocket()).start();
        System.out.println("启动服务端完成");
    }

}
Test Results
Start StartServer
启动服务端开始
启动服务端完成
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Start StartClient
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"3380f061-2501-49b5-998b-21b5956fe60a"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"81c51815-4d92-482c-bd05-e4b6dfa4d3b6"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7af01c4f-a438-47a1-b35c-8e2cd7e4a5e7"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"86e38bb1-eccc-4d45-b976-c3b67999e3ab"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7f72002c-3b38-43d9-8452-db8797298899"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"d566a7d4-4b0d-426b-8c09-c535ccf8eb09"}

...