Netty 实现聊天功能
Posted Java面试通关手册
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty 实现聊天功能相关的知识,希望对你有一定的参考价值。
对Netty不了解可以查看我的上一篇文章:
例子来源:https://waylau.com/ (推荐老卫的网站,感觉挺不错),另外点击阅读原文即可查看原文。
Netty 实现聊天功能
环境要求:
JDK 7+
Maven 3.2.x
Netty 4.x
开始编码:
让我们从 handler (处理器)的实现开始,handler 是由 Netty 生成用来处理 I/O 事件的。
SimpleChatServerHandler.java
/**
*服务端 channel
*SimpleChannelInboundHandler实现了 ChannelInboundHandler 接口,
*ChannelInboundHandler 提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承 SimpleChannelInboundHandler 类而不是你自己去实现接口方法。
*/
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)
/**
* A thread-safe Set Using ChannelGroup, you can categorize Channels into a
* meaningful group. A closed Channel is automatically removed from the
* collection,
*/
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 每当从服务端收到新的客户端连接时,客户端的 Channel 存入 ChannelGroup 列表中,并通知列表中的其他客户端 Channel
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
// Broadcast a message to multiple Channels
channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
channels.add(ctx.channel());
}
/**
* 每当从服务端收到客户端断开时,客户端的 Channel 自动从 ChannelGroup 列表中移除了,并通知列表中的其他客户端 Channel
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
Channel incoming = ctx.channel();
//将消息广播到多个Channel
channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");
// 一个关闭的Channel将自动从ChannelGroup中移除,
// so there is no need to do "channels.remove(ctx.channel());"
}
/**
* 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。
* 如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived()
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
if (channel != incoming) {
channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");
} else {
channel.writeAndFlush("[you]" + s + "\n");
}
}
}
/**
* 服务端监听到客户端活动
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "在线");
}
/**
* 服务端监听到客户端不活动
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "掉线");
}
/**
* exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,
* 即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时。
* 在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。
* 然而这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "异常");
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}
SimpleChatServerInitializer 用来增加多个的处理类到 ChannelPipeline 上,包括编码、解码、SimpleChatServerHandler 等。
SimpleChatServerInitializer.java
/**
* 服务端 ChannelInitializer
* 用来增加多个的处理类到 ChannelPipeline 上,包括编码、解码、SimpleChatServerHandler 等。
*/
public class SimpleChatServerInitializer extends
ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SimpleChatServerHandler());
System.out.println("SimpleChatClient:"+ch.remoteAddress() +"连接上");
}
}
编写一个 main() 方法来启动服务端。
SimpleChatServer.java
/**
* 简单聊天服务器-服务端
*/
public class SimpleChatServer {
private int port;
public SimpleChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
//在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。
//第一个经常被叫做‘boss’,用来接收进来的连接。
//第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//启动 NIO 服务的辅助启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
//用于处理ServerChannel和Channel的所有事件和IO。
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioserverSocketChannel.class)
.childHandler(new SimpleChatServerInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
System.out.println("SimpleChatServer 启动了");
// 绑定端口,开始接收进来的连接
ChannelFuture f = serverBootstrap.bind(port).sync(); // (7)
// 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("SimpleChatServer 关闭了");
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new SimpleChatServer(port).run();
}
}
客户端的处理类比较简单,只需要将读到的信息打印出来即可
SimpleChatClientHandler.java
/**
*客户端 channel
*客户端的处理类比较简单,只需要将读到的信息打印出来即可
*/
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(s);
}
}
与服务端类似
SimpleChatClientInitializer.java
/**
* 客户端 ChannelInitializer
*/
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SimpleChatClientHandler());
}
}
编写一个 main() 方法来启动客户端
SimpleChatClient.java
/**
* 简单聊天服务器-客户端
*/
public class SimpleChatClient {
public static void main(String[] args) throws Exception{
new SimpleChatClient("localhost", 8080).run();
}
private final String host;
private final int port;
public SimpleChatClient(String host, int port){
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChatClientInitializer());
Channel channel = bootstrap.connect(host, port).sync().channel();
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
while(true){
channel.writeAndFlush(in.readLine() + "\r\n");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
效果:
以上是关于Netty 实现聊天功能的主要内容,如果未能解决你的问题,请参考以下文章