浅谈RPC及Netty在RPC中的应用
Posted Megustas_JJC
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈RPC及Netty在RPC中的应用相关的知识,希望对你有一定的参考价值。
什么是RPC
RPC协议:只是定义数据传输格式和传输方式,是一种应用层协议。
传输方式:有基于HTTP传输数据的RPC Over HTTP,也有基于TCP的RPC Over TCP等。
数据格式:双方协商定义,一般包括以下几点:
1、类名
2、方法名
3、参数类型(用来确定具体执行的方法,有方法重载)
4、参数值
(个人对于网络协议这里了解的不够深入,只能广义上讲下对rpc的认识)
官方说明:
一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加轻易。
RPC架构
一个完整的RPC架构里面包含了四个核心的组件,分别是Client ,Server,Client Stub以及Server Stub,这个Stub大家可以理解为存根。分别说说这几个组件:
客户端(Client),服务的调用方。
服务端(Server),真正的服务提供者。
客户端存根,存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。
服务端存根,接收客户端发送过来的消息,将消息解包,并调用本地的方法
dubbo
dubbo具体可以参考dubbo文档:https://dubbo.apache.org/zh/docs/v2.7/user/quick-start/
dubbo源码:https://github.com/apache/dubbo
对应上图RPC架构的dubbo依赖关系:
接下来以dubbo官方源码的demo项目为例子进行Netty在dubbo中的作用分析。
dubbo启动
启动之前需要先安装好zookeeper,zkServer start先本地启动zk,默认地址127.0.0.1:2181,结束zkServer stop
dubbo-provider.xml配置注册中心地址
dubbo-consumer.xml配置为provider提供的注册中心地址
做好以上准备工作,启动dubbo的demo项目即可
Netty在dubbo的Provider提供者中的使用
provider启动程序:
/**
* provider加载spring配置
* Provider作为被访问方,是一个 Server模式的Socket
*/
public class Application {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-provider.xml");
context.start();
// 按任意键退出
System.in.read();
}
}
至此运行到DubboProtocol的export方法
export(Invoker<T> invoker)
|
openServer(URL url)
|
ProtocolServer createServer(URL url)
打印出对应的执行栈:
doOpen:89, NettyServer (org.apache.dubbo.remoting.transport.netty4) 创建Netty Server端
<init>:71, AbstractServer (org.apache.dubbo.remoting.transport)
<init>:79, NettyServer (org.apache.dubbo.remoting.transport.netty4)
bind:35, NettyTransporter (org.apache.dubbo.remoting.transport.netty4)
bind:-1, Transporter$Adaptive (org.apache.dubbo.remoting)
bind:56, Transporters (org.apache.dubbo.remoting)
bind:44, HeaderExchanger (org.apache.dubbo.remoting.exchange.support.header)
bind:70, Exchangers (org.apache.dubbo.remoting.exchange) Bind a server
|
createServer:347, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
openServer:321, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
export:304, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
各种export
export:-1, Protocol$Adaptive (org.apache.dubbo.rpc) Export service for remote invocation
|
doExportUrlsFor1Protocol:494, ServiceConfig (org.apache.dubbo.config)
doExportUrls:327, ServiceConfig (org.apache.dubbo.config)
doExport:303, ServiceConfig (org.apache.dubbo.config)
export:204, ServiceConfig (org.apache.dubbo.config)
|
start:889, DubboBootstrap (org.apache.dubbo.config.bootstrap) The bootstrap class of Dubbo
onContextRefreshedEvent:69, DubboBootstrapApplicationListener (org.apache.dubbo.config.spring.context)
onApplicationContextEvent:62, DubboBootstrapApplicationListener (org.apache.dubbo.config.spring.context)
|
main:27, Application (org.apache.dubbo.demo.provider)
当Spring容器启动的时候,会调用一些扩展类的初始化方法,比如继承了ApplicationContextAware,ApplicationListener的DubboBootstrapApplicationListener。而dubbo创建了ServiceBean继承了一个监听器,该类有一个export方法,用于打开 ServerSocket 。
然后执行了DubboProtocol的createServer方法,然后创建了一个NettyServer对象。NettyServer对象的构造方法同样是doOpen方法
真正创建Netty的Server:
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_KEEPALIVE, keepalive)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
该方法中,看到了boss 线程,worker 线程,和 ServerBootstrap,在添加了编解码 handler 之后,添加一个 NettyHandler,最后调用 bind 方法,完成绑定端口的工作。
具体看下4个ChannelHandler的作用:
编解码器-NettyCodecAdapter
适配了解码器和编码器,传进去的Codec2对象,底下由这个Codec2实现解码和编码
private class InternalEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
...
codec.encode(channel, buffer, msg);
...
}
}
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
...
msg = codec.decode(channel, message);
...
}
codec可以找到跟dubbo协议相关的DubboCodec类,在重载的decodeBody方法中,包装DecodeableRpcInvocation,塞进Request.data中,返回这个Request给下级handler,在NettyServerHandler中处理
心跳机制IdleStateHandler
用于检测远端是否存活,会对应触发userEventTrigger()方法,具体示例可以参考基于Netty的IM系统
业务Handler-NettyServerHandler
是对数据流的处理,直接看NettyServerHandler的channelRead:
责任链的方式途径几个关键ChannelHandler
DecodeHandler-transport层(面向message)
这个handler接收到上级decoder解码后的request(内部data是一个DecodeableRpcInvocation对象),进入received逻辑
实际是执行DecodeableRpcInvocation的decode方法,主要步骤是读取方法名/参数类型/参数/attachment,完了之后这个DecodeableRpcInvocation可以被使用,继续调用内部handler
HeaderExchangeHandler-exchanger层(面向request/response)
exchanger层(面向request/response),最终调用DubboProtocol.reply方法,实际是DubboProtocol中的一个匿名内部类的对象requestHandler经过几层包装而来
HeaderExchangeHandler的received方法中
handleRequest中处理request并返回response
这里reply调用的是DubboProtocol的内部类
DubboProtocol-protocol层(面向invoker, invocation)
拿到上层解析完的invocation,找到invoker,调用代理的service实现
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
...
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
自此,从网络字节流invocation的逐级转化完成,接下去就是invoker调用service的过程
Netty在dubbo的Consumer提供者中的使用
对于consumer的创建,经过的各dubbo层与provider类似
consumer启动程序:
/**
* 加载Spring配置并调用远程服务
*/
public class Application {
/**
* In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
* launch the application
*/
public static void main(String[] args) throws Exception {
// dubbo-consumer.xml 通过Spring配置引用远程服务
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
context.start();
// 获取远程服务代理,这个过程中会调用DubboProtocol实例的getClients(URL url)方法
DemoService demoService = context.getBean("demoService", DemoService.class);
GreetingService greetingService = context.getBean("greetingService", GreetingService.class);
new Thread(() -> {
while (true) {
// 执行远程方法,最终会调用HeaderExchangeChannel的request方法,通过channel进行请求,本质NioClientSocketChannel的write方法。
String greetings = greetingService.hello();
// 调用结果
System.out.println(greetings + " from separated thread.");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
while (true) {
CompletableFuture<String> hello = demoService.sayHelloAsync("world");
System.out.println("result: " + hello.get());
String greetings = greetingService.hello();
System.out.println("result: " + greetings);
Thread.sleep(500);
}
}
}
执行栈:
doOpen:92, NettyClient (org.apache.dubbo.remoting.transport.netty4) 创建Netty的bootstrap
<init>:63, AbstractClient (org.apache.dubbo.remoting.transport)
<init>:82, NettyClient (org.apache.dubbo.remoting.transport.netty4)
connect:40, NettyTransporter (org.apache.dubbo.remoting.transport.netty4)
connect:-1, Transporter$Adaptive (org.apache.dubbo.remoting)
connect:75, Transporters (org.apache.dubbo.remoting)
connect:39, HeaderExchanger (org.apache.dubbo.remoting.exchange.support.header)
connect:109, Exchangers (org.apache.dubbo.remoting.exchange)
initClient:615, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo) 创建Netty的client
buildReferenceCountExchangeClient:583, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
buildReferenceCountExchangeClientList:570, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
getSharedClient:495, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
getClients:426, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
protocolBindingRefer:403, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
refer:104, AbstractProtocol (org.apache.dubbo.rpc.protocol)
refer:74, ProtocolListenerWrapper (org.apache.dubbo.rpc.protocol)
refer:83, ProtocolFilterWrapper (org.apache.dubbo.rpc.protocol)
refer:-1, Protocol$Adaptive (org.apache.dubbo.rpc)
toInvokers:344, RegistryDirectory (org.apache.dubbo.registry.integration)
refreshInvoker:199, RegistryDirectory (org.apache.dubbo.registry.integration)
refreshOverrideAndInvoker:159, RegistryDirectory (org.apache.dubbo.registry.integration)
notify:141, RegistryDirectory (org.apache.dubbo.registry.integration)
notify:429, AbstractRegistry (org.apache.dubbo.registry.support)
doNotify:372, FailbackRegistry (org.apache.dubbo.registry.support)
notify:364, FailbackRegistry (org.apache.dubbo.registry.support)
doSubscribe:181, ZookeeperRegistry (org.apache.dubbo.registry.zookeeper)
subscribe:299, FailbackRegistry (org.apache.dubbo.registry.support)
subscribe:105, ListenerRegistryWrapper (org.apache.dubbo.registry)
subscribe:104, RegistryDirectory (org.apache.dubbo.registry.integration)
doCreateInvoker:517, RegistryProtocol (org.apache.dubbo.registry.integration)
getInvoker:63, InterfaceCompatibleRegistryProtocol (org.apache.dubbo.registry.integration)
refreshInterfaceInvoker:332, MigrationInvoker (org.apache.dubbo.registry.client.migration)
migrateToServiceDiscoveryInvoker:113, MigrationInvoker (org.apache.dubbo.registry.client.migration)
doMigrate:60, MigrationRuleHandler (org.apache.dubbo.registry.client.migration)
onRefer:103, MigrationRuleListener (org.apache.dubbo.registry.client.migration)
interceptInvoker:490, RegistryProtocol (org.apache.dubbo.registry.integration)
doRefer:476, RegistryProtocol (org.apache.dubbo.registry.integration)
refer:470, RegistryProtocol (org.apache.dubbo.registry.integration)
refer:72, ProtocolListenerWrapper (org.apache.dubbo.rpc.protocol)
refer:81, ProtocolFilterWrapper (org.apache.dubbo.rpc.protocol)
refer:-1以上是关于浅谈RPC及Netty在RPC中的应用的主要内容,如果未能解决你的问题,请参考以下文章