Fork/Join框架
在JDK7后提供一套并行任务的框架,它可以把发大任务拆分成很多的小任务,汇总每个小任务的结果得到大任务的结果。
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务。
那么,为什么需要使用窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务粉笔放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。
比如A线程负责处理A队列里的任务,但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程的和窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
由两个类完成工作:
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoin类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。
- RecursiveAction:用于没有返回结果的任务
- RecursiveTask:用于有返回结果的任务
ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。
Fork/Join有同步和异步两种方式。
举个例子:
青年高智商训练班开始招生了,此时来了5万名小伙伴参与报名筛选,但只有年龄在18~30岁之间,并且智商超过145的人才能通过。为了提高效率,每位老师最多测试不超过100名参与者,最终将通过人数进行统计。
// 参与者 public class Young { private Integer age; private String name; private Integer iq; public Young(Integer age, String name, Integer iq) { this.age = age; this.name = name; this.iq = iq; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getIq() { return iq; } public void setIq(Integer iq) { this.iq = iq; } }
// 筛选接口 public interface Pick<T> { boolean pick(T person); }
// 考题 public class Exam implements Pick<Young> { @Override public boolean pick(Young person) { if (30 > person.getAge() && 18 < person.getAge() && 145 < person.getIq()) { return true; } return false; } }
// 监考老师 public class Teacher extends RecursiveTask<Integer> { private final Integer THRESHOLD = 100; private List<Young> youngs; private Integer fromIdx; private Integer toIdx; private Pick pick; private Integer count = 0; public Teacher(List<Young> youngs, Integer fromIdx, Integer toIdx, Pick pick) { this.youngs = youngs; this.fromIdx = fromIdx; this.toIdx = toIdx; this.pick = pick; } @Override protected Integer compute() { if (this.toIdx - this.fromIdx < this.THRESHOLD) { for (int i = this.fromIdx; i < this.toIdx; i ++) { if (this.pick.pick(this.youngs.get(i))) { this.count ++; } } return this.count; } else { Integer mid = (this.toIdx - this.fromIdx) / 2; Teacher teacher1 = new Teacher(this.youngs, this.fromIdx, this.fromIdx + mid, this.pick); Teacher teacher2 = new Teacher(this.youngs, this.fromIdx + mid, this.toIdx, this.pick); this.invokeAll(teacher1, teacher2); return teacher1.join() + teacher2.join(); } } }
public class Test { private static Integer COUNT = 50000; // 所有参与者 public static List<Young> makeData() { List<Young> list = new LinkedList<>(); Random rand = new Random(); Exam exam = new Exam(); Young young; int cnt = 0; for (int i = 1; i <= COUNT; i ++) { young = new Young(rand.nextInt(50), "参与者" + i, rand.nextInt(100) + 100); list.add(young); if (exam.pick(young)) { cnt ++; } } System.out.println("共有" + cnt + "参与者符合条件"); return list; } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); Teacher teacher = new Teacher(makeData(), 0, COUNT - 1, new Exam()); pool.invoke(teacher); System.out.println("Fork/Join计算结果:" + teacher.join()); } }
输出:
共有6037参与者符合条件 Fork/Join计算结果:6037
可以看出两次结果是一致的。