线程基础:JDK1.5+——线程新特性(中)

Posted llguanli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程基础:JDK1.5+——线程新特性(中)相关的知识,希望对你有一定的参考价值。

(接上文《线程基础:JDK1.5+(8)——线程新特性(上)》)

3、工作在多线程环境下的“计数器”:

从这个小节開始,我们将以一个“赛跑”的样例。解说JDK1.5环境下一些线程控制工具(包含Semaphore、CountDownLatch和java.util.concurrent.atomic子包),而且复习这个专题讲到的知识点:同步快、锁、线程池、BlockingQueue、Callable等。

3-1、 赛跑比赛的需求

如今您不仅能够通过我们已经介绍的知识点,实现对100米田径比赛的初赛和决赛的模拟,然后发布出比赛的冠亚季军。还能够基于这些知识,模拟机场T1和T2跑道的起降工作。这里我们一起实现前者的需求,首先来看看100米田径比赛的业务需求:

技术分享

  1. 选手要參加比赛,首先就要报名。

    为了使功能足够简单,參赛选手的基本仅仅包含:姓名、起跑指数(最低速度)、參赛号三个信息。

  2. 同一个选手的状态不稳定性。也就是说某一个选手,在初赛阶段的速度可能是A,可是决赛阶段由于发挥失常,可能速度就变成了B。而这一切都是随机进行的

  3. 选手们首先进行“初赛”。全部选手的“初赛”成绩将进行汇总。成绩最好的5名选手。将參加“决赛”。“决赛”成绩最好的三名选手,将分别获得冠亚季军。并发布出来。

  4. 比赛场地仅仅有一个,总共同拥有5条跑道可供使用。

    所以不管是“初赛”还是“决赛”。同一时间參加比赛的选手都不能超过5名。

3-1-1、基本类:Player选手类

本小节兴许的内容中,我们将对跑步比赛的实现代码进行多次更改优化,可是不管实现代码怎样变化,有几个基本的模型是不会变化的:选手描写叙述和比赛结果描写叙述。

选手除了名字、參赛编号的描写叙述外,另一个“最低速度”的描写叙述,这是为了保证不管这个选手跑多少次。其状态都不会太过失常。

“最低速度”是在创建选手时,系统随机生成的。

下面是Player选手类的定义代码:

package test.thread.track;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;

/**
 * 这就是一个选手。
 * 为了简单起见,我们仅仅记录这个选手名字、选手编号、最低速度(创建时随机生成)。

* 当然。最为一名选手,最重要的工作就是“跑步” * @author yinwenjie */ public class Player implements Callable<Result> , Comparable<Player>{ /** * 选手编号 */ private int number; /** * 选手的名字 */ private String name; /** * 最低速度 */ private float minSpeed; /** * 本次比赛结果 */ private Result result; /** * 跑道 */ private Semaphore runway; public Player(String name , int number , Semaphore runway) { this.name = name; this.number = number; this.runway = runway; // 这个最低速度设置是 8米/秒(否则就真是‘龟速’了) this.minSpeed = 8f; } /* (non-Javadoc) * @see java.util.concurrent.Callable#call() */ @Override public Result call() throws Exception { try { // 申请上跑道 this.runway.acquire(); return this.doRun(); } catch(Exception e) { e.printStackTrace(System.out); } finally { // 都要进入初赛结果排序(中途退赛的成绩就为0) this.runway.release(); } // 假设运行到这里,说明异常发生了 this.result = new Result(Float.MAX_VALUE); return result; } /** * 開始跑步 * @return * @throws Exception */ private Result doRun() throws Exception { /* * 为了表现一个选手每一次跑步都有不同的状态(可是都不会低于其最低状态), * 所以每一次跑步,系统都会为这个选手分配一个即时速度。 * * 这个即时速度不会低于其最小速度。可是也不会高于 14米/秒(否则就是‘超人’咯) * */ // 生成即时速度 float presentSpeed = 0f; presentSpeed = this.minSpeed * (1.0f + new Random().nextFloat()); if(presentSpeed > 14f) { presentSpeed = 14f; } // 计算跑步结果(BigDecimal的使用可自行查阅资料) BigDecimal calculation = new BigDecimal(100).divide(new BigDecimal(presentSpeed) , 3, RoundingMode.HALF_UP); float presentTime = calculation.floatValue(); // 让线程等待presentSpeed的时间。模拟该选手跑步的过程 synchronized (this) { this.wait((long)(presentTime * 1000f)); } // 返回跑步结果 this.result = new Result(presentTime); return result; } /** * @return the result */ public Result getResult() { return result; } /** * @return the number */ public int getNumber() { return number; } /** * @return the name */ public String getName() { return name; } /* (non-Javadoc) * @see java.lang.Comparable#compareTo(java.lang.Object) */ @Override public int compareTo(Player o) { /* * 两个选手间。还能够通过他们的result进行比較 * 耗时越小,当然越靠前 * */ Result myReslut = this.getResult(); Result targetReslut = o.getResult(); // 假设出现了reslut为null或者targetReslut为null,说明比赛结果出现了问题 // 当然假设真的出现这种问题,最可能的选手中途退赛了 if(myReslut == null) { return 1; } if(targetReslut == null) { return -1; } // 耗时越少的选手,当然应该排在“成绩”队列的越前面 if(myReslut.getTime() < targetReslut.getTime()) { return -1; } else { return 1; } } }

为什么Player选手类要实现Comparable接口呢?在实现代码中,我将使用PriorityBlockingQueue队列,将选手根据其比赛成绩进行排序。为了能够保证PriorityBlockingQueue队列能够正常排序。所以须要实现该接口。

当然有的读者会说,实现Comparable接口后,使用普通的List也能够排序。可是List接口的实现类(ArrayList、LinkedList、Vector等等)并非线程安全的。它们经常使用的处理场景还是在某一个线程内进行数据线性化处理时使用。

而就眼下我们的场景来看。程序猿根本就不知道某一个选手什么时候能够跑完100米,而且多个选手跑步的处理结果都将随机的送入队列。所以保证线程安全性是需求实现中重要的一部分

当然。假设您硬是要使用传统的List也行。

能能够通过JDK提供的“同步包装器”(Collections.synchronizedList)将它变成线程安全的。但这个问题不是本小节讨论的范围。

另外。做为一个选手来说。最根本的功能就是“跑”这个动作。而且根据需求,非常明显我们须要在选手“跑完后”知道“跑”的成绩。

所以我们还须要Player类实现Callable接口,以便让选手能够跑起来。

为了模拟跑的过程和选手的状态有关,代码中使用随机数确定本次选手“跑”的速度。可是这个速度不会低于选手的“最低速度”(眼下给定的是14秒)。

3-1-2、基本类:Result比赛结果

另外一个不会变动的基本类就是Result成绩:

package test.thread.track;

/**
 * 选手某一次跑步的成绩
 * @author yinwenjie
 *
 */
public class Result {
    /**
     * 记录了本次赛跑的用时情况
     */
    private float time;

    public Result(float time) {
        this.time = time;
    }

    /**
     * @return the time
     */
    public float getTime() {
        return time;
    }

    /**
     * @param time the time to set
     */
    public void setTime(float time) {
        this.time = time;
    }
}

每一次选手“跑”的成绩都是不一样的。成绩中仅仅包含一个属性。就是跑完100米的用时情况。

3-2、Semaphore:信号量

3-2-1、基本使用

Semaphore信号量,是concurrent包的一个重要工具类。它通过申请和回收“证书”。实现多个线程对同一资源的訪问控制。详细的做法是,某个线程在訪问某个(可能出现资源抢占的)资源的时候,首先向Semaphore对象申请“证书”,假设没有拿到“证书”就一直堵塞;当拿到“证书”后。线程就解除堵塞状态,然后訪问资源;在完毕资源操作后,再向Semaphore对象归还“证书”;让我们先来看看Semaphore信号的简单演示样例:

package test.thread.semaphore;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {

    public static void main(String[] args) throws Throwable {
         new SemaphoreTest().doTest();
    }

    public void doTest() {
        Semaphore semp = new Semaphore(5 , false);

        // 我们创建10个线程,并通过0-9的index进行编号
        for(int index = 0 ; index < 10 ; index++) {
            Thread semaphoreThread = new Thread(new SemaphoreRunnableNonfair(semp , index));
            semaphoreThread.start();
        }
    }

    /**
     * 測试Semaphore的非公平模式
     * @author yinwenjie
     */
    private static class SemaphoreRunnableNonfair implements Runnable {

        private Semaphore semp;

        /**
         * 编号
         */
        private Integer index;

        public SemaphoreRunnableNonfair(Semaphore semp , Integer index) {
            this.semp = semp;
            this.index = index;
        }

        @Override
        public void run() {
            try {
                System.out.println("线程" + this.index + "等待信号。

。。。

。。"); this.semp.acquire(); // 停止一段时间,模拟业务处理过程 synchronized(this) { System.out.println("index 为 " + this.index + " 的线程,获得信号。開始处理业务"); this.wait(5000); } } catch (InterruptedException e) { e.printStackTrace(System.out); } finally { // 最后都要释放这个信号/证书 this.semp.release(); } } } }

以上代码我们创建了10个线程。分别编号为0-9(这里我们没有使用Thread自带的id,主要还是为了读者能够看得清楚)。Semaphore信号量对象中。我们放置了5个“证书”,也就是说最多同一时候能够有5个线程进行业务处理。处理完毕后向线程向Semaphore信号对象归还“证书”。以上代码的处理结果,可能例如以下图所看到的(注意。是“可能”):

线程0等待信号。。

。。。。

线程2等待信号。。

。。

index2 的线程。获得信号,開始处理业务 index0 的线程,获得信号,開始处理业务 线程3等待信号。

。。

。。。 index3 的线程。获得信号,開始处理业务 线程4等待信号。。。。。。 index4 的线程,获得信号,開始处理业务 线程5等待信号。

。。。

。。 index5 的线程,获得信号,開始处理业务 线程7等待信号。。。。。。 线程8等待信号。。

。。

。。 线程6等待信号。

。。。

。。 线程9等待信号。。。。

。。 线程1等待信号。。。。。。 index8 的线程,获得信号。開始处理业务 index7 的线程。获得信号,開始处理业务 index6 的线程,获得信号,開始处理业务 index9 的线程。获得信号。開始处理业务 index1 的线程。获得信号。開始处理业务

3-2-2、Semaphore的基本操作方式

为了方便读者查阅,这里我们列举了Semaphore中经常使用的操作方式

  • 申请/获取证书:

    void acquire():从此信号量获取一个许可,在Semaphore能够提供一个许可前。当前线程将一直堵塞等待。

    假设在等待过程中。当前线程收到了interrupt信号,那么将抛出InterruptedException异常。

    void acquire(permits):从此信号量获取permits个许可,在Semaphore能够提供permits个许可前,当前线程将一直堵塞等待。

    假设在等待过程中。当前线程收到了interrupt信号,那么将抛出InterruptedException异常。

    void acquireUninterruptibly():从此信号量获取一个许可,在Semaphore能够提供一个许可前,当前线程将一直堵塞等待。使用这种方法获取许可时。不会受到线程interrupt信号的影响。

    void acquireUninterruptibly(permits):从此信号量获取permits个许可。在Semaphore能够提供permits个许可前,当前线程将一直堵塞等待。使用这种方法获取许可时,不会受到线程interrupt信号的影响。

    boolean tryAcquire():从此信号量获取一个许可,假设无法获取,线程并不会堵塞在这里。

    假设获取到了许可,则返回true,其它情况返回false。

    boolean tryAcquire(permits):从此信号量获取permits个许可,假设无法获取,线程并不会堵塞在这里。

    假设获取到了许可,则返回true,其它情况返回false。

    boolean tryAcquire(int permits, long timeout, TimeUnit unit):从此信号量获取permits个许可,假设无法获取,则当前线程等待设定的时间。假设超过等待时间后。还是没有拿到许可。则解除等待继续运行。

    假设获取到了许可,则返回true,其它情况返回false。

  • 证书状态:

    int availablePermits():返回此信号量中当前可用的许可数。

    int getQueueLength():返回正在等待获取的线程的预计数目。该值仅是预计的数字,由于在此方法遍历内部数据结构的同一时候。线程的数目可能动态地变化。此方法用于监视系统状态,不用于同步控制。

    boolean hasQueuedThreads():查询是否有线程正在等待获取。注意,由于同一时候可能发生取消,所以返回 true 并不保证有其它线程等待获取许可。此方法主要用于监视系统状态。

    boolean isFair():假设此信号量的公平设置为 true,则返回 true。

  • 释放/返还证书:

    void release():释放一个许可,将其返回给信号量。

    最好将这种方法的调用,放置在finally程序块中运行。

    void release(permits):释放给定数目的许可,将其返回到信号量。

    最好将这种方法的调用,放置在finally程序块中运行。

  • fair:公平与非公平

    Semaphore一共同拥有两个构造函数,各自是:Semaphore(int permits)和Semaphore(int permits, boolean fair);permits是指由Semaphore信号量控制的“证书”数量。

    fair參数是设置这个信号量对象的工作方式。

当fair參数为true时,信号量将以“公平方式”运行。

即首先申请证书,并进入堵塞状态的线程,将有权利首先获取到证书。当fair參数为false时。信号量对象将不会保证“先来先得”。默认情况下。Semaphore採用“非公平”模式运行。

3-2-3、实现比赛场景

在介绍了Semaphore的使用方式后,如今我们就要将Semaphore增加“赛跑比赛”的代码实现中。

非常显然Semaphore在我们需求中的应用任务是:给选手使用“跑道”的证书/权利,以便让选手“跑步”,而且在选手使用完跑道后,回收跑道的使用证书/权利,给下一位选手。

......
// 这就是跑道。需求上说了仅仅有5条跑道。所以仅仅有5个permits。
Semaphore runway = new Semaphore(5);
......

这个代码片段控制着全部选手的跑步动作:仅仅有在获得跑道的使用权限后,才干运行“跑步”动作。

3-2-4、关键的一个问题

  • 什么情况下视为“初赛”、“决赛”完毕?

    那么最直观的描写叙述就是:全部报名的选手都完毕了跑步过程(中途退赛也算),才干算“初赛”完毕;“初赛”排名最靠前的前5名选手都完毕了跑步过程(中途退赛也算)才算是“决赛”完毕。

    假设没有完毕“初赛”,那么比赛进程就必须停在那里。直到“初赛”过程完毕;假设没有完毕“决赛”过程,比赛进程就必须停在那里,知道“决赛”完毕:

......
//! 仅仅有当PLAYERNAMES.length位选手的成绩都产生了,才干进入决赛。这非常重要
synchronized (this.preliminaries) {
    while(this.preliminaries.size() < OneTrack.PLAYERNAMES.length) {
        try {
            this.preliminaries.wait();
        } catch(InterruptedException e) {
            e.printStackTrace(System.out);
        }
    }
}
......
//! 仅仅有当5位选手的决赛成绩都产生了,才干到下一步:发布成绩
synchronized (this.finals) {
    while(this.finals.size() < 5) {
        try {
            this.finals.wait();
        } catch(InterruptedException e) {
            e.printStackTrace(System.out);
        }
    }
}
......
  • 怎么监控某一个选手。是否完毕了跑步过程?

在我们定义的Player选手类中,已经实现了Callable接口,而且将会在运行完毕后。返回Result结果信息。所以看选手是否完毕了跑步过程,仅仅须要监控Player的Future就能够了。

可是监控Player的Future可不能在100米比赛的主线程上进行。否则就会出现上一个选手没有跑完就不能启动下一个选手的跑步线程的情况

所以我们须要为每个选手都创建一个“监控线程”FutureThread:

/**
 * 这是计分线程,是为了保证产生比赛结果后,在计入PriorityBlockingQueue
 * 这样才有排列成绩的根据
 * @author yinwenjie
 *
 */
private class FutureThread extends Thread {

    /**
     * 选手跑步任务(Player)的运行状态对象
     */
    private Future<Result> future;

    /**
     * 跑步成绩出来后,须要操作的队列
     * (要将相应的选手增加到队列,以便根据成绩进行排序)
     */
    private PriorityBlockingQueue<Player> achievementQueue;

    /**
     * 当前进行跑步的选手
     */
    private Player player;

    public FutureThread(Future<Result> future , Player player , PriorityBlockingQueue<Player> achievementQueue) {
        this.future = future;
        this.player = player;
        this.achievementQueue = achievementQueue;
    }

    /* (non-Javadoc)
     * @see java.lang.Thread#run()
     */
    @Override
    public void run() {
        // 假设条件成立,最有可能的就是选手在比赛过程中,
        // 由于某种原因退赛了!

if(this.future == null) { System.out.println("选手退赛,计分为0"); } else { try { // 假设选手没有跑完。FutureThread将堵塞在这里 // 当然出现跑步过程中退赛。就会抛出异常 this.future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 运行到这里,就说明这个选手跑完了(或者退赛了) // 不管什么情况,都计入队列,然后通知主线程 this.achievementQueue.put(this.player); synchronized (this.achievementQueue) { this.achievementQueue.notify(); } } }

这样,每个选手在跑步过程中,就会有两个线程:一个用来跑步的线程:Player-Callable;另一个用来监控跑步情况。并操作成绩队列的线程:FutureThread。

3-3、完整的比赛代码

实现代码中基本的问题都攻克了,如今我们能够给出完毕的实现代码了(注意,之前已经给出的代码,就不在赘述了):

package test.thread.track;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;


/**
 * 这是第一个比赛程序。
 * @author yinwenjie
 *
 */
public class OneTrack {

    private static final String[] PLAYERNAMES = new String[]{"白银圣斗士","黄金圣斗士"
        ,"青铜圣斗士","神斗士","冥斗士","哈迪斯","龟仙人","孙悟空","孙悟饭","贝吉塔","孙悟天"};

    /**
     * 报名队列(非线程安全)
     */
    private List<Player> signupPlayers = new LinkedList<Player>();

    /**
     * 初赛结果队列(有排序功能。且线程安全)
     */
    private PriorityBlockingQueue<Player> preliminaries = new PriorityBlockingQueue<Player>();

    /**
     * 决赛结果队列(有排序功能。且线程安全)
     */
    private PriorityBlockingQueue<Player> finals = new PriorityBlockingQueue<Player>();

    public void track() {
        /*
         * 赛跑分为下面几个阶段进行;
         * 
         * 1、报名
         * 2、初赛。10名选手,分成两组,每组5名选手。
         * 分两次进行初赛(由于场地仅仅有5条赛道,仅仅有拿到进场许可的才干使用赛道,进行比赛)
         * 
         * 3、决赛:初赛结果将被写入到一个队列中进行排序。仅仅有成绩最好的前五名选手。能够參加决赛。

* * 4、决赛结果的前三名将分别作为冠亚季军被发布出来 * */ //1、================报名 // 这就是跑道。需求上说了仅仅有5条跑道,所以仅仅有5个permits。 Semaphore runway = new Semaphore(5); this.signupPlayers.clear(); for(int index = 0 ; index < OneTrack.PLAYERNAMES.length ; ) { Player player = new Player(OneTrack.PLAYERNAMES[index], ++index , runway); this.signupPlayers.add(player); } //2、================进行初赛 // 这是裁判 ExecutorService refereeService = Executors.newFixedThreadPool(5); for (final Player player : this.signupPlayers) { Future<Result> future = null; future = refereeService.submit(player); new FutureThread(future, player, this.preliminaries).start(); } //! 仅仅有当PLAYERNAMES.length位选手的成绩都产生了,才干进入决赛,这非常重要 synchronized (this.preliminaries) { while(this.preliminaries.size() < OneTrack.PLAYERNAMES.length) { try { this.preliminaries.wait(); } catch(InterruptedException e) { e.printStackTrace(System.out); } } } // 3、============决赛(仅仅有初赛结果的前5名能够參见) for(int index = 0 ; index < 5 ; index++) { Player player = this.preliminaries.poll(); Future<Result> future = null; future = refereeService.submit(player); new FutureThread(future, player, this.finals).start(); } //! 仅仅有当5位选手的决赛成绩都产生了,才干到下一步:发布成绩 synchronized (this.finals) { while(this.finals.size() < 5) { try { this.finals.wait(); } catch(InterruptedException e) { e.printStackTrace(System.out); } } } // 4、============发布决赛成绩(前三名) for(int index = 0 ; index < 3 ; index++) { Player player = this.finals.poll(); switch (index) { case 0: System.out.println("第一名:" + player.getName() + "[" + player.getNumber() + "],成绩:" + player.getResult().getTime() + "秒"); break; case 1: System.out.println("第二名:" + player.getName() + "[" + player.getNumber() + "]。成绩:" + player.getResult().getTime() + "秒"); break; case 2: System.out.println("第三名:" + player.getName() + "[" + player.getNumber() + "]。成绩:" + player.getResult().getTime() + "秒"); break; default: break; } } } public static void main(String[] args) throws RuntimeException { new OneTrack().track(); } //......这里是FutureThread的代码。上面已给出了 }

下面是可能的运行结果。

“可能的运行结果”那是由于结果全然是随机的,您的运行结果可能和我给出的不一样:

第一名:龟仙人[7],成绩:7.143秒
第二名:白银圣斗士[1],成绩:7.477秒
第三名:哈迪斯[6],成绩:7.531

(接下文:CountDownLatch同步器、java.util.concurrent.atomic子包)

以上是关于线程基础:JDK1.5+——线程新特性(中)的主要内容,如果未能解决你的问题,请参考以下文章

线程基础:JDK1.5+——线程新特性(中)

多线程(JDK1.5的新特性互斥锁)(掌握)

JAVA-初步认识-第十四章-线程间通信-多生产者多消费者问题-JDK1.5新特性解决办法-范例

JAVA-初步认识-第十四章-线程间通信-多生产者多消费者问题-JDK1.5新特性-Condition

线程池

jdk 1.5 1.6 1.7 加入新特性