RPC&Netty 学习之路

Posted 玩转编程

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RPC&Netty 学习之路相关的知识,希望对你有一定的参考价值。

    本着学习与开源精神,在学习之余试着基于netty做一个RPC框架,这样也可以对netty做一个练习,并可进一步了解RPC原理。

    RPC框架,则不可缺少网络请求交互。既然选择基于netty, 那么我们需要先来学习一下netty在C/S架构中是怎么交互的。

Server 启动类代码: 

package com.dream.mqpushserver.test;
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioserverSocketChannel;
public class ServerTest {
public static void main(String[] args) {
EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss,work) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .option(ChannelOption.SO_KEEPALIVE,true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ServeHandlerTest()); } });
ChannelFuture bind = serverBootstrap.bind(10010);
}}

ServeHandlerTest 代码 

package com.dream.mqpushserver.test;
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;
public class ServeHandlerTest extends SimpleChannelInboundHandler<ByteBuf> {

@Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); System.out.println("server received "+ new String(bytes)); ctx.channel().writeAndFlush(Unpooled.copiedBuffer("\n this is server response ....".getBytes())); }}

客户端启动类代码:

package com.dream.mqpushserver.test;
import io.netty.bootstrap.Bootstrap;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;
public class ClientTest {
public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .option(ChannelOption.SO_KEEPALIVE,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ClientHandlerTest()); } });
ChannelFuture connect = bootstrap.connect("127.0.0.1", 10010);
for (int i = 0; i < 10; i++) { connect.channel().writeAndFlush(Unpooled.copiedBuffer((i + "this is client request ").getBytes() )); } System.out.println("client send done ... "); }}

客户端处理类代码:

package com.dream.mqpushserver.test;
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;
public class ClientHandlerTest extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); System.out.println("client received "+ new String(bytes)); }}
这样就可以完成client与server端的相互通信。
但是,是否这样就完成了C/S结构通信的基础呢?
很显然不是,因为在socket传输中,有着一个常见的问题半包/粘包问题UDP由于有消息边界,所以不存在半包/粘包问题,因此我们就不得不对接收到的数据进行处理,来防止读取到的数据不完整的情况,
那我们应该怎么处理半包/粘包问题呢?
自定义解码器。。。。


以上是关于RPC&Netty 学习之路的主要内容,如果未能解决你的问题,请参考以下文章

Netty学习

Netty实战高性能分布式RPC(Dubbo分布式底层实现)

IT新手之路Netty学习之旅

Java进阶:Netty实现RPC的代码

Netty学习二

netty实现rpc框架