Netty 作为TCP server

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty 作为TCP server相关的知识,希望对你有一定的参考价值。


目录

​​使用Netty框架作为TCP server,做上位机​​

        ​​1. 在 pom.xml 中添加 Netty 依赖​​

​​2. 创建NettyServer​​

​​3. 创建 NettyChannelService​​

​​4. 创建 MyServerHandler ​​

​​5. 创建MyChannelInitializer​​

​​6. 如何从外部向 TCP Client 发送消息呢?​​

​​7. 发消息测试:​​


使用Netty框架作为TCP server,做上位机

1. 在 pom.xml 中添加 Netty 依赖

<!--
  Netty transport module, pinned to 4.1.55.Final. It supplies the io.netty.bootstrap and
  io.netty.channel classes imported by the server code in this article.
  NOTE(review): the LineBasedFrameDecoder / StringDecoder lines that are commented out in
  MyChannelInitializer presumably live in a separate codec artifact; confirm before enabling them.
-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.1.55.Final</version>
</dependency>

2. 创建NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author huochengyan
* @version 1.0
* @date 2022/5/24 17:09
*/
public class NettyServer

static NettyServer server;
static Map sensorIdChannel=new HashMap();
public static void main(String[] args)

//自定义发送任务 给下位机发送的。
Thread customTaskThread=new Thread(new CustomTaskInfo());
customTaskThread.start();

server=new NettyServer();
//server.bing(8088);
server.bing(JdbcUtil.serverPort);



private void bing(int port)
//配置服务端NIO线程组
EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
EventLoopGroup childGroup = new NioEventLoopGroup();
try
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class) //非阻塞模式
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new MyChannelInitializer());
ChannelFuture f = b.bind(port).sync();
System.out.println("run........");
f.channel().closeFuture().sync();
catch (InterruptedException e)
e.printStackTrace();
finally
childGroup.shutdownGracefully();
parentGroup.shutdownGracefully();




3. 创建 NettyChannelService

import io.netty.channel.ChannelHandlerContext;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author huochengyan
* @version 1.0 保存Netty 链接状态
* @date 2022/5/24 17:36
*/
public class NettyChannelService
private static ConcurrentHashMap<String, ChannelHandlerContext> map = new ConcurrentHashMap<>();

public static Map<String, ChannelHandlerContext> getChannels()
return map;


public static void saveChannel(String key, ChannelHandlerContext ctx)
if (map == null)
map = new ConcurrentHashMap<>();

map.put(key, ctx);


public static ChannelHandlerContext getChannel(String key)
if (map == null || map.isEmpty())
return null;

return map.get(key);


public static void removeChannel(String key)
map.remove(key);

4. 创建 MyServerHandler 

import com.longder.sensor.NettySensorClientMsg;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.CharsetUtil;

import java.awt.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.logging.Logger;

/**
* @author huochengyan
* @version 1.0
* @date 2022/5/24 17:08
*/

public class MyServerHandler extends ChannelInboundHandlerAdapter

/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
SocketChannel channel = (SocketChannel) ctx.channel();
System.out.println("链接报告开始");
System.out.println("链接报告信息:有一客户端链接到本服务端");
System.out.println("链接报告IP:" + channel.localAddress().getHostString());
System.out.println("链接报告Port:" + channel.localAddress().getPort());
System.out.println("链接报告完毕");
//通知客户端链接建立成功
// String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "rn";
// ByteBuf buf = Unpooled.buffer(str.getBytes().length);
// buf.writeBytes(str.getBytes("GBK"));
// ctx.writeAndFlush(buf);


String uuid = ctx.channel().id().asLongText();
NettyChannelService.saveChannel(uuid, ctx);
System.out.println("连接请求进入: " + uuid + " 地址: " + ctx.channel().remoteAddress());


/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
//通知客户端链消息发送成功

ByteBuf in = (ByteBuf)msg;
int readableBytes = in.readableBytes();
byte[] bytes =new byte[readableBytes];
in.readBytes(bytes);
System.out.println("[client->server]:"+new String(bytes));
System.out.println("[server ip]:"+ctx.channel().remoteAddress());
System.out.print(in.toString(CharsetUtil.UTF_8));

//业务
String clientIp=ctx.channel().remoteAddress().toString().replace("/","").split(":")[0];

String clientMsg=new String(bytes);
String deviceId="";
if(clientMsg.substring(0,1).equals("$"))
String[] arr = clientMsg.split(",");
deviceId = arr[4];

NettyServer.sensorIdChannel.put(deviceId,ctx.channel().id());
NettySensorClientMsg.uploadSensorMsg(clientIp,clientMsg);

String resultClientMsg=NettySensorClientMsg.getSensorResult(clientMsg);

// String resultClientMsg = "服务端收到:" + new Date() + " " + msg + "rn";
ByteBuf buf = Unpooled.buffer(resultClientMsg.getBytes().length);
buf.writeBytes(resultClientMsg.getBytes("UTF-8"));
ctx.writeAndFlush(buf);



/**
* 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
ctx.close();
System.out.println("异常信息:rn" + cause.getMessage());



5. 创建MyChannelInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

import java.nio.charset.Charset;

/**
* @author huochengyan
* @version 1.0
* @date 2022/5/24 17:06
*/

public class MyChannelInitializer extends ChannelInitializer<SocketChannel>

@Override
protected void initChannel(SocketChannel channel)
// 基于换行符号
//channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 解码转String,注意调整自己的编码格式GBK、UTF-8
//channel.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyServerHandler());


6. 如何从外部向 TCP Client 发送消息呢?

看下面的方法:

/**
* 发送消息给指定的tcp Client
*
* @throws InterruptedException
*/
public static void sendToClient(String clientIp, String deviceId, String exeStr) throws InterruptedException
for (String key : NettyChannelService.getChannels().keySet())
ChannelHandlerContext ctx = NettyChannelService.getChannel(key);
if (ctx == null)
continue;

if (ctx.channel().isActive())

if (!clientIp.equals(ctx.channel().remoteAddress().toString().replace("/", "").split(":")[0]))
continue;

String reqStr = exeStr; // "OK\\r";
byte[] reqStrBytes = reqStr.getBytes(); //getHexBytes(reqStr);
ByteBuf reqStrByteBuf = ctx.alloc().buffer(reqStrBytes.length);
reqStrByteBuf.writeBytes(reqStrBytes);
ctx.writeAndFlush(reqStrByteBuf);
// System.out.println("[server --> client: " + ctx.channel().remoteAddress() + "] channel id: " + key);
System.out.println("[server --> client" + ctx.channel().remoteAddress() + "]: " + reqStr);
// 这里暂停一下是防止channelRead收到的数据粘包
Thread.sleep(1000);
else
NettyChannelService.removeChannel(key);


7. 发消息测试:

Netty

以上是关于Netty 作为TCP server的主要内容,如果未能解决你的问题,请参考以下文章

Netty解码的艺术

Netty粘包拆包解决方案

Java分布式框架netty之NIO框架区别分析

Netty解码器

Netty 中 TCP 消息边界问题及按行分割消息

这就是大名鼎鼎的Netty, HadoopDubbo都用了