java-NIO编程和线程池

Posted 飞机耳朵

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java-NIO编程和线程池相关的知识,希望对你有一定的参考价值。

NIO
------------
    1、传统IO
        传统IO是阻塞模式,处理并发的时候,需要启动多个线程,
        cpu需要在多线程上下文之间进行频繁切换,而大多数线程通
        常处于阻塞状态,导致CPU资源利用底下。

    2、New IO
        非阻塞,传统IO是阻塞模式,不需要启动大量线程,通常结合
        线程池能够实现高并发编程。

    
    
    3、零拷贝
        常规拷贝文件需要经过四次数据交换,分别是
        (1)从disk到系统空间,
        (2)从系统空间到用户空间,
        (3)用户空间到系统空间,
        (4)系统空间到目标设备的缓冲区。
        零拷贝是将磁盘文件复制系统内核,从系统内核直接输出数据到
        目标设备缓冲区。
        java的FileChannel.transfer()可以实现零拷贝。
        零拷贝有个2G文件大小限制。

    5、虚拟内存
        FileChannel.map()方法可以将磁盘文件的特定区域映射到内存,
        直接操纵内存缓冲区时,速度更快,而且最终数据会按照自己的
        设定同步到磁盘,这个磁盘写入过程不需要我们处理。

        映射模式有三种:
        a、read-only
            对缓冲区只能读,写入出异常
        b、read-write
            能读能写,最终缓冲区数据会进入到文件中。
        c、private
            可以写入数据到缓冲区,但不能传播到磁盘,并且对其程序不可见。
            该模式通话用于从磁盘文件加载数据到内存完成初始化工作,但不
            将缓冲区数据写回到磁盘文件中。
    
        e、编程实现
            import java.io.FileInputStream;
            import java.io.FileNotFoundException;
            import java.io.RandomAccessFile;
            import java.nio.MappedByteBuffer;
            import java.nio.channels.FileChannel;

            /**
             * 使用NIO实现虚拟内存
             */
            public class TestVirtualMemory {
                public static void main(String[] args) throws Exception {
                    RandomAccessFile raf = new RandomAccessFile("d:/1.txt" , "rw") ;
                    FileChannel fc = raf.getChannel() ;
                    MappedByteBuffer buf = fc.map(FileChannel.MapMode.READ_WRITE , 1 , 9) ;
            //        buf.put(4 , (byte)‘Y‘) ;
                    char c = (char)buf.get(1);
                    System.out.println(c);
                    for(int i = 0 ; i < 9 ; i ++){
                        buf.put(i ,(byte)(97 + i)) ;
                    }
                    fc.close();
                }
            }

工作模式
----------------
    1、单工
        消息只能向一方传递。

    2、双工
        消息可以双向传递。
        半双工    :同一时刻只能向一方传输。
        全双工    : 同一时刻可以向两方传输。

Socket下NIO编程
----------------
    1、介绍
        编程思路和传统的socket编程大致相同,但引进了新的概念就是Channel。
        ServerSocket对应的ServerSocketChannel,Socket对应SocketChannel。
        先开启服务器通道,让后绑定到特定的端口上,在开启client端通道,并
        将客户端通道连接到服务器通道。

        java的NIO的socket编程可以工作在阻塞和非阻塞模式下,默认是阻塞的。
        通过ServerSocketChannel.configureBlocking(false) 配置成非阻塞。

    2、ServerSocketChannel工作流程
        a、开启ServerSocketChannel
        b、设置阻塞模式
        c、绑定到特定端口
        d、开启挑选器
        e、在挑选器中注册服务器通道,需要指定感兴趣事件(accept)
            感兴趣事件有四中
            1、op_accept
            2、op_connect
                对应的方法isConnectable(),在客户端判断该方法,并
                通过finishConnect()方法来完成连接。

            3、op_read
            4、op_write

        f、挑选器循环挑选
        g、取得挑选器内部的挑选集合
        h、处理每个key

    3、SocketChannel.finishConnect()方法
        客户端socket封装的channel,其有finishConnect()方法,
        该方法完成通道的连接过程。
        非阻塞连接通过设置通道为非阻塞模式以及调用connect()方法来
        完成初始化。一旦连接建立或连接尝试失败,socketchannel都会变
        成connectable状态,就可以调用该方法完成连接序列。如果连接操作
        失败,该方法就会抛出相应异常。

        如果连接已经建立,该方法不阻塞,而是立刻返回true。如果通道工作在
        非阻塞模式下而连接过程尚未完成,则该方法返回false。如果通道工作在
        阻塞模式下,该方法便会阻塞直到完成或失败,最终要么返回true,要么抛
        出异常。

        调用读写时等待连接完成。

        注意,客户端套接字通道调用该方法,

    4、selector中的内部集合
        内部维护了三个集合:
        1、key set
            包含了注册的所有通道对应的key,keys()返回所有注册的key。
            selector.keys()

        2、selected-key set
            挑选出来的key的集合,在自身已经发生了至少一个感兴趣事件通道
            所对应的key,selectedKeys()方法返回该集合,该集合是(1)的子集。
            select.selectedKeys()

        3、cancelled-key
            key被撤销但所对应通道尚未注销的key集合,该集合无法直接访问,也是
            key set的子集。
            select.cancel(key);

    3、编程实现
        [服务器端]
        package com.oldboy.java.nio;

        import java.io.ByteArrayOutputStream;
        import java.io.IOException;
        import java.net.InetSocketAddress;
        import java.nio.ByteBuffer;
        import java.nio.channels.SelectionKey;
        import java.nio.channels.Selector;
        import java.nio.channels.ServerSocketChannel;
        import java.nio.channels.SocketChannel;
        import java.util.Iterator;
        import java.util.Set;

        /**
         * 服务器端
         */
        public class MyServer {
            public static void main(String[] args) throws Exception {
                //开启服务器通道
                ServerSocketChannel ssc = ServerSocketChannel.open();
                //配置非阻塞
                ssc.configureBlocking(false) ;

                //创建地址对象
                InetSocketAddress addr = new InetSocketAddress( 8888) ;
                //绑定通道到特定的地址上
                ssc.bind(addr) ;

                //开启一个挑选器
                Selector sel = Selector.open() ;

                //在挑选中注册channel
                ssc.register(sel , SelectionKey.OP_ACCEPT) ;
                System.out.println("注册服务器通道完成!!!");

                //创建字节缓冲区
                ByteBuffer buf = ByteBuffer.allocate(1024) ;

                //开始循环挑选
                while(true){
        //            System.out.println("开始挑选....");
                    sel.select() ;
        //            System.out.println("挑出发生了!!!");
                    //得到挑选出来的key集合
                    Iterator<SelectionKey> it = sel.selectedKeys().iterator() ;


                    while(it.hasNext()){
                        //一定是服务器通道
                        SelectionKey key = it.next() ;
                        if(key.isAcceptable()){
                            //接受新socket
                            SocketChannel sc0 = ssc.accept();
                            //配置非阻塞模式
                            sc0.configureBlocking(false) ;
                            //在selector中注册socketChannel,指定感兴趣事件
                            sc0.register(sel, SelectionKey.OP_READ  | SelectionKey.OP_CONNECT) ;
                        }

                        //可连接
                        if(key.isConnectable()){
                            SocketChannel sc0 = (SocketChannel) key.channel();
                            //完成连接
                            sc0.finishConnect() ;
                        }

                        //是否可读
                        if(key.isReadable()){
                            SocketChannel sc0 = (SocketChannel) key.channel();
                            //内存输出流
                            ByteArrayOutputStream baos = new ByteArrayOutputStream() ;
                            //读到了数据
                            while(sc0.read(buf) > 0){
                                //
                                buf.flip();
                                byte[] arr = buf.array();
                                baos.write(arr , 0 , buf.limit());
                                buf.clear() ;
                            }
                            //
                            String msg = new String(baos.toByteArray());
                            InetSocketAddress remoteAddr = (InetSocketAddress) sc0.socket().getRemoteSocketAddress();
                            String ip = remoteAddr.getAddress().getHostAddress();
                            int port = remoteAddr.getPort() ;
                            System.out.printf("[%s:%d]说 : %s\r\n" , ip , port ,  msg);
                        }

                        //删除该key
                        it.remove();
                    }
                }
            }
        }


        [客户端]
        package com.oldboy.java.nio;

        import java.io.IOException;
        import java.net.InetSocketAddress;
        import java.nio.ByteBuffer;
        import java.nio.channels.SocketChannel;
        /**
         *
         */
        public class MyClient {
            public static void main(String[] args) throws Exception {
                SocketChannel sc = SocketChannel.open();
                InetSocketAddress addr = new InetSocketAddress("localhost", 8888);
                sc.connect(addr);
                while (true) {
                    ByteBuffer buf = ByteBuffer.wrap("hello world".getBytes());
                    sc.write(buf);
                    Thread.sleep(1000);
                }
            }
        }


线程池
-------------------------------
    1、简介
        池化设计模式,通过容器维护固定数量的线程,使用
        固定线程数实现任务的执行。

    2、Executors
        创建线程池的工厂类,提供很多工具方法。
        常用的Executors.newFixedThreadPool(3);
        执行任务时,将任务提交到任务队列,在将来的某个
        时刻调度执行。 

    3、ThreadPoolExecutor
        线程池执行器,优势一是提升异步执行大量任务的性能,线程池维护
        了基本统计信息,比如完成的线程数。
        线程池有corePoolSize和maximumPoolSize.
        如果线程数少于该值,创建新线程执行任务。
        如果线程数大于core数但小于最大数,只有队列满时创建新线程。
        意味着在队列没有满的时候,如果线程数超过核心数,则尽量使用
        现有线程执行这些任务。
        如果核心数据量和最大数量相同,则表示固定大小的线程池。

        如果最大值无界,则可以适应任意数量的并发度。

    4.AtomicInteger
        原子整数,内部封装了一个整数,确保对该整数的修改是原子性。
        //控制信号
        ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        控制信号控制的是池的状态,内部包含两个成分,
        workerCount        //线程数,最大值是2^29 - 1 ,  
        runState        //状态
        控制信号的前三位是状态信息,后面的29位是线程数信息。
        000 | 00000   0000 0000  0000 0000  0000 0000

        状态控制如下:
        [RUNNING]
        接受新任务,处理队列中的任务。
        -1 << COUNT_BITS;
        111 0..0

        [SHUTDOWN]
        不再接受新任务,但处理队列中任务。
        000 0..0

        [STOP]
        停止态,不接收新任务,不处理队列任务,终止执行的任务。
        马上整顿。
        001 0..0
        
        [TIDYING]
        所有任务都结束了,线程数归0,转换到该状态的线程调用terminated()方法。
        010 0..0

        [TERMINATED]
        terminated()方法执行结束。
        011 0..0

    5、同步执行问题
        线程池的工作原理是异步执行,执行任务时,仅仅是放入任务队列,将来的某个时刻
        来执行任务,如果想要同步执行,
        
            public class App {
                public static void main(String[] args) {
                    //创建任务对象
                    Runnable task = new Runnable() {
                        public void run() {
                            String name = Thread.currentThread().getName();
                            System.out.println(name + " : " + "hello world");
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    } ;

                    //新建固定线程池
                    ExecutorService pool = Executors.newFixedThreadPool(3);
                    for(int i = 0  ; i < 6 ; i ++){
                        //执行任务
                        Future f = pool.submit(task , 100);  //submit 和返回值调用的f.get实现同步执行
                        try {
                            Object obj = f.get();
                            System.out.println(obj);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (ExecutionException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }

    5.总结
        ThreadPoolExecutor内部有两个集合,一个是Workers集合,一个是WorkQueue任务
        队列,Worker内部关联一个Thread和Runnable对象。线程池执行任务时,要么添加
        新worker,要么添加任务到队列。如果添加新worker,直接启动worker关联的线程。
        新线程调用worker的runWorker()方法,该方法从队列中剪切任务来执行的。

        线程池的submit方法可以实现同步执行,因为有返回值Future对象,该对象包含执行
        结果,在没有计算完成前,无法得到该结果,只能等待。可以通过调用pool.submit(task , 100)
        来直接指定结果。execute方法提交就忘了,不能实现同步执行的。

 

以上是关于java-NIO编程和线程池的主要内容,如果未能解决你的问题,请参考以下文章

多线程编程

java-nio网络编程

Java-NIO:管道 (Pipe)

Java——线程池

Java线程池详解

Java线程池详解