用 java 简单实现 rpc 通信

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用 java 简单实现 rpc 通信相关的知识,希望对你有一定的参考价值。


代码不一定能够运行起来,这是在之前的代码中抽象出来的。这里只是说说基本的思路
定义消息:

package com.xiaoyao.game.net.framework.codec;

import com.google.protobuf.MessageLite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NetCommand
private static final Logger logger = LoggerFactory.getLogger(NetCommand.class);
private int _cmdCode;// 消息:消息头
private MessageLite body; //消息:消息体

public int getCode()
return this._cmdCode;


public NetCommand(int cmdCode)
this._cmdCode = cmdCode;


public void parseFrom(byte[] data) throws Exception


public byte[] toBytes()
try
logger.info("accept:=========命令为:",this._cmdCode);
return this.body.toByteArray();
catch (Exception var2)
logger.error("error:============0x:" + Integer.toHexString(this._cmdCode));
var2.printStackTrace();
return null;



public void setBody(MessageLite body)
this.body = body;


public MessageLite getBody()
return this.body;

RPCResponse.java

package com.xiaoyao.game.rpc.remote;
import com.xiaoyao.game.net.framework.codec.NetCommand;
public abstract class RPCResponse extends NetCommand
public RPCResponse(int cmdCode)
super(cmdCode);


public abstract String getId();

public abstract void handleRequest(RPCRequest var1);

public abstract void parseFrom(byte[] var1) throws Exception;

RPCRequest.java

package com.xiaoyao.game.rpc.remote;

import com.xiaoyao.game.net.framework.codec.NetCommand;

public abstract class RPCRequest extends NetCommand
public String id;
public RPCResponse response;//注入RPCResponse

public RPCRequest(int cmdCode)
super(cmdCode);


public void genUniqueId()
this.id = Integer.toHexString(this.getCode()) + "." + Integer.toString((int)System.currentTimeMillis(), 36) + "." + Integer.toString((int)(Math.random() * 1.0E7D), 36);


public abstract Class getResponseClass();

public abstract void parseFrom(byte[] var1) throws Exception;

这两个抽象类分别定义了RPC请求和响应的必要方法,然后接着看

package com.xiaoyao.game.centerServer.remote;

import com.xiaoyao.game.room.GameRoomGen;
import com.xiaoyao.game.room.RoomInfo;
import com.xiaoyao.game.rpc.proto.CenterServerProto;
import com.xiaoyao.game.rpc.proto.RpcCommandCodeProto;
import com.xiaoyao.game.rpc.remote.RPCRequest;
import com.xiaoyao.game.rpc.remote.RPCResponse;

public class RPC_CreateRoomResponse extends RPCResponse

public RPC_CreateRoomResponse()
super(RpcCommandCodeProto.RpcCommandID.CREATE_ROOM_RESPONSE_VALUE);
// RpcCommandCodeProto.RpcCommandID.CREATE_ROOM_RESPONSE_VALUE 为消息头,实际上这是一个数字,每一个数字都代表了一种指令

@Override
public String getId()
if(this.getBody()==null)return null;
return ((CenterServerProto.RSCreateRoomResponse)this.getBody()).getId();



@Override
public void handleRequest(RPCRequest req)

try
CenterServerProto.RSCreateRoomRequest request = (CenterServerProto.RSCreateRoomRequest) req.getBody();//向下转型,这是标准的接口型写法,可以简化很多代码。这也是前面为啥需要两个抽象类来定义基本的方法的原因。
RoomInfo roomInfo = new RoomInfo();
roomInfo.setProto(request.getRoomInfo());
int result = GameRoomGen.RoomGen(roomInfo);
CenterServerProto.RSCreateRoomResponse.Builder answer = CenterServerProto.RSCreateRoomResponse.newBuilder();
answer.setId(request.getId());
answer.setResult(result);
answer.setRoomInfo(roomInfo.getProto());
this.setBody(answer.build());

catch (Exception e)
System.out.println("RPC_CreateRoomResponse.handleRequest() error:"+e);



@Override
public void parseFrom(byte[] data) throws Exception

setBody(CenterServerProto.RSCreateRoomResponse.parseFrom(data));

这个 response 是项目中中心服务用来响应游戏服务发来的“创建房间”远程过程调用的响应类。CenterServerProto 是由 protobuf 生成的 Java 类,里面的每个嵌套类都对应一种传输结构。例如 CenterServerProto.RSCreateRoomRequest 中,RSCreateRoomRequest 就是一种传输的数据结构,里面包含各种字段。

// Request asking the center server to create a room
message RSCreateRoomRequest {
    required string id = 1;
    required SyncRoomInfo roomInfo = 2;
}

通过protobuf官方提供的转换方法,可以将这种格式的结构转换成java代码。
上面说了,这是中心服务收到的请求返回的响应。那么问题来了,中心服务是如何实现类似于web服务器中的路由功能,将不同的请求分发到不同的响应类里面的呢。带着这样的问题,我们接着往下看

package com.xiaoyao.game.centerServer.remote;
import com.xiaoyao.game.rpc.proto.CenterServerProto;
import com.xiaoyao.game.rpc.proto.RpcCommandCodeProto;
import com.xiaoyao.game.rpc.remote.RPCRequest;
public class RPC_CreateRoomRequest extends RPCRequest
public RPC_CreateRoomRequest()
super(RpcCommandCodeProto.RpcCommandID.CREATE_ROOM_REQUEST_VALUE);


@Override
public Class getResponseClass()
return RPC_CreateRoomResponse.class;//这里会返回response的类


@Override
public void parseFrom(byte[] data) throws Exception

setBody(CenterServerProto.RSCreateRoomRequest.parseFrom(data));

创建房间的请求
来看看

// CenterRpcService.java
public class CenterRpcServer extends RPCServer
private static CenterRpcServer _centerServer;
public static void main(String[] args) throws Exception
getInstance().start();

public static CenterRpcServer getInstance() throws Exception
if (_centerServer == null)
_centerServer = new CenterRpcServer(_port);
//注册命令
regediterCommand();

return _centerServer;

private static void regediterCommand()


_centerServer.registerRequest(RPC_SetRoleSidRequest.class);
_centerServer.registerRequest(RPC_GetRoleRequest.class);
_centerServer.registerRequest(RPC_CreateRoleRequest.class);
_centerServer.registerRequest(RPC_SaveRoleRequest.class);
_centerServer.registerRequest(RPC_CreateRoomRequest.class);//被注册到服务中了,接下来看看注册到服务中都干了啥
_centerServer.registerRequest(RPC_ConnectSyncServerRequest.class);
_centerServer.registerRequest(RPC_SyncDataRequest.class);
_centerServer.registerRequest(RPC_RemoveRoomRequset.class);
_centerServer.registerRequest(RPC_GetRoomInfoRequest.class);
_centerServer.registerRequest(RPC_CreateRedPacketRequest.class);
_centerServer.registerRequest(RPC_GetSendRedPacketRequest.class);
_centerServer.registerRequest(RPC_GetReceiverRedPacketLogByIdRequest.class);
_centerServer.registerRequest(RPC_GetSenderRedPacketLogRequest.class);
_centerServer.registerRequest(RPC_ReceiverRedPacketRequest.class);


来看看 RPCServer

package com.xiaoyao.game.rpc.server;

import com.xiaoyao.game.net.framework.server.ClientConnection;
import com.xiaoyao.game.net.framework.server.ClientConnectionListener;
import com.xiaoyao.game.net.framework.server.ClientConnectionListenerFactory;
import com.xiaoyao.game.net.framework.server.NetServer;
import com.xiaoyao.game.rpc.remote.RPCRequest;

public class RPCServer implements ClientConnectionListenerFactory
private NetServer _netServer;
private int _port;

public RPCServer(String name, int port) throws Exception
this._port = port;
this._netServer = new NetServer(name, this._port, "command", "rpc");
this._netServer.setClientConnectionListenerFactory(this);


public void registerRequest(Class requestClass)
try
this._netServer.getCommandSet().addCommandClass(((RPCRequest)requestClass.newInstance()).getResponseClass());//注意这个方法,从request中调用了getResponseClass方法,即获取了response的类。而且对于每个request都有对应的类
this._netServer.getCommandSet().addCommandClass(requestClass);
catch (Exception var3)
var3.printStackTrace();




public void registerRequests(Class[] requestClasses)
for(int i = 0; i < requestClasses.length; ++i)
this.registerRequest(requestClasses[i]);




public void start() throws Exception
this._netServer.start();


public void stop() throws Exception
this._netServer.stop();


public ClientConnectionListener createListener(ClientConnection conn)
return new RPCListener(conn);

然后来看看 NetServer

public class NetServer 
.....

public void start() throws Exception
try
this.bossGroup = new NioEventLoopGroup(0, new NameThreadFactory("server-boss"));
this.workGroup = new NioEventLoopGroup(0, new NameThreadFactory("server-work"));
this.bootstrap = ((ServerBootstrap)(new ServerBootstrap()).group(this.bossGroup, this.workGroup).channel(NioserverSocketChannel.class)).childHandler(new ChannelInitializer<SocketChannel>()
protected void initChannel(SocketChannel channel) throws Exception
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("logging", new LoggingHandler(LogLevel.DEBUG));
if (NetServer.this.connectionIdleTimeInSeconds > 0)
pipeline.addLast("idleStateHandler", new IdleStateHandler(NetServer.this.connectionIdleTimeInSeconds, 0, 0));


pipeline.addLast(new ChannelHandler[]new ProtocolCodecFilter(NetServer.this.instance.getName(), NetServer.this._commandSet, NetServer.this._commandCodecMode, NetServer.this._serverMode));
pipeline.addLast(new ChannelHandler[]new NettyServerHandler(NetServer.this.instance));

);
this.bindConnectionOptions(this.bootstrap);
ChannelFuture future = this.bootstrap.bind(new InetSocketAddress(this._port));
future.addListener((channelFuture) ->
if (channelFuture.isSuccess())
logger.info("NetServer has stared on port:" + this._port);
else
logger.info("NetServer bind port has error:" + this._port);


);
future.sync();
catch (Exception var2)
logger.error("server:【】 has error on port :", new Object[]this.getName(), var2.getMessage(), this._port);
throw var2;



private void bindConnectionOptions(ServerBootstrap bootstrap)
((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)bootstrap.option(ChannelOption.SO_BACKLOG, 1024)).option(EpollChannelOption.SO_REUSEADDR, true)).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)).option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator())).option(ChannelOption.WRITE_BUFFER_WATER_MARK, WriteBufferWaterMark.DEFAULT)).option(ChannelOption.SO_RCVBUF, 1024)).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_LINGER, 0).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);


public void stop() throws Exception
if (this.bootstrap != null)
this.closeConnections();
if (this.bossGroup != null)
this.bossGroup.shutdownGracefully();


if (this.workGroup != null)
this.workGroup.shutdownGracefully();


logger.info("NetServer has stopped!");



...

从代码中我们知道 NetServer 是底层传输的实现,而且,还可以发现,它使用了 netty 框架作为异步 NIO 框架。也就是说,没有比这里的代码更加底层的东西了。现在我们需要分析的是,一个消息到底是怎么从 A 服务传到中心服务,并得到返回的响应的。让我们把视线聚焦到

pipeline.addLast(new ChannelHandler[]{new ProtocolCodecFilter(NetServer.this.instance.getName(), NetServer.this._commandSet, NetServer.this._commandCodecMode, NetServer.this._serverMode)});

这是上面的初始化netty的时候的一个步骤。可以从字面上看出这是加了一个过滤器。

package com.xiaoyao.game.net.framework.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import java.util.List;

public class ProtocolCodecFilter extends ByteToMessageCodec
public static final int COMMAND_HEADER_BYTES = 2;
public static final int MAX_COMMAND_DECODE_BYTES = 65536;
MessageToByteEncoder encoder;
ByteToMessageDecoder decoder;

public ProtocolCodecFilter(String serverName, CommandSet commandSet, String commandCodecMode, String serverMode)
this.encoder = new CommandEncoder(serverName, commandSet, serverMode);
this.decoder = new CommandDecoder(serverName, commandSet, serverMode);


protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception
((CommandEncoder)this.encoder).encode(channelHandlerContext, (NetCommand)o, byteBuf);


protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception
((CommandDecoder)this.decoder).decode(channelHandlerContext, byteBuf, list);

这是这个过滤器的定义。过滤器中注入了一个Encoder和Decoder。

package com.xiaoyao.game.net.framework.codec;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandEncoder extends MessageToByteEncoder<NetCommand>
private static final Logger logger = LoggerFactory.getLogger(CommandEncoder.class);
private CommandSet _commandSet;
private String serverName;
private String serverMode;

public CommandEncoder(String serverName, CommandSet cmdSet, String serverMode)
this._commandSet = cmdSet;
this.serverMode = serverMode;
this.serverName = serverName;


protected void encode(ChannelHandlerContext channelHandlerContext, NetCommand cmd, ByteBuf out) throws Exception
byte[] bytes = cmd.toBytes();
int cmdcode = cmd.getCode();
if (cmdcode == -1)
logger.error("[" + this.serverName + "] : CommandEncoder.encode msg has not addCommand to CommandSet:" + cmd.getClass().toString());
channelHandlerContext.close();
else
int length = bytes.length;
ByteBuf buf = Unpooled.buffer(2 + length);
buf.writeInt(2 + length);
buf.writeShort(cmdcode);
buf.writeBytes(bytes);
out.writeBytes(buf);


来看看encoder中的encode方法,可以发现,这里就是底层传输协议定义的地方了:

int length = bytes.length;
ByteBuf buf = Unpooled.buffer(2 + length);
buf.writeInt(2 + length); // length of the message including the message header
buf.writeShort(cmdcode);// message: header (the command code)
buf.writeBytes(bytes); // message: body
out.writeBytes(buf);

来看看decoder

package com.xiaoyao.game.net.framework.codec;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandDecoder extends ByteToMessageDecoder
private static final Logger logger = LoggerFactory.getLogger(CommandDecoder.class);
private CommandSet _commandSet;
private String serverMode;
private String serverName;

public CommandDecoder(String serverName, CommandSet cmdSet, String serverMode)
this._commandSet = cmdSet;
this.serverMode = serverMode;
this.serverName = serverName;


protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception
NetCommand cmd = null;

try
cmd = this.readOneCommand(byteBuf, channelHandlerContext);
catch (Exception var6)
logger.error("[" + this.serverName + "] : " + var6.toString());


if (cmd != null)
list.add(cmd);



private NetCommand readOneCommand(ByteBuf cmdDataBuf, ChannelHandlerContext context) throws Exception
if (!this.isCommandDataReady(cmdDataBuf, context))
return null;
else
int cmdLen = cmdDataBuf.readInt();
int cmdCode = cmdDataBuf.readUnsignedShort();
int bodyLen = cmdLen - 2;
ByteBuf buf = Unpooled.buffer(bodyLen);
cmdDataBuf.readBytes(buf);
NetCommand cmd;
UnknownCommand unknownCommand;
if ("server".equals(this.serverMode))
if (!this._commandSet.isExitMessage(cmdCode))
System.out.println("[" + this.serverName + "] : =====command read error, invalid command code: 0x" + Integer.toHexString(cmdCode));
unknownCommand = new UnknownCommand();
return unknownCommand;


cmd = new NetCommand(cmdCode);

try
cmd.setBody(this._commandSet.parseMessage(cmdCode, buf.array()));
catch (Exception var10)
System.out.println("[" + this.serverName + "] : cmdcode:0x" + Integer.toHexString(cmdCode) + " paresform error: " + var10.toString());
unknownCommand = new UnknownCommand();
return unknownCommand;

else
cmd = this._commandSet.newNetCommandClass(cmdCode);
if (cmd == null)
System.out.println("[" + this.serverName + "] : =====command read error, invalid command code: 0x" + Integer.toHexString(cmdCode));
unknownCommand = new UnknownCommand();
return unknownCommand;


try
cmd.parseFrom(buf.array());
catch (Exception var9)
System.out.println("[" + this.serverName + "] : cmdcode:0x" + Integer.toHexString(cmdCode) + " paresform error: " + var9.toString());
unknownCommand = new UnknownCommand();
return unknownCommand;



return cmd;



protected boolean isCommandDataReady(ByteBuf cmdDataBuf, ChannelHandlerContext channelHandlerContext)
cmdDataBuf.markReaderIndex();

int cmdLen;
try
cmdLen = cmdDataBuf.readInt();
catch (Exception var5)
cmdDataBuf.resetReaderIndex();
return false;


if (cmdLen >= 2 && cmdLen <= 65536)
boolean isReady = cmdLen > 0 && cmdDataBuf.readableBytes() >= cmdLen;
cmdDataBuf.resetReaderIndex();
return isReady;
else
channelHandlerContext.close();
System.out.println("[" + this.serverName + "] : MessageDecoder.isCommandDataReady command exceeds limit:" + cmdLen + ", close:");
return false;


如果对 netty 有了解的话,可以发现这两步操作是在处理 TCP 的粘包/拆包问题:编码时先写入消息长度,解码时再按长度切出一条完整的消息。这样一来,netty 可以把消息头和消息体放到一起传输,然后再完整地取出来。
来看看 ProtocolCodecFilter 的 decode和encode方法在哪里被调用过了。其实是自动调用的。NetServer看完了,再回到 RPCServer。

public class RPCServer implements ClientConnectionListenerFactory 

RPCServer实现的接口,来看看

public interface ClientConnectionListenerFactory 
ClientConnectionListener createListener(ClientConnection var1);

ClientConnection

package com.xiaoyao.game.net.framework.server;

import com.xiaoyao.game.net.framework.codec.NetCommand;

public interface ClientConnection
void sendCommand(NetCommand var1);

void close(boolean var1);

String getClientIP();

boolean isConnected();

这是一个接口,里面有一个sendCommand.
再根据RPCServer中的

public ClientConnectionListener createListener(ClientConnection conn) 
return new RPCListener(conn);

来看看RPCListener

package com.xiaoyao.game.rpc.server;

import com.xiaoyao.game.net.framework.codec.NetCommand;
import com.xiaoyao.game.net.framework.codec.UnknownCommand;
import com.xiaoyao.game.net.framework.server.ClientConnection;
import com.xiaoyao.game.net.framework.server.ClientConnectionListener;
import com.xiaoyao.game.rpc.remote.RPCRequest;
import com.xiaoyao.game.rpc.remote.RPCResponse;

public class RPCListener implements ClientConnectionListener
private ClientConnection _conn;

public RPCListener(ClientConnection conn)
this._conn = conn;


public void onCommand(NetCommand cmd)
try
RPCRequest request = (RPCRequest)cmd;
RPCResponse response = (RPCResponse)request.getResponseClass().newInstance();
response.handleRequest(request);
this._conn.sendCommand(response);
catch (InstantiationException var4)
var4.printStackTrace();
catch (IllegalAccessException var5)
var5.printStackTrace();
catch (Exception var6)
var6.printStackTrace();




public void onUnknowCommand(UnknownCommand cmd)


public void onDisconnected(boolean graceful)
System.out.println("disconnected " + graceful);


public void onIdle(int idleCount)

我们来到netty框架那里,在我们使用Netty框架的时候,start的时候需要定义一些参数,同时,也可以对接收的东西进行过滤。

// NettyServerHandler.java
public class NettyServerHandler extends ChannelInboundHandlerAdapter
public void channelRead(ChannelHandlerContext ctx, Object msg)
this.idleCount = 0;

try
NetCommand cmd = (NetCommand)msg;
if (cmd instanceof UnknownCommand)
this.server.getClientSession(ctx).onUnknowCommand((UnknownCommand)cmd);
else
this.server.getClientSession(ctx).onCommand(cmd);

catch (Exception var4)
logger.debug("[" + this.server.getName() + "] : onCommand: Exception in server messageReceived: " + var4.toString());
var4.printStackTrace();



可以看到,这里就是我们找了很久的分发中心了。自己如果要写一个rpc的话,只需要按照上面的内容倒着写就可以了


以上是关于用 java 简单实现 rpc 通信的主要内容,如果未能解决你的问题,请参考以下文章

Java实现简单的RPC框架

Java实现简单的RPC框架

Java实现简单RPC框架

Java 实现简单的RPC框架

教你用 Netty 实现一个简单的 RPC!

RPC 和 HTTP