分布式理论,架构设计Socket和IO模型

Posted 拐柒

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式理论,架构设计Socket和IO模型相关的知识,希望对你有一定的参考价值。

分布式理论,架构设计(一)

Socket和IO模型

Socket

socket,套接字,就是两台主机之间的连接端点,TCP/IP协议是传输层协议,主要解决数据如何在网络中传输,而HTTP是应用层协议,主要解决如何包装数据。socket是通信的基石,是支持TCP/IP协议的网络通信的基本操作单元。他是网络通信过程中端点的抽象标识,包含进行网络通信必须的五种信息:连接使用的协议,本地主机的IP地址,本地进程的协议端口,远程主机的IP地址,远程进程的协议端口。

socket整体流程

代码实现

服务端

public static void main(String[] args) throws IOException 
        //1、创建线程池,如果有客户端连接就创建一个线程,与之通信
        ExecutorService service= Executors.newCachedThreadPool();
        //2、
        ServerSocket serverSocket=new ServerSocket(7777);
        while (true)
            //3、进行端口监听
            final Socket socket=serverSocket.accept();
            System.out.println("有客户端连接");
            //4、从线程池中获取一个线程进行socket处理
            service.execute(new Runnable() 
                @Override
                public void run() 
                    handle(socket);
                
            );
        
    
    public static void handle(Socket socket)
        try 
            System.out.println("线程ID:"+Thread.currentThread().getId()+"线程名称:"+Thread.currentThread().getName());
            //获取输入流,获取传递的信息
            InputStream inputStream = socket.getInputStream();
            byte[] b=new byte[1024];
            int read = inputStream.read(b);
            System.out.println("客户端通信信息:"+new String(b,0,read));
            //获取输出流进行信息回复
            OutputStream outputStream = socket.getOutputStream();
            outputStream.write("回复信息".getBytes());
         catch (Exception e) 
            e.printStackTrace();
        finally 
            try 
                //关闭资源
                socket.close();
             catch (IOException e) 
                e.printStackTrace();
            
        

    

客户端

public static void main(String[] args) throws Exception 
        while (true)
            //创建socket并监听localhost,端口7777
            Socket socket=new Socket("localhost",7777);
            //获取输出流
            OutputStream outputStream = socket.getOutputStream();
            System.out.println("请输入:");
            //输出流获取信息来自于控制台输入
            Scanner scanner=new Scanner(System.in);
            String msg=scanner.nextLine();
            outputStream.write(msg.getBytes(StandardCharsets.UTF_8));
            //获取输入流,获取服务端回复消息
            InputStream inputStream = socket.getInputStream();
            byte[] bytes=new byte[1024];
            int read = inputStream.read(bytes);
            System.out.println("服务端回复的消息是:"+new String(bytes,0,read).trim());
            //关闭资源
            socket.close();
        
    

I/O模型

I/O模型就是用什么样的通道进行输出的发送和接受,很大成都上决定了程序通信的性能。
java工支持三种网络编程I/O模型:BIO(同步并阻塞)、NIO(同步非阻塞)、AIO(异步非阻塞)。

阻塞非阻塞:主要指的是访问IO的线程是否会阻塞(或处于等待状态)

同步和异步:主要是指数据的请求方式。
BIO

  • BIO模型图

  • BIO问题分析
    1、每个请求都需要创建独立的线程,与对应的客户端进行数据read、业务处理,数据write
    2、并发数较大的时候,需要创建大量线程来处理连接,系统资源占用较大
    3、建立建立后,如果当前线程暂时没有数据可读,则线程就会则色在read操作上,造成线程资源浪费。

NIO
NIO模型图

AIO
AIO引入异步通道 的该年,采用了proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是现有操作系统完成后才通知服务端程序启动线程去处理,一般使用鱼连接数较多且连接时间较长的应用

NIO详解

1、NIO有三大核心部分:channel(通道),buffer(缓冲区),selector(选择器)
2、NIO是面向缓冲区编程的,数据读取到一个缓冲区中,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。
3、java NIO的非阻塞模式,使一个线程从某个通道发送请求或者读取数据,但是他仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变得可读之前,该线程可用继续做其他事情。
NIO核心原理示意图


缓冲区(Buffer)

缓冲区本质上就是一个可读写数据的内存块,可用理解为一个数组,该对象提供了一组方法,可用更轻松的使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的变化情况。channel提供从网络读取数据的渠道,但是读取或写入的数据都必须经过buffer。
buffer图解

代码实现
创建缓冲区

public static void main(String[] args) 
        //1、创建指定长度的byteBuffer
        ByteBuffer buffer=ByteBuffer.allocate(5);
        for (int i = 0; i < 5; i++) 
            System.out.println(buffer.get());//从缓冲区中获取数据
        
//        System.out.println(buffer.get());
        //2、创建有内容的缓冲区
        ByteBuffer buffer1=ByteBuffer.wrap("张三".getBytes());
        for (int i = 0; i < 4; i++) 
            System.out.println(buffer1.get());
        
    

向缓冲区中添加数据

代码实现

//1、创建一个缓冲区
        ByteBuffer buffer=ByteBuffer.allocate(10);
        System.out.println(buffer.position());//0 获取当前索引所在位置
        System.out.println(buffer.limit());//10 获取最多能操作到哪个索引
        System.out.println(buffer.capacity());//10 获取缓冲区长度
        System.out.println(buffer.remaining());//10 limit-position,还有多少个可以操作的个数
//        System.out.println("-------------------");
//        buffer.position(1);
//        buffer.limit(9);
//        System.out.println(buffer.position());//1 获取当前索引所在位置
//        System.out.println(buffer.limit());//9 获取最多能操作到哪个索引
//        System.out.println(buffer.capacity());//10 获取缓冲区长度
//        System.out.println(buffer.remaining());//8 limit-position,还有多少个可以操作的个数
        buffer.put((byte) 97);
        System.out.println(buffer.position());//1 获取当前索引所在位置
        System.out.println(buffer.limit());//10 获取最多能操作到哪个索引
        System.out.println(buffer.capacity());//10 获取缓冲区长度
        System.out.println(buffer.remaining());//9 limit-position,还有多少个可以操作的个数
        System.out.println("-------------------");
        buffer.put("aaa".getBytes());
        System.out.println(buffer.position());//4 获取当前索引所在位置
        System.out.println(buffer.limit());//10 获取最多能操作到哪个索引
        System.out.println(buffer.capacity());//10 获取缓冲区长度
        System.out.println(buffer.remaining());//6 limit-position,还有多少个可以操作的个数
        System.out.println("-------------------");
        buffer.put("12345".getBytes());
        System.out.println(buffer.position());//9 获取当前索引所在位置
        System.out.println(buffer.limit());//10 获取最多能操作到哪个索引
        System.out.println(buffer.capacity());//10 获取缓冲区长度
        System.out.println(buffer.remaining());//1 limit-position,还有多少个可以操作的个数
        System.out.println(buffer.hasRemaining());//是否还能操作
        //缓冲区满了,再添加就会报错,如果修改position,就会覆盖之前索引位置的信息
        buffer.position(1);
        buffer.put("123456".getBytes());
        System.out.println("-------------------");
        System.out.println(buffer.position());//7 获取当前索引所在位置
        System.out.println(buffer.limit());//10 获取最多能操作到哪个索引
        System.out.println(buffer.capacity());//10 获取缓冲区长度
        System.out.println(buffer.remaining());//3 limit-position,还有多少个可以操作的个数

从缓存区中读取数据


2.5通道(Channel)

通常来说NIO中的所有IO都是从Channel开始的,NIO的通道类似于流,但是有些区别如下:
1、通道可以读也可以写,流一般来说是单向的
2、通道可以异步读写
3、通道总是基于缓冲区buffer来读写。

ServerSocketChannel

服务实现步骤:
1.打开一个服务端通道
2.绑定对应的端口号
3.通道默认是阻塞的,需要设置为非阻塞
4.检查是否有客户端连接 有客户端连接会返回对应的通道
5.获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
6.给客户端回写数据
7.释放资源

public static void main(String[] args) throws IOException, InterruptedException 
        /**
         * 1.打开一个服务端通道
         * 2.绑定对应的端口号
         * 3.通道默认是阻塞的,需要设置为非阻塞
         * 4.检查是否有客户端连接 有客户端连接会返回对应的通道
         * 5.获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
         * 6.给客户端回写数据
         * 7.释放资源
         */
//        1.打开一个服务端通道
        ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
//        2.绑定对应的端口号
        serverSocketChannel.bind(new InetSocketAddress(7777));
//        3.通道默认是阻塞的,需要设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        System.out.println("服务端启动+++");
        while (true)
//        4.检查是否有客户端连接 有客户端连接会返回对应的通道
            SocketChannel accept = serverSocketChannel.accept();
            if(accept==null)
                System.out.println("没有客户端连接");
                Thread.sleep(1000);
                continue;
            
//        5.获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
            ByteBuffer allocate = ByteBuffer.allocate(1024);
            //返回值结果,返回是整数,代表本次读取到的有效字节数,0代表没有读到数据,-1代表读到末位
            int read = accept.read(allocate);
            System.out.println("客户端读取到的数据:"+new String(allocate.array(),0,read, StandardCharsets.UTF_8));
//        6.给客户端回写数据
            accept.write(ByteBuffer.wrap("嘻嘻".getBytes(StandardCharsets.UTF_8)));
            //        7.释放资源
            accept.close();
        
    

SocketChannel

实现步骤
1.打开通道
2.设置连接IP和端口号
3.写出数据
4.读取服务器写回的数据
5.释放资源

代码实现

 public static void main(String[] args) throws IOException 
//        1.打开通道
        SocketChannel channel=SocketChannel.open();
//        2.设置连接IP和端口号
        channel.connect(new InetSocketAddress("localhost",7777));
//        3.写出数据
        channel.write(ByteBuffer.wrap("嘿嘿".getBytes(StandardCharsets.UTF_8)));
//        4.读取服务器写回的数据
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        int read = channel.read(allocate);
        System.out.println("服务端消息:"+new String(allocate.array(),0,read,StandardCharsets.UTF_8));
//        5.释放资源
        channel.close();
    

Selector(选择器)

可以用一个线程,处理多个的客户端连接,就会使用到NIO的Selector(选择器). Selector 能够检测多个注册的服务端通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的 处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求

实现步骤:
1.打开一个服务端通道
2.绑定对应的端口号
3.通道默认是阻塞的,需要设置为非阻塞
4.创建选择器
5.将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
6.检查选择器是否有事件
7.获取事件集合
8.判断事件是否是客户端连接事件SelectionKey.isAcceptable()
9.得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
10.判断是否是客户端读就绪事件SelectionKey.isReadable()
11.得到客户端通道,读取数据到缓冲区
12.给客户端回写数据
13.从集合中删除对应的事件, 因为防止二次处理.

代码实现

public static void main(String[] args) throws IOException 
//        1.打开一个服务端通道
        ServerSocketChannel channel = ServerSocketChannel.open();
//        2.绑定对应的端口号
        channel.bind(new InetSocketAddress(7777));
//        3.通道默认是阻塞的,需要设置为非阻塞
        channel.configureBlocking(false);
//        4.创建选择器
        Selector selector = Selector.open();
//        5.将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
        channel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务端启动成功");
        while (true)
            //        6.检查选择器是否有事件,返回值为时间个数
            int select = selector.select(2000);
            if(select==0)
                System.out.println("无事发生");
                continue;
            
//        7.获取事件集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
//        8.判断事件是否是客户端连接事件SelectionKey.isAcceptable()
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext())
                SelectionKey next = iterator.next();
                if(next.isAcceptable())
                    //        9.得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
                    SocketChannel accept = channel.accept();
                    accept.configureBlocking(false);
                    System.out.println("有客户端连接");
                    //将通道设置为非阻塞状态,因为selector需要轮询监听每个通道
                    accept.register(selector,SelectionKey.OP_READ);
                
                //        10.判断是否是客户端读就绪事件SelectionKey.isReadable()
                if (next.isReadable()) 
//        11.得到客户端通道,读取数据到缓冲区
                    SocketChannel socketChannel = (SocketChannel) next.channel();
                    ByteBuffer allocate = ByteBuffer.allocate(1024);
                    int read = socketChannel.read(allocate);
                    if(read>0)
                        System.out.println("客户端消息:"+new String(allocate.array(),0,read, StandardCharsets.UTF_8));
                        //        12.给客户端回写数据
                        socketChannel.write(ByteBuffer.wrap("嘻嘻".getBytes(StandardCharsets.UTF_8)));
                        socketChannel.close();
                    
                
                //        13.从集合中删除对应的事件, 因为防止二次处理.
                iterator.remove();
            
        
    

以上是关于分布式理论,架构设计Socket和IO模型的主要内容,如果未能解决你的问题,请参考以下文章

分布式理论,架构设计 Netty

分布式理论,架构设计 Netty

架构设计:系统间通信——IO通信模型和JAVA实践 中篇

socket.io 广播功能 & Redis pub/sub 架构

架构IO多路复用

架构IO多路复用