并发编程—2并发工具类

Posted codetree

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程—2并发工具类相关的知识,希望对你有一定的参考价值。

2.线程的工具类

2.1 fork/join框架

### 什么是分而治之
    简单地说把一个大的问题,拆分成若干个子问题,每个问题相互独立,且和原来问题形式相同。最后将每个子问题的解合并得到原问题的解答。
### 什么是工作密取
### 举例
 带参数继承RecursiveTask<V> 
/**
 * fork/join 使用 情况1:带返回值,这时候要继承RecursiveTask<V> 
 * 情况2:不带返回值,这时候,要继承RecursiveAction
 * 
 * @author 45027056
 *
 */
public class SumArray2 extends RecursiveTask<Integer> {
    private int beginIndex;
    private int endIndex;
    private int[] src;
    //将这个计数任务拆分成10个子任务
    private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        int[] makeArray = MakeArray.makeArray();
        SumArray2 task = new SumArray2(0, makeArray.length - 1, makeArray);
        long start = System.currentTimeMillis();
        pool.invoke(task);// 同步调用
        System.out.println("Task is Running.....");
        System.out.println("The count is " + task.join() + " spend time:" + (System.currentTimeMillis() - start) + "ms");

    }

    //构造方法指定参数
    public SumArray2(int begin, int end, int[] makeArray) {
        this.beginIndex = begin;
        this.endIndex = end;
        this.src = makeArray;
    }

    @Override
    protected Integer compute() {
        if (endIndex - beginIndex <= THRESHOLD) {
            int sum = 0;
            for (int i : src) {
                sum = sum + i;
            }
            return sum;
        } else {
            int mid = (endIndex + beginIndex) / 2;
            // 创建任务1
            SumArray2 task1 = new SumArray2(beginIndex, mid, src);
            // 创建任务2
            SumArray2 task2 = new SumArray2(mid + 1, endIndex, src);
            // 调用2个子任务
            invokeAll(task1, task2);
            // 合并结果返回给主任务
            return task1.join() + task2.join();
        }
    }

}
### 举例2  不带参数继承RecursiveAction
    

2.2 CountDownLatch

> 一组线程等待另外一组线程执行完后再执行。通过CountDownLatch的构造函数指定条件,coutDownLatch.wait()阻塞一个或者一组线程,当coutDownLatch.countDown()减至0时,被阻塞的线程才可以运行。

一般用法

    /**
 * 
 * @author 45027056
 * 演示countDownLatch用法
 * 类说明:演示CountDownLatch,有5个初始化的线程,5个扣除点,
 * 扣除完毕以后,主线程和业务线程才能继续自己的工作
 */
public class UsCountDownLatch2 {
    //1。CountDownLatch构造函数必须指定一个整数N作为入参,且N必须大于0。
    //每次latch.countDown()方法,会扣减1,直到N为0,其他latch.await()的线程才会继续运行。
    private static CountDownLatch latch = new CountDownLatch(5);
    public static void main(String[] args) {
        UsCountDownLatch2 us = new UsCountDownLatch2();
        us.startInit();
        us.startBusiThread();
        try {
            //一直阻塞直到CountDownLatch(0)
            latch.await();
            System.out.println("Main Thread is doing itis working ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    //初始化线程
    public void startInit(){
        for(int i=0; i < 5; i++){
            new Thread(){
                @Override
                public void run() {
                    System.out.println("init thread working");
                    latch.countDown();
                }
            }.start();
            
        }
    }
    
    //业务线程
    public void startBusiThread(){
        new Thread(){
            @Override
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("startBusiThread is doing its working");
                
            }
        }.start();
    }
}
    

2.3 CycliBarrier

> 和CountDown不同,CycliBarrier是一个同步工具,用于控制一组线程,当且进当所有这组线程本身到达了栅栏点,await()的线程才可以继续运行下去(A synchronization aid that allows a set of threads to all wait for
> 常用方法: await()/getNumberWaiting()
### 使用例子
        /**
 * 演示如何使用UseCyclicBarrier
 *      1.CyclicBarrier可以指定当线程都到到达栅栏点的时候,执行自定义的线程。通过CyclicBarrier构造函数的第二个入参指定。
 *      2。注意:先执行CyclicBarrier指定的BarrierActionThread,再执行其他await()线程
 * @author 45027056
 *
 */
public class UseCyclicBarrier2 {
    CyclicBarrier barrier = new CyclicBarrier(5,new BarrierActionThread());
    private ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<String,Long>();
    public static void main(String[] args) {
        UseCyclicBarrier2 useBarrier = new UseCyclicBarrier2();
        for(int i=0; i < 6; i++){
            useBarrier.new SubThread().start();
        }
        //主线程sleep 2秒
        SleepTools.second(2);
        System.out.println("Main thread is end..");
    }
    //栅栏点都到达后执行的线程
    class BarrierActionThread implements Runnable{
        @Override
        public void run() {
            System.out.println("BarrierActionThread is running ...");
        }
    }
    
    //栅栏子线程
    class SubThread extends Thread{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "is await...");
            try {
                barrier.await();
                resultMap.put(Thread.currentThread().getName(), Thread.currentThread().getId());
                System.out.println(Thread.currentThread().getName() + "is free...and doing its wrok...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            
        }
    }
}

2.4 Semaphore

> 一个一般用来限流的并发工具类
### 使用用例
    /**
 * 演示Semaphore使用实现链接池
 * 可以理解Semaphore是用来限流的工具。控制某种资源(连接数)保持在某个范围内。
 * @author 45027056
 *
 */
public class DBPoolSemaphore2 {
    LinkedList<Connection> pools = new LinkedList<Connection>();
    Semaphore idle = new Semaphore(10);//空闲信号量
    Semaphore inuse = new Semaphore(0);//在用信号量
    public DBPoolSemaphore2(int poolSize){
        if(poolSize > 0){
            for(int i=0; i < poolSize; i++){
                pools.addFirst(SqlConnectImpl.fetchConnection());
            }
        } else {
            System.out.println("poolSize must greater than 0");
        }
    }
    
    public void returnConnect(Connection connection) throws InterruptedException {
        if(null != connection){
            //阻塞直到从inuse中获取到信号量。
            inuse.acquire();
            synchronized (pools) {
                pools.addFirst(connection);
                pools.notifyAll();
            }
            //释放一个信号量返回idle
            idle.release();
        }
    }
    
    public Connection takeConnect() throws InterruptedException {
        //阻塞直到从idle中获取到信号量。
        idle.acquire();
        Connection conn;
        synchronized (pools) {
            conn = pools.removeFirst();
        }
        //释放一个信号量返回inuse
        inuse.release();
        return conn;
    }
}
    

2.5 Exchange

> 用于2个线程交换数据.注意只局限2个线程之间交换数据。

使用举例

    public class UseExchange {
    private static final Exchanger<Set<String>> exchange 
        = new Exchanger<Set<String>>();
    public static void main(String[] args) {
        //第一个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set<String> setA = new HashSet<String>();//存放数据的容器
                try {
                    /*添加数据
                     * set.add(.....)
                     * */
                    setA = exchange.exchange(setA);//交换set
                    /*处理交换后的数据*/
                } catch (InterruptedException e) {
                }
            }
        }).start();

      //第二个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set<String> setB = new HashSet<String>();//存放数据的容器
                try {
                    /*添加数据
                     * set.add(.....)
                     * set.add(.....)
                     * */
                    setB = exchange.exchange(setB);//交换set
                    /*处理交换后的数据*/
                } catch (InterruptedException e) {
                }
            }
        }).start();

    }
}

2.6 Callable Future and FutureTask

/**
 * Callable FutureTask的用法
 * 1.Callable和Runnable接口的区别
 *      + Callable有返回值,Runnable没有返回值
 *      + Callable 可以抛出异常,Runnable不可以。
 * @author 45027056
 *
 */
public class UserFuture2 implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("user Callable begin");
        int sum = 0;
        for(int i=0;i < 5000;i++){
            sum = sum + i;
        }
        try {
            Thread.currentThread().sleep(2000);
        } catch (InterruptedException e) {
            System.out.println("have InterruptedException..");
        }
        
        System.out.println("user Callable end");
        return sum;
    }
    
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        UserFuture2 ueseFuture = new UserFuture2();
        //FutureTask 本质是一个Runnable,用来包装Callable,然后投放到线程中去执行。
        FutureTask<Integer> futureTask = new FutureTask(ueseFuture);
        new Thread(futureTask).start();
        SleepTools.second(1);
        Random random = new Random();
        //随机获得结果和中断。
        if(random.nextBoolean()){
            int sum = futureTask.get();
            System.out.println("the callable result sum is" + sum);
        } else {
            //如果这里调用能够cancel会中断call方法里面的Thread.currentThread.sleep(),这里会抛出一个InterruptedException
            futureTask.cancel(true); 
        }
    }
}

以上是关于并发编程—2并发工具类的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程-CountDownLatch

全栈编程系列SpringBoot整合Shiro(含KickoutSessionControlFilter并发在线人数控制以及不生效问题配置启动异常No SecurityManager...)(代码片段

Java并发编程的艺术 -- 原子操作类和并发工具类(第七八章)

并发编程从零开始-同步工具类

Java并发编程系列- 线程的并发工具类

Java并发多线程编程——并发工具类CountDownLatch(线程计数器)