netty的一个小Demo-有ack和重连的功能
Posted weiguangyue
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty的一个小Demo-有ack和重连的功能相关的知识,希望对你有一定的参考价值。
起因
以前对接过一个tcp协议的接口,实现对类似于手机的pdt设备发送文本文字的功能,对接协议其实是文本形式的,很简单的一种协议。当初一路坎坷的对接完成,那时候实现方式也比较复杂,没有支持断连重连功能,本想着能优化一下,但是直到我从那家公司离职,也没有优化:)
现在回想起来,当初实现功能的过程比较曲折,这其中当然是因为不熟悉多线程编程,对netty的实现方式也不熟悉。后续继续看netty的时候,就写下了这个demo程序,希望以后能够用到。这个demo搭建起来后,研究netty就比较方便了;如果以后有人也用netty对接什么协议接口,也可以给其他人一些参考。
log4j配置文件
log4j.rootCategory=DEBUG,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%p %t %d{yyyy-MM-dd HH:mm:ss} %C %m%n
项目公有依赖
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.29.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
客户端
public class Client {
private final Random random = new Random();
private final static Logger log = LoggerFactory.getLogger(Client.class);
private final AtomicInteger requestSequence = new AtomicInteger(0);
private final Timer timer = new Timer("NettyClient_scanneResponseTable", true);
private Bootstrap bootstrap = new Bootstrap();
private EventLoopGroup eventLoopGroupWorker = new NioEventLoopGroup(2, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(-1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
private final int connectTimeout = 5000;
private final String hostname;
private final int port;
private final String charsetName = "UTF-8";
private Channel channel;
private volatile boolean inited = false;
private final int sendTimeout = 5;
private final int waitResponseTimeout = 10;
protected final ConcurrentMap<Integer, ReponseWrapper> responseTable = new ConcurrentHashMap<Integer, ReponseWrapper>(
256);
public Client(String hostname, int port) {
super();
this.hostname = hostname;
this.port = port;
}
/**
* 初始化
*/
private void init() {
log.info("初始化");
final Charset charset = Charset.forName(this.charsetName);
bootstrap.group(eventLoopGroupWorker)//
.channel(NiosocketChannel.class)//
.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_KEEPALIVE, false)//
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout)//
.handler(new ChannelInitializer<SocketChannel>() {//
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ByteBuf delimiter = Unpooled.copiedBuffer("
".getBytes());
pipeline.addLast(//
new StringDecoder(charset), //
new DelimiterBasedFrameDecoder(1024, delimiter), //
new StringEncoder(charset), //
new NettyClinetHandler()//
);
}
});
}
/**
* 连接服务端
*/
public void connect() {
if(!inited) {
this.init();
}
log.info("开始连接");
final ChannelFuture cf = bootstrap.connect(this.hostname, this.port);
try {
cf.await(this.connectTimeout, TimeUnit.SECONDS);
if (cf.isSuccess()) {
log.info("连接[{}]成功", cf.channel());
this.channel = cf.channel();
this.inited = true;
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
Client.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
} else {
if(!inited) {
//是首次连接
this.eventLoopGroupWorker.shutdownGracefully();
}else {
log.info("继续重连");
this.eventLoopGroupWorker.schedule(new ReconnectTask(), nextReconnectDelayTime(), TimeUnit.SECONDS);
}
}
} catch (InterruptedException e) {
log.error("connect[{}] cause exception", cf.channel(), e);
}
}
/**
* 重连随机时间
* @return
*/
protected int nextReconnectDelayTime() {
return this.random.nextInt(5);
}
/**
* 断开连接
*/
public void disconnect() {
this.timer.cancel();
Future<?> future = this.eventLoopGroupWorker.shutdownGracefully();
try {
future.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("断开连接异常",e);
}
this.channel.close();
}
/**
* 发送消息,true成功,false失败
*
* @param msg
* @return
*/
public boolean send(String msg) {
final Integer seq = requestSequence.incrementAndGet();
JSONObject jsonObject = new JSONObject();
jsonObject.put("seq", seq);
jsonObject.put("msg", msg);
final ChannelFuture channelFuture = this.channel.writeAndFlush(jsonObject.toJSONString() + "
");
final int timeoutMillis = (this.sendTimeout + this.waitResponseTimeout) * 1000;
final ReponseWrapper rep = new ReponseWrapper(channelFuture, timeoutMillis);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
responseTable.put(seq, rep);
rep.setSendSuccess(true);
rep.releaseSendMessage();
}
});
try {
rep.awaitSendMessage(this.sendTimeout,TimeUnit.SECONDS);
rep.setSendSuccess(true);
} catch (InterruptedException e) {
log.error("awaitSendMessage[{}] cause exception", channelFuture.channel(), e);
return false;
}
if (responseTable.containsKey(seq)) {
try {
rep.awaitResponse(this.waitResponseTimeout,TimeUnit.SECONDS);
return rep.isResponseSuccess();
} catch (InterruptedException e) {
log.error("awaitResponse[{}] cause exception", channelFuture.channel(), e);
return false;
}
} else {
return false;
}
}
/**
* 检测请求和响应
*/
protected void scanResponseTable() {
Iterator<Entry<Integer, ReponseWrapper>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ReponseWrapper> next = it.next();
ReponseWrapper rep = next.getValue();
long time = rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000;
if (time <= System.currentTimeMillis()) {
rep.releaseAll();
it.remove();
log.warn("remove timeout request, " + rep);
}
}
}
/**
* 业务处理
* @param ctx
* @param msg
*/
protected void processMessageReceived(ChannelHandlerContext ctx, String msg) {
log.trace("接收消息[{}]",msg);
JSONObject jsonObject = JSONObject.parseObject(msg);
Integer seq = jsonObject.getInteger("seq");
final ReponseWrapper responseChannelFutureWrapper = this.responseTable.get(seq);
if (responseChannelFutureWrapper != null) {
responseChannelFutureWrapper.setResponseSuccess(true);
responseChannelFutureWrapper.releaseResponse();
} else {
log.warn("不存的请求号[{}]的消息[{}]",seq,msg);
}
}
/**
* 业务处理
*/
class NettyClinetHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
processMessageReceived(ctx, msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("失去连接,开始准备重连[{}]",ctx.channel());
ctx.executor().schedule(new ReconnectTask(), nextReconnectDelayTime(), TimeUnit.SECONDS);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("发生异常[{}]",ctx.channel(),cause);
}
}
/**
* 重连任务
*/
class ReconnectTask implements Runnable{
@Override
public void run() {
Client.this.connect();
}
}
/**
* 响应
*/
class ReponseWrapper {
private final long beginTimestamp = System.currentTimeMillis();
private final long timeoutMillis;
private final ChannelFuture channelFuture;
private final CountDownLatch sendCountDownLatch = new CountDownLatch(1);
private final CountDownLatch waitResponseCountDownLatch = new CountDownLatch(1);
private volatile boolean sendSuccess = false;
private volatile boolean responseSuccess = false;
public ReponseWrapper(ChannelFuture channelFuture, long timeoutMillis) {
super();
this.timeoutMillis = timeoutMillis;
this.channelFuture = channelFuture;
}
public void awaitSendMessage(int sendTimeout,TimeUnit unit) throws InterruptedException {
this.sendCountDownLatch.await(sendTimeout, unit);
}
public void releaseSendMessage() {
this.sendCountDownLatch.countDown();
}
public void awaitResponse(int waitResponseTimeout,TimeUnit unit) throws InterruptedException {
this.waitResponseCountDownLatch.await(waitResponseTimeout, unit);
}
public void releaseResponse() {
this.waitResponseCountDownLatch.countDown();
}
public void releaseAll() {
this.sendCountDownLatch.countDown();
this.waitResponseCountDownLatch.countDown();
}
public long getBeginTimestamp() {
return beginTimestamp;
}
public long getTimeoutMillis() {
return timeoutMillis;
}
public boolean isSendSuccess() {
return sendSuccess;
}
public void setSendSuccess(boolean sendSuccess) {
this.sendSuccess = sendSuccess;
}
public boolean isResponseSuccess() {
return responseSuccess;
}
public void setResponseSuccess(boolean responseSuccess) {
this.responseSuccess = responseSuccess;
}
@Override
public String toString() {
return "ReponseWrapper [beginTimestamp=" + beginTimestamp + ", timeoutMillis=" + timeoutMillis
+ ", channelFuture=" + channelFuture + ", sendCountDownLatch=" + sendCountDownLatch
+ ", waitResponseCountDownLatch=" + waitResponseCountDownLatch + ", sendSuccess=" + sendSuccess
+ ", responseSuccess=" + responseSuccess + "]";
}
}
}
客户端的测试代码
/**
 * Manual test driver for {@code Client}: connects to 127.0.0.1:9000, sends one message and
 * prints the timestamps around the call together with the acknowledgement result.
 *
 * <p>NOTE(review): the client is never disconnected here, so the process keeps running after
 * {@code main} returns — presumably on purpose, so reconnects can be observed; confirm.
 */
public class BootstrapClientDemo {
    public static void main(String[] args) {
        Client client = new Client("127.0.0.1", 9000);
        client.connect();
        try {
            System.out.println("开始发送:" + System.currentTimeMillis());
            // send() reports whether the ack arrived; show it instead of discarding it.
            boolean acknowledged = client.send("hello,world");
            System.out.println("结束发送:" + System.currentTimeMillis());
            System.out.println("发送结果:" + acknowledged);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
服务端
public class Server {
private AtomicInteger requestSequence = new AtomicInteger(0);
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupBoss = new NioEventLoopGroup();
private final EventLoopGroup eventLoopGroupWorker = new NioEventLoopGroup();
private String charsetName = "UTF-8";
public Server() {
this.serverBootstrap = new ServerBootstrap();
}
public void startup() {
final Charset charset = Charset.forName(this.charsetName);
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NioServerSocketChannel.class)//
.option(ChannelOption.SO_BACKLOG, 1024)//
.option(ChannelOption.SO_REUSEADDR, true)//
.childOption(ChannelOption.TCP_NODELAY, true)//
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ByteBuf delimiter = Unpooled.copiedBuffer("
".getBytes());
pipeline.addLast(//
new StringDecoder(charset), //
new DelimiterBasedFrameDecoder(1024,delimiter),//
new StringEncoder(charset), //
new NettyServerHandler()//
);
}
});
try {
ChannelFuture sync = this.serverBootstrap.bind(9000).sync();
sync.get();
System.out.println("绑定结果:"+sync.isSuccess());
} catch (Exception e) {
e.printStackTrace();
}
}
class NettyServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
processMessageReceived(ctx,msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
}
public void shutdown() {
}
protected void processMessageReceived(final ChannelHandlerContext ctx, final String msg) {
System.out.println("接收消息:"+msg);
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
//System.out.println("回显消息:"+msg);
//ctx.channel().writeAndFlush(msg+"
");
}
}, 10, TimeUnit.SECONDS);
}
}
服务端测试代码
/**
 * Manual test driver for {@code Server}: starts the server and then parks the main thread
 * for 1000 seconds.
 */
public class BootstrapServerDemo {
    public static void main(String[] args) {
        final Server server = new Server();
        server.startup();
        // Park the main thread for 1000 seconds after the server has been started.
        final long parkSeconds = 1000;
        try {
            TimeUnit.SECONDS.sleep(parkSeconds);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
以上是关于netty的一个小Demo-有ack和重连的功能的主要内容,如果未能解决你的问题,请参考以下文章
H265播放器EasyPlayer测试demo停顿后实现自动重连的优化分享
uniapp mqtt 频繁断开和重连问题(android真机)