Java多线程06——JUC并发包02
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程06——JUC并发包02相关的知识,希望对你有一定的参考价值。
1 线程的同步工具类 CountDownLatch
CountDownLatch
同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
-
CountDownLatch
类是一个同步计数器,构造时传入int参数,该参数就是计数器的初始值,每调用一次countDown()
方法,计数器减 1,当计数器大于0时,await()
方法会阻塞当前线程继续执行。 - 由于调用了
countDown()
方法,所以在当前计数到达零之前,await()
方法会一直受阻塞。之后,会释放所有处于等待的线程,await()
方法之后的所有后续调用都将立即返回,这种现象只出现一次,计数无法被重置。一个线程或者多个,等待另外N个线程完成某个事情之后才能执行。
创建线程类
import java.util.concurrent.CountDownLatch;
public class UserThread1 extends Thread
private int sum1 = 0;
private CountDownLatch cd;
public UserThread1(CountDownLatch cd)
this.cd = cd;
@Override
public void run()
for(int i=0;i<=50;i++)
sum1 += i;
cd.countDown();
public int getSum1()
return sum1;
import java.util.concurrent.CountDownLatch;
public class UserThread2 extends Thread
private int sum2 = 0;
private CountDownLatch cd;
public UserThread2(CountDownLatch cd)
this.cd = cd;
@Override
public void run()
for(int i=51;i<=100;i++)
sum2 += i;
cd.countDown();
public int getSum2()
return sum2;
线程类中使用了 countDown()
方法,用于更新计数器。
创建测试类
import java.util.concurrent.CountDownLatch;
public class Test
public static void main(String[] args)
CountDownLatch cd = new CountDownLatch(2);
UserThread1 thread1 = new UserThread1(cd);
UserThread2 thread2 = new UserThread2(cd);
thread1.start();
thread2.start();
try
cd.await();
int sum = thread1.getSum1() + thread2.getSum2();
System.out.println("1~100 的和是:" + sum);
catch (InterruptedException e)
e.printStackTrace();
输出结果为:
1~100 的和是:5050
在测试类中调用了 await() 方法,用于阻塞当前线程,直到 CountDownLatch 初始化时的计数器变为0为止。
注意:
- CountDownLatch 的初始值,应当与所有线程当中的 countDown() 数量相等,
- 如果上述测试类初始化为1,初始值小于 countDown() 数量,即
CountDownLatch cd = new CountDownLatch(1);
运行程序后,输出结果可能为
1~100 的和是:1275
- 如果上述测试类初始化为3,初始值大于 countDown() 数量,即
CountDownLatch cd = new CountDownLatch(3);
运行程序后,程序将被挂起,无法退出阻塞状态。
2 线程的同步工具类 CyclicBarrier
CyclicBarrier
是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点(common barrier point)。类似于集合点。
因为该barrier在释放等待线程后可以重用,所以称它为循环的 barrier。
public class TimeCount
private int count1;
private int count2;
private int sum;
public int getCount1()
return count1;
public void setCount1(int count1)
this.count1 = count1;
public int getCount2()
return count2;
public void setCount2(int count2)
this.count2 = count2;
public int getSum()
return this.count1 + this.count2;
public void setSum(int sum)
this.sum = sum;
import java.util.concurrent.CyclicBarrier;
public class UserRunn implements Runnable
private TimeCount tc;
private String name;
private CyclicBarrier cyclicBarrier;
public UserRunn(TimeCount tc, String name, CyclicBarrier cyclicBarrier)
this.tc = tc;
this.name = name;
this.cyclicBarrier = cyclicBarrier;
@Override
public void run()
if(name.equals("爬虫功能"))
try
Thread.sleep(4000);
tc.setCount1(4000);
catch (InterruptedException e)
e.printStackTrace();
else if(name.equals("发送邮件功能"))
try
Thread.sleep(2000);
tc.setCount2(2000);
catch (InterruptedException e)
e.printStackTrace();
//阻塞点,等所有线程运行完毕,自动解锁
try
cyclicBarrier.await();
System.out.println("------------" + name + " end-----------");
catch (Exception e)
e.printStackTrace();
import java.util.concurrent.CyclicBarrier;
public class Test
public static void main(String[] args)
TimeCount tc = new TimeCount();
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable()
@Override
public void run()
int sum = tc.getSum();
System.out.println("功能总耗时:" + sum);
);
new Thread(new UserRunn(tc, "爬虫功能", cyclicBarrier)).start();
new Thread(new UserRunn(tc, "发送邮件功能", cyclicBarrier)).start();
3 线程的同步工具类 Semaphore
Semaphore
是一个计数信号量,它的本质是一个共享锁,是基于AQS实现的,通过state变量来实现共享。
通过调用 acquire 方法,对 state 值减去一,当调用 release 的时候,对 state 值加一。
当 state 变量小0时,在AQS队列中阻塞等待。
import java.util.concurrent.Semaphore;
public class Address
private int num;
//通过引入Semaphore,实现停车场限流
private Semaphore semaphore;
public Address(int num)
this.num = num;
//设置最大可用的并行信号量
this.semaphore = new Semaphore(num);
public void autoCar()
try
//加锁
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "进入停车场");
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "离开停车场");
//释放锁
semaphore.release();
catch (InterruptedException e)
e.printStackTrace();
public class Car extends Thread
private Address address;
public Car(Address address)
this.address = address;
@Override
public void run()
address.autoCar();
public class Test
public static void main(String[] args)
Address address = new Address(2);
for(int i=0;i<5;i++)
new Car(address).start();
4 线程的交换类 Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类,Exchanger用于进行线程间的数据交换。
它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。
这两个线程通过 exchange() 方法交换数据。
如果第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange() 方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test
private static final Exchanger<String> exchanger = new Exchanger<>();
private static ExecutorService executorService = Executors.newFixedThreadPool(2);
public static void main(String[] args)
executorService.execute(new Runnable()
@Override
public void run()
String a = " A 银行转入";
System.out.println(Thread.currentThread().getName() + a);
try
String b = exchanger.exchange(a);
System.out.println(Thread.currentThread().getName() + b);
catch (InterruptedException e)
e.printStackTrace();
);
executorService.execute(new Runnable()
@Override
public void run()
String a = " B 银行转出";
System.out.println(Thread.currentThread().getName() + a);
try
String b = exchanger.exchange(a);
System.out.println(Thread.currentThread().getName() + b);
catch (InterruptedException e)
e.printStackTrace();
);
executorService.shutdown();
执行输出
pool-1-thread-1 A 银行转入
pool-1-thread-2 B 银行转出
pool-1-thread-2 A 银行转入
pool-1-thread-1 B 银行转出
5 线程的 Fork
-Join
机制
Fork/Join 框架是 JAVA7 提供的一个用于并行执行任务的框架,
是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
分治法:
把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解,得到原问题的解。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class ContTask extends RecursiveTask<Integer>
private int start;
private int end;
//计算任务量的阈值
private static final int TASKSIZE = 30;
private static int count = 0;
public ContTask(int start, int end)
this.start = start;
this.end = end;
//重写compute方法,任务执行的主要计算
@Override
protected Integer compute()
int sum = 0;
System.out.println("开启线程进行计算" + count++);
boolean state = (end - start) <= TASKSIZE;
//如果小于等于任务的阈值
if(state)
//无需拆分任务计算
for(int i=start;i<=end;i++)
sum += i;
else
//进行拆分任务计算
System.out.println("这个任务需要进行拆分任务计算。。。" + Thread.currentThread().getName());
//分割成两个任务
int middle = (end + start) / 2;
ContTask contTask1 = new ContTask(start, middle);
ContTask contTask2 = new ContTask(middle+1, end);
//开启线程计算分布式任务
invokeAll(contTask1, contTask2);
//阻塞,直到任务完成或取消
Integer tasksum1 = contTask1.join();
Integer tasksum2 = contTask2.join();
//结果合并
sum = tasksum1 + tasksum2;
return sum;
public static void main(String[] args)
//分布式计算池
ForkJoinPool forkJoinPool = new ForkJoinPool();
//初始化设置任务
ContTask contTask = new ContTask(1, 100);
//分布式计算任务,提交任务
ForkJoinTask forkJoinTask = forkJoinPool.submit(contTask);
//得到最终计算结果
try
System.out.println(forkJoinTask.get());
catch (InterruptedException e)
e.printStackTrace();
catch (ExecutionException e)
e.printStackTrace();
执行结果:
开启线程进行计算0
这个任务需要进行拆分任务计算。。。ForkJoinPool-1-worker-1
开启线程进行计算1
这个任务需要进行拆分任务计算。。。ForkJoinPool-1-worker-1
开启线程进行计算2
开启线程进行计算3
开启线程进行计算4
这个任务需要进行拆分任务计算。。。ForkJoinPool-1-worker-2
开启线程进行计算5
开启线程进行计算6
5050
以上是关于Java多线程06——JUC并发包02的主要内容,如果未能解决你的问题,请参考以下文章
Java并发包中CyclicBarrier的工作原理使用示例
举例详解 java.util.concurrent 并发包 4 种常见类
举例详解 java.util.concurrent 并发包 4 种常见类