刚学习netty和java,不会用,但现在急需,您有没有用netty编写的服务器和客户端的例子,要能互相发送包
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了刚学习netty和java,不会用,但现在急需,您有没有用netty编写的服务器和客户端的例子,要能互相发送包相关的知识,希望对你有一定的参考价值。
包的内容需是一个类,类里面再定义属性
(一)handler处理篇首先,是handler,初次接触netty的朋友要注意,handler不是一个单例.即每个channel下都会有自己的一个handler实例.
public class ServerHandler extends SimpleChannelUpstreamHandler
private static final Logger logger = Logger.getLogger(
ServerHandler.class.getName());
private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception
if (e instanceof ChannelStateEvent)
logger.log(Level.INFO, "Channel state changed: 0", e);
super.handleUpstream(ctx, e);
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception
System.out.println(this);
String request = (String) e.getMessage();
//如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接
//服务器端通过ChannelFutureListener.CLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接
if (request.toLowerCase().equals("bye"))
ChannelFuture future = e.getChannel().write("bye\r\n");
future.addListener(ChannelFutureListener.CLOSE);
else
//以下是我初步解析客户端发送过来的数据,然后决定处理方式
RecevieData receivedaData = MessageDecoder.decode(request);
if (null != receivedaData)
//服务器第5版
if (VersionCode.V5.equals(receivedaData.getVersion()))
//然后判断命令是否存在
for (String s : CommandCode.COMMANDS)
if (s.equals(receivedaData.getActionType()))
COMMAND_FLAG.set(true);
if (s.equals(CommandCode.KEEP_ALIVE))
serverChannelGroup.addChannel(e.getChannel());
break;
else
COMMAND_FLAG.set(false);
if (COMMAND_FLAG.get())
COMMAND_FLAG.set(false);
//将这个命令传递给下一个handler来处理.
//这里的"下一个handler"即为用户自己定义的处理handler
ctx.sendUpstream(e);
else
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));
else
//版本错误
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));
else
//如果格式错误,那么直接返回
e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception
logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());
在上面这个handler中,我使用了ctx.sendUpstream(e);来处理,个人觉得此处为了要实现执行运行时代码,也可以使用接口等方式.但既然netty提供了sendUpstream 的方法,我们用这个岂不是更方便^_^
下面是使用SSL连接的handler
public class ServerSSLHandler extends SimpleChannelUpstreamHandler
private static final Logger logger = Logger.getLogger(
ServerSSLHandler.class.getName());
private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception
if (e instanceof ChannelStateEvent)
logger.log(Level.INFO, "Channel state changed: 0", e);
super.handleUpstream(ctx, e);
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception
//ssl握手
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
sslHandler.handshake();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception
System.out.println(this);
String request = (String) e.getMessage();
//如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接
//服务器端通过ChannelFutureListener.CLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接
if (request.toLowerCase().equals("bye"))
ChannelFuture future = e.getChannel().write("bye\r\n");
future.addListener(ChannelFutureListener.CLOSE);
else
//以下是我初步解析客户端发送过来的数据,然后决定处理方式
RecevieData receivedaData = MessageDecoder.decode(request);
if (null != receivedaData)
//服务器第5版
if (VersionCode.V5.equals(receivedaData.getVersion()))
//然后判断命令是否存在
for (String s : CommandCode.COMMANDS)
if (s.equals(receivedaData.getActionType()))
COMMAND_FLAG.set(true);
if (s.equals(CommandCode.KEEP_ALIVE))
serverChannelGroup.addChannel(e.getChannel());
break;
else
COMMAND_FLAG.set(false);
if (COMMAND_FLAG.get())
COMMAND_FLAG.set(false);
//将这个命令传递给下一个handler来处理.
//这里的"下一个handler"即为用户自己定义的处理handler
ctx.sendUpstream(e);
else
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));
else
//版本错误
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));
else
//如果格式错误,那么直接返回
e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception
logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());
当我们有了2个handler后,当然就是要把他们添加到我们的Pipeline中
public class ServerPipelineFactory implements
ChannelPipelineFactory
public ChannelPipeline getPipeline()
ChannelPipeline pipeline = pipeline();
ServerConfig config = ServerConfig.getInstance();
try
if (config.ssl())
SSLEngine engine =
SecureSslContextFactory.getServerContext().createSSLEngine();
//说明是服务器端SslContext
engine.setUseClientMode(false);
pipeline.addLast("ssl", new SslHandler(engine));
//此Decoder可以自动解析一句以\r\n结束的命令,我为了方便,也用了这个Decoder
//使用这个Decoder,我不用刻意发送命令长度用于解析,只要没有收到\r\n说明数据还
//没有发送完毕.这个Decoder会等到收到\r\n后调用下个handler
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
8192, Delimiters.lineDelimiter()));
//字串解码,可以自己设置charset
pipeline.addLast("decoder", new StringDecoder());
//字串编码,可以自己设置charset
pipeline.addLast("encoder", new StringEncoder());
if (config.ssl())
//如果开启了SSL,那么使用sslhandler
pipeline.addLast("sslhandler", new ServerSSLHandler());
else
//如果没有开启SSL,那么使用普通handler
pipeline.addLast("handler", new ServerHandler());
//遍历配置文件中的服务器handler,将其添加进Pipeline链中
for (Element e : config.handler())
pipeline.addLast(e.attribute(e.getQName("id")).getValue().trim(),
(ChannelHandler) Class.forName(e.attribute(e.getQName("class")).getValue().trim()).newInstance());
catch (DocumentException ex)
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
catch (InstantiationException ex)
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
catch (IllegalAccessException ex)
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
catch (ClassNotFoundException ex)
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
return pipeline;
server.xml,放到lib下即可,注意其中的handler 以及clienthandler 项,如果你新建了自己的handler,那么需要在此xml中配置一下.
<?xml version="1.0" encoding="UTF-8"?>
<root>
<!-- 配置主机地址 -->
<host>127.0.0.1</host>
<!-- 配置服务端口 -->
<port>8080</port>
<!-- 是否启用ssl,1为启用,0为停用 -->
<ssl>0</ssl>
<!--服务器业务handler -->
<handler id="timeHandler" class="com.chinatenet.nio.server.handler.ServerTimeHandler" />
<!--客户端业务handler -->
<clienthandler id="timeHandler" class="com.chinatenet.nio.client.handler.ClientTimeHandler" />
</root>
到此,一个简单的可扩展handler的服务器雏形就出来了
下面,我们添加一个自定义的服务器处理handler进来
public class ServerTimeHandler extends SimpleChannelUpstreamHandler
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
RecevieData receivedaData = MessageDecoder.decode((String) e.getMessage());
if (CommandCode.GET_TIME.equals(receivedaData.getActionType())
|| CommandCode.KEEP_ALIVE.equals(receivedaData.getActionType()))
if (VersionCode.V5.equals(receivedaData.getVersion()))
//回复客户端后,即可进行自己的业务.当然.这里可以根据需要,看
//是先回复再处理还是等处理结果出来后,将结果返回客户端
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.OK,
System.currentTimeMillis() / 1000 + ""));
else
//版本错误
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED,
StatusCode.VERSION_NOT_SUPPORTED_TXET));
else
//如果不是此handler处理的命令,那么流下去
ctx.sendUpstream(e);
最后测试一下
public class Server
public static void main(String[] args) throws DocumentException
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ServerPipelineFactory());
//因为我要用到长连接
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
ServerConfig config = ServerConfig.getInstance();
bootstrap.bind(new InetSocketAddress(Integer.valueOf(config.port())));
总结:在整个服务器编码中,刚开始会遇到"远程主机强迫关闭了一个现有的连接。"等这类错误,最后修改成"相互告知对方我要关闭了"再进行关闭就可以了. 参考技术A 不好意思,没有哦
以上是关于刚学习netty和java,不会用,但现在急需,您有没有用netty编写的服务器和客户端的例子,要能互相发送包的主要内容,如果未能解决你的问题,请参考以下文章