手写dubbo-2netty实现简单群聊私聊
Posted 叁滴水
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写dubbo-2netty实现简单群聊私聊相关的知识,希望对你有一定的参考价值。
一、功能背景
巩固netty知识,使用netty完成一个聊天系统,通过该聊天系统更加深入的了解netty。设计知识点:nio、reactor模型、tcp粘包拆包、自定义协议等等。
二、功能描述
- 服务端启动,客户端连接服务端。
- 服务器记录在线客户端,并且分配客户端userId。
- 客户端发送消息。
- 服务端根据发发送类型判断群发还是私聊。
- 客户端收到消息并且打印消息。
三、功能架构图
四、功能预览
server
client1 群发消息
client2消息接收
client3消息接收
client1发送消息给client3
client3控制台打印
五、代码示例
5.1、pom引入jar
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
<!-- 阿里JSON解析器 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
5.2、server端
新建QQChatServer.java文件。主要完成功能:server启动,设置监听端口。指定编解码器、处理器等等。其中有内部类QQChatHandler
。其主要完成用户连接成功之后保存所有连接用户Channel信息并且分配对应userId
。并且通知客户端他自己的userId。
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioserverSocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class QQChatServer
{
public static void main (String[] args) throws InterruptedException
{
//单线程接收tcp连接
EventLoopGroup bossGroup = new NioEventLoopGroup (1);
EventLoopGroup workerGroup = new NioEventLoopGroup ();
try
{
ServerBootstrap serverBootstrap = new ServerBootstrap ();
serverBootstrap.group (bossGroup, workerGroup)
.option (ChannelOption.SO_BACKLOG, 128)
.childOption (ChannelOption.SO_KEEPALIVE, true)
.channel (NioServerSocketChannel.class)
.childHandler (new ChannelInitializer<SocketChannel> ()
{
@Override
protected void initChannel (SocketChannel ch) throws Exception
{
ChannelPipeline pipeline = ch.pipeline ();
//设置编解码器
pipeline.addLast ("qqdecoder", new QQChatDecoder ());
pipeline.addLast ("qqencoder", new QQChatEncoder ());
//处理器
pipeline.addLast ("qqChatHandler", new QQChatHandler ());
}
});
ChannelFuture sync = serverBootstrap.bind (7777).sync ();
//监听关闭
sync.channel ().closeFuture ().sync ();
}
finally
{
bossGroup.shutdownGracefully ();
workerGroup.shutdownGracefully ();
}
}
}
class QQChatHandler extends SimpleChannelInboundHandler<QQMessageProtocol>{
private static ChannelGroup channelGroup = new DefaultChannelGroup (GlobalEventExecutor.INSTANCE);
private static Map<Long,Channel> channelMap = new HashMap<> ();
//客户端连接成功
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//保存channel信息
channelGroup.add (ctx.channel ());
//分配userId,因为简单测试,这里用随机数简单测试
double random = Math.random ();
Long userId = (long)(random * 100);
channelMap.put (userId,ctx.channel ());
System.out.println ("userId:"+userId+"上线了。");
//返回用户userId,告知客户端
Content ackCon = new Content ();
ackCon.setMsg ("你的userId为:"+userId);
QQMessageProtocol ackmsg = new QQMessageProtocol (ackCon);
ctx.channel ().writeAndFlush (ackmsg);
}
//接收客户端的消息并抓发
@Override
protected void channelRead0 (ChannelHandlerContext ctx, QQMessageProtocol buf) throws Exception
{
//获取客户端发送的信息
String s = new String (buf.getContent (), "utf-8");
Content content = JSONObject.parseObject (s, Content.class);
//如果userId 为0,则认为是群发
if(content.getUserId ()!=null && content.getUserId ().equals (0l)){
Iterator<Channel> iterator = channelGroup.iterator ();
content.setUserId (null);
content.setMsg ("群聊消息:"+content.getMsg ());
QQMessageProtocol groupMsg = new QQMessageProtocol (content);
while (iterator.hasNext ()){
Channel next = iterator.next ();
if(next != ctx.channel ()){
next.writeAndFlush (groupMsg);
}
}
}else{
//私聊
Channel channel = channelMap.get (content.getUserId ());
channel.writeAndFlush (buf);
}
}
}
5.3、自定义协议
QQMessageProtocol.java
。主要功能:在数据传输时,解决tcp粘包、拆包等问题。
import com.alibaba.fastjson.JSONObject;
import java.nio.charset.Charset;
public class QQMessageProtocol
{
//数据包长度
private int len;
//数据
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
public QQMessageProtocol ()
{
}
public QQMessageProtocol (Content content)
{
//获取content的内容,字节数信息
String s = JSONObject.toJSONString (content);
byte[] con = s.getBytes(Charset.forName("utf-8"));
int length = s.getBytes(Charset.forName("utf-8")).length;
this.content = con;
this.len = length;
}
}
Content.java
。主要功能:存放真正的信息,私聊对应的userId
和msg
。传输时content
转成字节数组存放在QQMessageProtocol
的content
中。
public class Content
{
private Long userId;
private String msg;
//此处省略get、set方法
}
QQChatEncoder.java
。主要功能:完成消息的编码。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class QQChatEncoder extends MessageToByteEncoder<QQMessageProtocol>
{
@Override
protected void encode(ChannelHandlerContext ctx, QQMessageProtocol msg, ByteBuf out) throws Exception {
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
QQChatDecoder.java
。主要功能:完成消息的解码。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class QQChatDecoder extends ReplayingDecoder<Void>
{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//需要将得到二进制字节码-> MessageProtocol 数据包(对象)
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
//封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理
QQMessageProtocol QQMessageProtocol = new QQMessageProtocol ();
QQMessageProtocol.setLen(length);
QQMessageProtocol.setContent(content);
out.add(QQMessageProtocol);
}
}
5.4、客户端代码
QQChatClient.java
。主要功能:完成控制台输入进行消息传输、消息接收打印。
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Scanner;
public class QQChatClient
{
public static void main (String[] args) throws InterruptedException
{
EventLoopGroup group = new NioEventLoopGroup ();
try
{
Bootstrap serverBootstrap = new Bootstrap ();
serverBootstrap.group (group)
.channel (NioSocketChannel.class)
.handler (new ChannelInitializer<SocketChannel> ()
{
@Override
protected void initChannel (SocketChannel ch) throws Exception
{
ChannelPipeline pipeline = ch.pipeline ();
pipeline.addLast ("qqdecoder", new QQChatDecoder ());
pipeline.addLast ("qqencoder", new QQChatEncoder ());
pipeline.addLast ("GroupChatHandler", new QQClientHandler ());
}
});
ChannelFuture sync = serverBootstrap.connect ("127.0.0.1",7777).sync ();
Channel channel = sync.channel();
System.out.println("-------请输入发送信息 {userId},{msg} userId=0时为群发--------");
//客户端需要输入信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
Content con = new Content();
String[] split = msg.split (",");
con.setUserId (Long.parseLong (split[0]));
con.setMsg (split[1]);
QQMessageProtocol QQMessageProtocol = new QQMessageProtocol (con);
//通过channel 发送到服务器端
channel.writeAndFlush(QQMessageProtocol);
}
//监听关闭
sync.channel ().closeFuture ().sync ();
}
finally
{
group.shutdownGracefully ();
}
}
}
class QQClientHandler extends SimpleChannelInboundHandler<QQMessageProtocol> {
//打印消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, QQMessageProtocol msg) throws Exception {
String s = new String (msg.getContent (), "utf-8");
Content content = JSONObject.parseObject (s, Content.class);
String userId = content.getUserId () == null ? "server" : content.getUserId ().toString ();
System.out.println(userId+":"+content.getMsg ());
}
}
以上是关于手写dubbo-2netty实现简单群聊私聊的主要内容,如果未能解决你的问题,请参考以下文章
推荐一个.Net Core开发的Websocket群聊私聊的开源项目