如何做一个国产数据库 网络传输 java做订阅客户端
Posted qianbo_insist
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何做一个国产数据库 网络传输 java做订阅客户端相关的知识,希望对你有一定的参考价值。
如何做一个国产数据库一
如何做一个国产数据库二
如何做一个国产数据库三
如何做一个国产数据库四
如何做一个国产数据库五
如何做一个国产数据库六
server端协议定义
再次强调一下我们的protocol 应用层的协议,其中协议第一个字节的前两位如下所示
//1字节 2位
//00 发布数据
//01 订阅数据
//10 心跳数据
//11 返回数据
所以服务端在接收到头部字节结束后,可以知道客户端时订阅客户端还是发布了
int on_headers_complete(void* param)
//client_t* pclient = (client_t*)param;
printf("the header len is %d\\n", pclient->recvlen);
//printf("the id is %04x\\n", getid(pclient));
client_t* cl = (client_t*)param;
//得到头部字节
char head = cl->head[0];
char type = head >> 6;
switch (type)
case 0x00://00 发布数据
//放入发布列表
cout << "publish" << endl;
break;
case 0x01://01 订阅数据
//放入订阅列表
cout << "subscribe" << endl;
break;
case 0x02://10 心跳数据
cout << "heartbeat" << endl;
break;
return 0;
接下去,就可以把订阅和发布客户端分别放到不同的队列里面去了,暂时不讲这些,先讲如何使用java做我们的订阅客户端,java最常用的就是netty,下面我们使用netty来做一个客户端。以下是tcpclient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NiosocketChannel;
import io.netty.channel.ChannelPipeline;
public class TcpClient
// 要请求的服务器的ip地址
private String ip;
// 服务器的端口
private int port;
public TcpClient(String ip, int port)
this.ip = ip;
this.port = port;
// 请求端主题
private void action() throws InterruptedException
EventLoopGroup bossGroup = new NioEventLoopGroup();
Bootstrap bs = new Bootstrap();
bs.group(bossGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new MessageDecodeClient(255, 6, 1));
// 处理来自服务端的响应信息
socketChannel.pipeline().addLast(new TcpClientHandle());
);
// 客户端开启
ChannelFuture cf = bs.connect(ip, port).sync();
byte[] respByte = new byte[6];
//....以下为协议写入省略,请注意自行写出,若有问题,可以探讨
// 发送客户端的请求
cf.channel().writeAndFlush(Unpooled.copiedBuffer(respByte));
// 等待直到连接中断
cf.channel().closeFuture().sync();
public static void main(String[] args) throws InterruptedException
new TcpClient("127.0.0.1", 8054).action();
以下为解码函数
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
* @author qianbo 协议重写成java
*/
public class MessageDecodeClient extends LengthFieldBasedFrameDecoder
private static final int hsize = 6;
public MessageDecodeClient(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength)
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
if (in == null)
return null;
if (in.readableBytes() < hsize)
return null;
in.markReaderIndex();
byte magic = in.readByte(); //头部字节0x69
byte titlelen = in.readByte(); //四字节大端ID号码
int dataLength = in.readIntLE();
dataLength += titlelen;
if (in.readableBytes() < dataLength)
in.resetReaderIndex();
return null;
//钱波 :根据协议加上titlelen 和 数据len
byte[] data = new byte[dataLength];
in.readBytes(data);
String body = new String(data, "UTF-8");
return body;
处理的TcpClientHandle .java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TcpClientHandle extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
// empty
//可做一些工作
/*
* 建立连接时,返回消息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
ctx.writeAndFlush("客户端"+ InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! \\n");
super.channelActive(ctx);
/**
* 客户端断开
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
System.out.println("channelInactive");
/**
* 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
以上为主要的框架,读者可以根据基础自行写出,后面我也会补充写完整,希望能给您有所启发。
以上是关于如何做一个国产数据库 网络传输 java做订阅客户端的主要内容,如果未能解决你的问题,请参考以下文章