一个基于TCP协议的Socket通信实例
Posted 点融黑帮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个基于TCP协议的Socket通信实例相关的知识,希望对你有一定的参考价值。
1. 前言
一般接口对接多以http/https或webservice的方式,socket方式的对接比较少并且会有一些难度。正好前段时间完成了一个socket的接口的对接需求,现将实现的思路做一个整理。
2. 需求概述
2.1 需要提供一个socket服务端,实时接收三方传递过来的数据
2.2 实时报文规范说明
2.2.1 通讯及接口格式说明
通讯方式:
通讯采用 TCP 协议, SOCKET 同步短连接方式。
报文结构:
报文为不定长报文,以定长报文头+不定长报文体的方式
报文基本结构如下图所示:
报文长度 |
报文体 |
6位交易报文长度+交易报文。其中 6 位交易报文长度以 ASCII 码字符串方式表示(6 个字节),右对齐,左补 0,不包括自身的长度,表示的是报文体的长度。如“000036fbced3fe-7025-4b5c-9cef-2421cd981f39”, 000036 为长度,“fbced3fe-7025-4b5c-9cef-2421cd981f39”为报文内容。
报文结构符合 XML 标准的报文格式,报文以无 BOM 格式的 GBK 编码。报文根节点为 Transaction节点。除非报文里有特殊说明,报文定义的字段都是 Transaction 节点的子节点。报文格式参考下节示例。
2.2.2 报文示例
请求:
000410<?xml version="1.0" encoding="GBK"?><req:request xmlns:req="http://chimera.intele.com/gw/xsd/SMSGateway/Request/2013/02"><serviceId>29</serviceId><username>gre</username><password>erg</password><message><recipient>+4741414141</recipient><content>test</content><price>0</price><settings><sendWindow><startDate>2018-06-15</startDate><startTime>16:15:00</startTime></sendWindow></settings></message></req:request>
响应:
000683<?xml version="1.0" encoding="GBK"?><rsl:result xmlns:rsl=http://chimera.intele.com/gw/xsd/TCPGateway/Result/2015/10 xmlns:sms=http://chimera.intele.com/gw/xsd/SMSGateway/Response/2013/02 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><status><code>1</code><description>OK</description></status><sms:response><batchReference>0c2c002f-ccc6-4c7b-86e1-c7871b1c98b3</batchReference><messageStatus><statusCode>1</statusCode><statusMessage>Message enqueued for sending</statusMessage><clientReference>SMS-AFFS-000000100</clientReference><recipient>+4741915590</recipient><messageId>6y06b02hdo00</messageId><sequenceIndex>1</sequenceIndex></messageStatus></sms:response></rsl:result>
3 代码实现
3.1 BIO 阻塞模式
简单的描述一下BIO的服务端通信模型:采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端分配一个线程进行业务逻辑处理,通过输出流返回应答给客户端,线程销毁。即典型的请求应答模型。
传统BIO通信模型图(此图来源于网络)
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系, Java中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终崩溃。
但是这种模式在一些特定的应用场景下效果是最好的,比如只有少量的TCP连接通信,且双方都非常快速的传输数据,此时这种模式的性能最好,实现比较简单。
实现代码如下:
3.1.1 服务端同步阻塞模式的:
import java.io.*;
import java.net.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import javax.annotation.PostConstruct;
public class TCPBlockServer {
// 服务IP
private final String SERVER_IP = "127.0.0.1";
// 服务端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final String CHARSET_NAME = "GBK";
@PostConstruct
public void start() throws Exception {
System.out.println("server Socket 启动 。。。。。。。");
// 这里使用了Java的自动关闭的语法
try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {
while (true) {
Socket socket = serverSocket.accept() ;
new Thread(()->handler(socket)).start();
}
}
}
private void handler(Socket socket2) {
String msg = null;
try (Socket socket = socket2 ; InputStream input = socket.getInputStream(); OutputStream out = socket.getOutputStream()) {
msg = receiveMsg(input, socket);
System.out.println("msg:" + msg);
doBusinessLogic(msg,out);
} catch (Exception e) {
e.printStackTrace();
}
}
// 处理业务逻辑
private void doBusinessLogic(String msg,OutputStream out) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
out.write(msg.getBytes(CHARSET_NAME));
out.flush();
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
private String receiveMsg(InputStream input, Socket socket) throws Exception {
byte[] lengthBytes = new byte[6];
int count = input.read(lengthBytes);
int length = Integer.valueOf(new String(lengthBytes));
byte[] buffer = new byte[length + 2];
int readBytes = 0;
while (readBytes < length) {
count = input.read(buffer, readBytes, length - readBytes);
if (count == -1) {
break;
}
readBytes += count;
}
return new String(buffer, Charset.forName("GBK"));
}
public static void main(String[] args) throws Exception {
TCPBlockServer server = new TCPBlockServer();
server.start();
}
}
3.1.2 服务端伪异步I/O模型:
上面实现方面存在的一些不足之处:
1:服务器创建和销毁工作线程的开销很大。如果服务器需要和许多客户通信,并且与每个客户的通信时间都很短,那么有可能服务器为客户创建新线程的开销比实际与客户通信的开销还大。
2:除了创建和销毁线程的开销之外,活动的线程也消耗系统资源。并且每个线程本身也会占用一定的内存(每个线程大约需要1MB内存),如果同时有大量客户连接到服务器,就必须创建大量的工作线程,他们会消耗大量内存,可能会导致系统内存不足,应用产生OOM的错误。
3:如果线程数目固定,并且每个线程都有很长的生命周期,那么线程切换也是相对固定的。不同的操作系统有不同的切换周期,一般在20毫秒左右。这里所说的线程切换是指Java虚拟机,以及底层操作系统的调度下,线程之间转让CPU的使用权。如果频繁创建和销毁线程,那么将导致频繁的切换线程,因为一个线程被销毁后,必然要把CPU转移给另外一个已经就绪的线程,是该线程获得运行机会。这种情况下,线程间的切换不再遵循系统的固定切换周期,切换线程的开销甚至比创建及销毁的开销还大。
为了改进客户端访问就会创建线程的场景,改为由一个线程池去管理固定数量的线程来执行客户所需业务逻辑。实现线程池线程和客户端 N(N>= 1): M的关系。如下图所示:
相关实现代码如下,根据实际场景需要设置线程池中合适的线程数量:
import java.io.*;
import java.net.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.concurrent.*;
import javax.annotation.PostConstruct;
public class TCPBlockThreadPoolServer {
// 服务IP
private final String SERVER_IP = "127.0.0.1";
// 服务端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final int THREADS = 150 ;
private final String CHARSET_NAME = "GBK";
private ExecutorService executorService ;
@PostConstruct
public void start() throws Exception {
System.out.println("server Socket 启动 。。。。。。。");
executorService = Executors.newFixedThreadPool(THREADS) ;
// 这里使用了Java的自动关闭的语法
try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {
while (true) {
Socket socket = serverSocket.accept() ;
executorService.execute(()->handler(socket));
}
}
}
private void handler(Socket socket2) {
String msg = null;
try (Socket socket = socket2 ; InputStream input = socket.getInputStream(); OutputStream out = socket.getOutputStream()) {
msg = receiveMsg(input, socket);
System.out.println("msg:" + msg);
doBusinessLogic(msg,out);
} catch (Exception e) {
e.printStackTrace();
}
}
// 处理业务逻辑
private void doBusinessLogic(String msg,OutputStream out) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
out.write(msg.getBytes(CHARSET_NAME));
out.flush();
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
private String receiveMsg(InputStream input, Socket socket) throws Exception {
byte[] lengthBytes = new byte[6];
int count = input.read(lengthBytes);
int length = Integer.valueOf(new String(lengthBytes));
byte[] buffer = new byte[length + 2];
int readBytes = 0;
while (readBytes < length) {
count = input.read(buffer, readBytes, length - readBytes);
if (count == -1) {
break;
}
readBytes += count;
}
return new String(buffer, Charset.forName("GBK"));
}
public static void main(String[] args) throws Exception {
TCPBlockServer server = new TCPBlockServer();
server.start();
}
}
3.1.3 客户端
简单的客户端实现如下:
import java.io.*;
import java.net.Socket;
import java.nio.charset.Charset;
import org.apache.commons.lang3.StringUtils;
public class Client {
public String sendAndRecv(String content, String charsetName,String ip,int port) throws Exception {
try(Socket socket = new Socket(ip,port)){
socket.setTcpNoDelay(true);
socket.setKeepAlive(true);
socket.setSoTimeout(60000);
try(OutputStream output = socket.getOutputStream();InputStream input = socket.getInputStream()){
output.write(content.getBytes(charsetName));
output.flush();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(input, Charset.forName("GBK")));
StringBuffer buffer = new StringBuffer();
String message = null ;
while((message = bufferedReader.readLine()) != null){
buffer.append(message);
}
return StringUtils.substring(buffer.toString(), 6);
}
}
}
}
3.2 NIO 模式
相对于BIO(阻塞通信)模型来说,NIO模型非常复杂,以至于花费很大的精力去学习也不太容易能够精通,难以编写出一个没有缺陷,高效且适应各种意外情况的稳定的NIO通信模块。之所以有这样的问题,是因为NIO编程不是单纯的一个技术点,而是涵盖了一系列的相关技术、专业知识、编程经验和编程技巧的复杂工程,所以精通这些技术相当有难度。
和BIO相比NIO有如下几个新的概念:
1. 通道(Channel)
Channel对应BIO中Stream的模型,到任何目的地(或来自任何地方)的所有数据都必须通过一个Channel对象。但是Channel和Stream不同的地方在于,Channel是双向的而Stream是单向的(分为InputStream和OutputStream),所以Channel可以用于读/写,或同时用于读写。
2. 缓冲区(Buffer)
虽然Channel用于读写数据,但是我们不能直接操作Channel进行读写,必须通过缓冲区来完成(Buffer)。NIO设计了一个全新的数据结构Buffer,具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer等。
Buffer中有3个重要的参数:位置(position)、容量(capactiy)和上限(limit)
参数 |
写模式 |
读模式 |
位置(position) |
当前缓冲区的位置,将从position的下一个位置写数据 |
当前缓存区读取的位置,将从此位置后读取数据。 |
容量(capacity) |
缓存区总容量的上限 |
缓存区总容量的上限 |
上限(limit) |
缓存区实际上限,它总是小于等于容量。通常情况下和容量相等 |
代表可读取的总容量,和上次写入的容量相等。 |
3. 选择器(Selector)
Selector 可以同时检测多个Channel的事件以实现异步I/O,我们可以将感兴趣的事件注册到Selector上面,当事件发生时可以通过Selector获取事件发生的Channel,并进行相关的事件处理操作。一个Selector可以同时轮询多个Channel。
3.2.1 服务端
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.Iterator;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TCPNioserver {
// 服务IP
private final String SERVER_IP = "127.0.0.1";
// 服务端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final String CHARSET_NAME = "GBK";
private Selector selector;
public TCPNioServer() throws Exception {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置通道为非阻塞
serverChannel.configureBlocking(false);
// 将该通道所对应的serverSocket绑定到指定的ip和port端口
InetAddress inetAddress = InetAddress.getByName(SERVER_IP);
serverChannel.socket().bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);
// 获得一个通道管理器(选择器)
selector = Selector.open();
/*
* 将通道管理器和该通道绑定,并为该通道注册selectionKey.OP_ACCEPT事件
* 注册该事件后,当事件到达的时候,selector.select()会返回, 如果事件没有到达selector.select()会一直阻塞
*/
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
/**
* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,进行处理
*/
@PostConstruct
public void start() throws Exception {
log.info("==start server ip {} , port {}. ==", SERVER_IP, SERVER_PORT);
while (true) {
selector.select();//此方法会阻塞,直到至少有一个以注册的事件被触发
//获取发生事件的SelectionKey集合
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
try {
SelectionKey selectedKey = iterator.next();
if (selectedKey.isValid()) { // 如果key的状态是有效的
if (selectedKey.isAcceptable()) { //如key是阻塞状态,调用accept()方法
accept(selectedKey);
}
if (selectedKey.isReadable()) { //如key是可读状态,调用handle()方法
handle(selectedKey);
}
}
} catch (Exception e) {
iterator.remove();
} finally {
iterator.remove();//从集合中移除,避免重复处理
}
}
}
}
private void accept(SelectionKey key) throws IOException {
// 1 获取服务器通道
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 2 执行阻塞方法
SocketChannel chennel = server.accept();
// 3 设置阻塞模式为非阻塞
chennel.configureBlocking(false);
// 4 注册到多路复用选择器上,并设置读取标识
chennel.register(selector, SelectionKey.OP_READ);
}
private void handle(SelectionKey key) throws Exception {
// 获取之前注册的SocketChannel通道
try (SocketChannel channel = (SocketChannel) key.channel()) {
int length = getMsgLength(key, channel);
String msg = recvMsg(key, channel, length);
System.out.println("Server:" + msg);
doBusinessLogic(msg, channel);
}
}
private byte[] read(SelectionKey key, SocketChannel channel,int capacity) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(capacity);
channel.read(buffer);
// 将channel中的数据放入buffer中
int count = channel.read(buffer);
if (count == -1) { // == -1表示通道中没有数据
key.channel().close();
key.cancel();
return null;
}
// 读取到了数据,将buffer的position复位到0
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
// 将buffer中的数据写入byte[]中
buffer.get(bytes);
return bytes ;
}
private int getMsgLength(SelectionKey key, SocketChannel channel) throws Exception {
byte[] bytes = this.read(key, channel, 6) ;
String length = new String(bytes, CHARSET_NAME);
return new Integer(length);
}
private String recvMsg(SelectionKey key, SocketChannel channel,int msgLength) throws Exception{
byte[] bytes = this.read(key, channel, msgLength) ;
return new String(bytes, CHARSET_NAME);
}
// 处理业务逻辑
private void doBusinessLogic(String msg, SocketChannel channel) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes(CHARSET_NAME));
channel.write(outBuffer);
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
public static void main(String[] args) throws Exception {
TCPNioServer server = new TCPNioServer();
server.start();
}
}
3.3 AIO模式
与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。 即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。 在JDK1.7中,这部分内容被称作NIO2,主要在java.nio.channels包下增加了下面四个异步通道:
AsynchronousSocketChannel
对应BIO中的ServerSocket和NIO中的ServerSocketChannel,用于server端网络程序
AsynchronousServerSocketChannel
对应BIO中的Socket和NIO中的SocketChannel,用于client端网络应用
AsynchronousFileChannel
AsynchronousDatagramChannel
异步channel API提供了两种方式监控/控制异步操作(connect,accept, read,write等)。
第一种方式是返回java.util.concurrent.Future对象, 检查Future的状态可以得到操作是完成还是失败,还是进行中, future.get会阻塞当前进程。
第二种方式为操作提供一个回调参数java.nio.channels.CompletionHandler,这个回调类包含completed,failed两个方法。channel的每个I/O操作都为这两种方式提供了相应的方法, 你可以根据自己的需要选择合适的方式编程。
下面的例子中在accept和read方法中使用了回调CompletionHandler的方式,而发送数据(write)使用了future的方式,当然write也可以采用回调CompletionHandler的方式。因为CompletionHandler是完全异步的,所以需要在mian方法中使用一个 while循环确保程序不退出,或者也可以在start方法的最后使用channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
3.3.1 服务端
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.concurrent.*;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TCPAioServer {
// 服务IP
private final String SERVER_IP = "127.0.0.1";
// 服务端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final String CHARSET_NAME = "GBK";
private ExecutorService executorService;
private AsynchronousChannelGroup channelGroup;
private AsynchronousServerSocketChannel serverSocketChannel;
public void start() throws IOException, Exception {
// 创建线程池
executorService = Executors.newCachedThreadPool();
// 创建线程组
channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
// 创建服务器通道
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
InetAddress inetAddress = InetAddress.getByName(SERVER_IP);
serverSocketChannel.bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);
log.info("server start, ip: {} , port:{}", SERVER_IP, SERVER_PORT);
serverSocketChannel.accept(this, new ServerCompletionHandler());
//channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, TCPAioServer> {
@Override
public void completed(AsynchronousSocketChannel channel, TCPAioServer attachment) {
try {
handle(channel);
} finally {
// 当有下一个客户端接入的时候,直接调用Server的accept方法,这样反复执行下去,保证多个客户端都可以阻塞
serverSocketChannel.accept(attachment, this);
}
}
private void handle(AsynchronousSocketChannel channel) {
ByteBuffer buffer = allocateByteBuffer(channel);
channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
String msg = null;
try {
msg = new String(attachment.array(), CHARSET_NAME);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
log.info("Server 收到客户端发送的数据为:{}", msg);
doBusinessLogic(msg, channel);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
private ByteBuffer allocateByteBuffer(AsynchronousSocketChannel channel) {
ByteBuffer buffer = ByteBuffer.allocate(6);
try {
channel.read(buffer).get(1000, TimeUnit.SECONDS);
// 读取到了数据,将buffer的position复位到0
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
// 将buffer中的数据写入byte[]中
buffer.get(bytes);
String length = new String(bytes, CHARSET_NAME);
buffer = ByteBuffer.allocate(new Integer(length));
} catch (InterruptedException | ExecutionException | TimeoutException | UnsupportedEncodingException e1) {
e1.printStackTrace();
}
return buffer;
}
// 处理业务逻辑
private void doBusinessLogic(String msg, AsynchronousSocketChannel result) {
try (AsynchronousSocketChannel channel = result) {
msg = formatMsg(msg);
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
ByteBuffer buffer = ByteBuffer.allocate(bodyBytes.length);
buffer.put(bodyBytes);
buffer.flip();
channel.write(buffer).get();
} catch (Exception e) {
e.printStackTrace();
}
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
@Override
public void failed(Throwable exc, TCPAioServer attachment) {
exc.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
TCPAioServer server = new TCPAioServer();
server.start();
while (true) {
Thread.sleep(1000);
}
}
}
目前Linux上的AIO实现主要有两种:Posix AIO 与Kernel Native AIO,前者是用户态实现的,而后者是内核态实现的。所以Kernel Native AIO的性能和前景要好于他的前辈Posix AIO,比较有名的的软件如nginx,mysql等在高版本中都有支持Kernel Native AIO,但是只应用在少部分功能中。因为当下Linux的AIO实现还不是很完美,充斥着各种Bug,并且AIO Socket 还并非真正的异步I/O机制,使用AIO所带来的性能提升也不太明显,稳定性并非十分可靠,如是Kernel Native AIO引起的问题,解决的难度会非常大。但是AIO是未来的发展方向,需要我们持续的关注。
3.4 开源框架Netty实现的Socket服务
Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源软件也基于Netty的NIO框架构建,如Spark、RocketMQ、Dubbo、Elasticsearch等等。
Netty的优点
1、API使用简单,有丰富的例子,开发门槛低。
2、功能强大,预置了多种编解码功能,支持多种主流协议。
3、定制功能强,可以通过ChannelHandler对通信框架进行灵活的扩展。
4、性能高,通过与其他业界主流的NIO框架对比,Netty综合性能最优。
5、成熟、稳定,Netty修复了已经发现的NIO所有BUG。
6、社区活跃。
7、经历了很多商用项目的考验。
3.4.1 服务端(Netty4.X)
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.*;
import io.netty.handler.logging.*;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettySocketServer {
private final String CHARSET_NAME = "GBK";
private final int bosscount = 2;
private final int workerCount = 8;
private final int tcpPort = 8888;
private final int backlog = 100;
private final int receiveBufferSize = 1048576;
private ServerBootstrap serverBootstrap;
private ChannelFuture serverChannelFuture;
public NamedThreadFactory bossThreadFactory() {
return new NamedThreadFactory("Server-Worker");
}
public NioEventLoopGroup bossGroup() {
return new NioEventLoopGroup(bosscount, bossThreadFactory());
}
public NamedThreadFactory workerThreadFactory() {
return new NamedThreadFactory("Server-Worker");
}
public NioEventLoopGroup workerGroup() {
return new NioEventLoopGroup(workerCount, workerThreadFactory());
}
public ServerBootstrap bootstrap() {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup(), workerGroup()).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, backlog)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("logging", new LoggingHandler(LogLevel.ERROR))
.addLast("stringEncoder", new StringEncoder(Charset.forName("GBK")))
.addLast("frameDecoder", new MsgLengthFieldBasedFrameDecoder(receiveBufferSize, 0, 6, 0, 6))
.addLast("stringDecoder", new StringDecoder(Charset.forName("GBK")))
.addLast("messageHandler", new ServerMessageHandler());
}
});
return bootstrap;
}
@PostConstruct
public void start() throws Exception {
serverBootstrap = bootstrap();
serverChannelFuture = serverBootstrap.bind(tcpPort).sync();
log.info("Starting server at tcpPort {}" , tcpPort);
}
@PreDestroy
public void stop() throws Exception {
serverChannelFuture.channel().closeFuture().sync();
}
static class NamedThreadFactory implements ThreadFactory {
public static AtomicInteger counter = new AtomicInteger(1);
private String name = this.getClass().getName();
private boolean deamon ;//守护线程
private int priority ; //线程优先级
public NamedThreadFactory(String name){
this(name, false);
}
public NamedThreadFactory(String name,boolean deamon){
this(name, deamon, -1);
}
public NamedThreadFactory(String name,boolean deamon,int priority){
this.name = name ;
this.deamon = deamon ;
this.priority = priority ;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,name+"["+counter.getAndIncrement()+"]");
thread.setDaemon(deamon);
if(priority != -1){
thread.setPriority(priority);
}
return thread;
}
}
//拆包
class MsgLengthFieldBasedFrameDecoder extends LengthFieldBasedFrameDecoder {
/**
* @param maxFrameLength 解码时,处理每个帧数据的最大长度
* @param lengthFieldOffset 该帧数据中,存放该帧数据的长度的数据的起始位置
* @param lengthFieldLength 记录该帧数据长度的字段本身的长度
* @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数
* @param initialBytesToStrip解析的时候需要跳过的字节数
*/
public MsgLengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
@Override
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
if(length == 6){
buf = buf.order(order);
byte[] lengthBytes = new byte[6];
buf.readBytes(lengthBytes);
buf.resetReaderIndex();
return Integer.valueOf(new String(lengthBytes));
} else {
return super.getUnadjustedFrameLength(buf, offset, length, order);
}
}
}
class ServerMessageHandler extends ChannelInboundHandlerAdapter {
/**
* 功能:读取服务器发送过来的信息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof String) {
try {
doBusinessLogic(ctx,(String)msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
// 处理业务逻辑
private void doBusinessLogic(ChannelHandlerContext ctx,String msg) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
ctx.channel().writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public static void main(String[] args) throws Exception{
NettySocketServer server = new NettySocketServer();
server.start();
}
}
总结
同步阻塞IO |
伪异步IO |
非阻塞IO |
异步IO |
Netty的非阻塞IO |
|
客户端:服务端 |
1:1 |
N:M(M>=1) |
N:M(M>=1,单线程非阻塞,多线程非阻塞) |
N:0(不需要启动额外的IO线程,被动回调) |
N:M(M>=1) |
IO类型 |
BIO |
BIO |
NIO |
AIO |
NIO |
API使用难度 |
简单 |
简单 |
非常复杂 |
复杂 |
简单 |
可靠性 |
相当差 |
差 |
高 |
高 |
高+ |
吞吐量 |
低 |
中 |
高 |
高 |
高+ |
并发 |
低 |
中 |
高 |
高 |
高+ |
▲http://www.ibm.com/developerworks/cn/linux/l-async/
▲http://openjdk.java.net/projects/nio/presentations/TS-4222.pdf
▲http://blog.csdn.net/anxpp/article/details/51512200
▲Netty权威指南
▲Asynchronous I/O Tricks and Tips
今日推荐
以上是关于一个基于TCP协议的Socket通信实例的主要内容,如果未能解决你的问题,请参考以下文章