Day480.Netty手写dubboRPC框架 -netty
Posted 阿昌喜欢吃黄桃
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day480.Netty手写dubboRPC框架 -netty相关的知识,希望对你有一定的参考价值。
Netty手写dubboRPC框架
一、RPC 基本介绍
rpc是远程调用的一种行为,在数据传输过程中涉及到传输协议,http就是一种传输协议
RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议。
-
该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
-
两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样
- 常见的 RPC 框架有:
- 阿里的Dubbo
- google的gRPC
- Go语言的rpc
- Apache的thrift
- Spring旗下的 Spring Cloud。
二、 RPC 调用流程
说明:
- 服务消费方(client)以本地调用方式调用服务
- 【client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
- client stub 将消息进行编码并发送到服务端
- server stub 收到消息后进行解码
- server stub 根据解码结果调用本地的服务
- 本地服务执行并将结果返回给 server stub
- server stub 将返回导入结果进行编码并发送至消费方
- client stub 接收到消息并进行解码】
- 服务消费方(client)得到结果
RPC 的目标就是将
2-8 这些步骤都封装
起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用
三、实现 dubbo RPC(基于 Netty)
1、需求说明
-
dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
-
模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用
Netty 4.1.20
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
2、设计说明
- 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
- 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
3、代码实现
- 项目结构
- com.achang.netty.dubboRPC.consumer.ClientBootstrap
客户端启动器
public class ClientBootstrap
//定义协议头
public static final String providerName = "HelloService#hello";
public static void main(String[] args) throws InterruptedException
NettyClient client = new NettyClient();
HelloService serviceProxy = (HelloService) client.getBean(HelloService.class, providerName);//拿到代理对象
// for (; ; )
//调用客户端的方法
// Thread.sleep(2000);
String result = serviceProxy.hello("阿昌来也");
System.out.println("客户端调用服务端,结果为:" + result);
//
- com.achang.netty.dubboRPC.provider.ServerBootstrap
服务端启动器
public class ServerBootstrap
public static void main(String[] args) throws InterruptedException
NettyServer.startServer("127.0.0.1",7000);
- com.achang.netty.dubboRPC.netty.NettyClient
客户端初始化类
/******
@author 阿昌
@create 2021-12-17 21:32
******* Netty客户端
*/
public class NettyClient
//创建线程池
private static ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler nettyClientHandler;
/**
* 编写方式使用代理模式,获取一个代理对象
* @param serviceClass service类
* @param providerName 协议头
* @return 代理对象
*/
public Object getBean(final Class<?> serviceClass,final String providerName)
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]serviceClass,
((proxy, method, args) ->
//客户端每调用一次就会进入该代码块
//第一次调用
if (nettyClientHandler==null)
startClient0("127.0.0.1",7000);
//设置要发送给服务器的信息
//providerName协议头,args传入的参数
nettyClientHandler.setParam(providerName+args[0]);
return executors.submit(nettyClientHandler).get();
));
//初始化客户端
private static void startClient0(String ipaddr,Integer port)
nettyClientHandler = new NettyClientHandler();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try
Bootstrap bootstrap = new Bootstrap();
Bootstrap clientBootstrap = bootstrap.group(workerGroup)
.channel(NiosocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
pipeline.addLast(nettyClientHandler);
);
clientBootstrap.connect(ipaddr,port).sync();
catch (InterruptedException e)
e.printStackTrace();
- com.achang.netty.dubboRPC.netty.NettyClientHandler
客户端处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable
private ChannelHandlerContext channelHandlerContext;//上下文
private String result;//调用的返回结果
private String param;//客户端调用方法时的参数
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
ctx.close();
//收到服务器的数据后就会被调用
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
System.out.println("channelRead");
result = msg.toString();
notify();//唤醒等待的线程
//与服务器连接成功后就会被调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
System.out.println("channelActive");
channelHandlerContext = ctx;
//被代理对象调用,异步发送数据给服务器,然后阻塞,会等待被唤醒
@Override
public synchronized Object call() throws Exception
System.out.println("call1");
channelHandlerContext.writeAndFlush(param);
//进行wait阻塞
wait();
System.out.println("call2");
return result;
//设置发送的数据
void setParam(String msg)
System.out.println("setParam");
this.param = msg;
- com.achang.netty.dubboRPC.netty.NettyServer
服务端初始化类
public class NettyServer
public static void startServer(String hostname,Integer port) throws InterruptedException
startServer0(hostname,port);
private static void startServer0(String hostname,Integer port) throws InterruptedException
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try
ServerBootstrap bootstrap = new ServerBootstrap();
ServerBootstrap serverBootstrap = bootstrap.group(boosGroup, workerGroup)
// .handler(new LoggingHandler())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
);
System.out.println("服务端启动成功....端口:"+port);
ChannelFuture cf = serverBootstrap.bind(hostname, port).sync();
cf.channel().closeFuture().sync();
finally
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
- com.achang.netty.dubboRPC.netty.NettyServerHandler
服务端处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
//获取客户端发送来的消息,并调用服务
System.out.println("msg="+msg);
//客户端想要调用服务器的api时,想要满足一定协议的要求才能调用
//比如,我们这里要求,每次发送消息时,都必须要求以"HelloService#hello开头"
if (msg.toString().startsWith("HelloService#hello"))
String result = new HelloServiceImpl().hello(msg.toString().split("HelloService#hello")[1]);
ctx.writeAndFlush(result);
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
ctx.close();
- com.achang.netty.dubboRPC.provider.HelloServiceImpl
客户端接口的真正实现Impl
public class HelloServiceImpl implements HelloService
private static int count = 0;
@Override
public String hello(String message)
System.out.println("客户端发来的消息为:【"+message+"】");
if (message!=null)
return "你好客户端,服务端已经收到了消息"+"调用次数为:【"+(++count)+"】";
else
return "消息不能为空";
- com.achang.netty.dubboRPC.publicinterface.HelloService
服务提供方 和 服务消费放 公共部分,约定的接口规范
public interface HelloService
String hello(String message);
4、测试
完结撒花!!!
源代码地址:
https://github.com/qq995931576/netty
以上是关于Day480.Netty手写dubboRPC框架 -netty的主要内容,如果未能解决你的问题,请参考以下文章