java阻塞队列 线程同步合作

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java阻塞队列 线程同步合作相关的知识,希望对你有一定的参考价值。

参考技术A

  Queue接口与List Set同一级别 都是继承了Collection接口 LinkedList实现了Queue接口 Queue接口窄化了对LinkedList的方法的访问权限(即在方法中的参数类型如果是Queue时 就完全只能访问Queue接口所定义的方法了 而不能直接访问 LinkedList的非Queue的方法) 以使得只有恰当的方法才可以使用 BlockingQueue 继承了Queue接口

  队列是一种数据结构.它有两个基本操作 在队列尾部加人一个元素 和从队列头部移除一个元素就是说 队列以一种先进先出的方式管理数据 如果你试图向一个已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索 将导致线程阻塞.在多线程进行合作时 阻塞队列是很有用的工具 工作者线程可以定期地把中间结果存到阻塞队列中而其他工作者线线程把中间结果取出并在将来修改它们 队列会自动平衡负载 如果第一个线程集运行得比第二个慢 则第二个线程集在等待结果时就会阻塞 如果第一个线程集运行得快 那么它将等待第二个线程集赶上来 下表显示了jdk 中的阻塞队列的操作

  add        增加一个元索                     如果队列已满 则抛出一个IIIegaISlabEepeplian异常

  remove   移除并返回队列头部的元素    如果队列为空 则抛出一个NoSuchElementException异常

  element  返回队列头部的元素             如果队列为空 则抛出一个NoSuchElementException异常

  offer       添加一个元素并返回true       如果队列已满 则返回false

  poll         移除并返问队列头部的元素    如果队列为空 则返回null

  peek       返回队列头部的元素             如果队列为空 则返回null

  put         添加一个元素                      如果队列满 则阻塞

  take        移除并返回队列头部的元素     如果队列为空 则阻塞

  remove element offer poll peek 其实是属于Queue接口

  阻塞队列的操作可以根据它们的响应方式分为以下三类 aad removee和element操作在你试图为一个已满的队列增加元素或从空队列取得元素时抛出异常 当然 在多线程程序中 队列在任何时间都可能变成满的或空的 所以你可能想使用offer poll peek方法 这些方法在无法完成任务时只是给出一个出错示而不会抛出异常

  注意 poll和peek方法出错进返回null 因此 向队列中插入null值是不合法的

  还有带超时的offer和poll方法变种 例如 下面的调用

  boolean success = q offer(x TimeUnit MILLISECONDS);

  尝试在 毫秒内向队列尾部插入一个元素 如果成功 立即返回true 否则 当到达超时进 返回false 同样地 调用

  Object head = q poll( TimeUnit MILLISECONDS);

  如果在 毫秒内成功地移除了队列头元素 则立即返回头元素 否则在到达超时时 返回null

  最后 我们有阻塞操作put和take put方法在队列满时阻塞 take方法在队列空时阻塞

  ncurrent包提供了阻塞队列的 个变种 默认情况下 LinkedBlockingQueue的容量是没有上限的(说的不准确 在不指定时容量为Integer MAX_VALUE 不要然的话在put时怎么会受阻呢) 但是也可以选择指定其最大容量 它是基于链表的队列 此队列按 FIFO(先进先出)排序元素

  ArrayBlockingQueue在构造时需要指定容量 并可以选择是否需要公平性 如果公平参数被设置true 等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来达到这种公平性的 即等待时间最长的线程会先操作) 通常 公平性会使你在性能上付出代价 只有在的确非常需要的时候再使用它 它是基于数组的阻塞循环队列 此队列按 FIFO(先进先出)原则对元素进行排序

  PriorityBlockingQueue是一个带优先级的队列 而不是先进先出队列 元素按优先级顺序被移除 该队列也没有上限(看了一下源码 PriorityBlockingQueue是对PriorityQueue的再次包装 是基于堆数据结构的 而PriorityQueue是没有容量限制的 与ArrayList一样 所以在优先阻塞队列上put时是不会受阻的 虽然此队列逻辑上是无界的 但是由于资源被耗尽 所以试图执行添加操作可能会导致 OutOfMemoryError) 但是如果队列为空 那么取元素的操作take就会阻塞 所以它的检索操作take是受阻的 另外 往入该队列中的元素要具有比较能力

  最后 DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列 只有在延迟期满时才能从中提取元素 该队列的头部是延迟期满后保存时间最长的 Delayed 元素 如果延迟都还没有期满 则队列没有头部 并且poll将返回null 当一个元素的 getDelay(TimeUnit NANOSECONDS) 方法返回一个小于或等于零的值时 则出现期满 poll就以移除这个元素了 此队列不允许使用 null 元素 下面是延迟接口

  Java代码

  public interface Delayed extends Comparable<Delayed>

  long getDelay(TimeUnit unit);

  

  public interface Delayed extends Comparable<Delayed>

  long getDelay(TimeUnit unit);

  

  放入DelayQueue的元素还将要实现pareTo方法 DelayQueue使用这个来为元素排序

  下面的实例展示了如何使用阻塞队列来控制线程集 程序在一个目录及它的所有子目录下搜索所有文件 打印出包含指定关键字的文件列表 从下面实例可以看出 使用阻塞队列两个显著的好处就是 多线程操作共同的队列时不需要额外的同步 另外就是队列会自动平衡负载 即那边(生产与消费两边)处理快了就会被阻塞掉 从而减少两边的处理速度差距 下面是具体实现

  Java代码

  public class BlockingQueueTest

  public static void main(String[] args)

  Scanner in = new Scanner(System in);

  System out print( Enter base directory (e g /usr/local/jdk /src): );

  String directory = in nextLine();

  System out print( Enter keyword (e g volatile): );

  String keyword = in nextLine();

  final int FILE_QUEUE_SIZE = ;// 阻塞队列大小

  final int SEARCH_THREADS = ;// 关键字搜索线程个数

  // 基于ArrayBlockingQueue的阻塞队列

  BlockingQueue<File> queue = new ArrayBlockingQueue<File>(

  FILE_QUEUE_SIZE);

  //只启动一个线程来搜索目录

  FileEnumerationTask enumerator = new FileEnumerationTask(queue

  new File(directory));

  new Thread(enumerator) start();

  //启动 个线程用来在文件中搜索指定的关键字

  for (int i = ; i <= SEARCH_THREADS; i++)

  new Thread(new SearchTask(queue keyword)) start();

  

  

  class FileEnumerationTask implements Runnable

  //哑元文件对象 放在阻塞队列最后 用来标示文件已被遍历完

  public static File DUMMY = new File( );

  private BlockingQueue<File> queue;

  private File startingDirectory;

  public FileEnumerationTask(BlockingQueue<File> queue File startingDirectory)

  this queue = queue;

  this startingDirectory = startingDirectory;

  

  public void run()

  try

  enumerate(startingDirectory);

  queue put(DUMMY);//执行到这里说明指定的目录下文件已被遍历完

   catch (InterruptedException e)

  

  

  // 将指定目录下的所有文件以File对象的形式放入阻塞队列中

  public void enumerate(File directory) throws InterruptedException

  File[] files = directory listFiles();

  for (File file : files)

  if (file isDirectory())

  enumerate(file);

  else

  //将元素放入队尾 如果队列满 则阻塞

  queue put(file);

  

  

  

  class SearchTask implements Runnable

  private BlockingQueue<File> queue;

  private String keyword;

  public SearchTask(BlockingQueue<File> queue String keyword)

  this queue = queue;

  this keyword = keyword;

  

  public void run()

  try

  boolean done = false;

  while (!done)

  //取出队首元素 如果队列为空 则阻塞

  File file = queue take();

  if (file == FileEnumerationTask DUMMY)

  //取出来后重新放入 好让其他线程读到它时也很快的结束

  queue put(file);

  done = true;

   else

  search(file);

  

   catch (IOException e)

  e printStackTrace();

   catch (InterruptedException e)

  

  

  public void search(File file) throws IOException

  Scanner in = new Scanner(new FileInputStream(file));

  int lineNumber = ;

  while (in hasNextLine())

  lineNumber++;

  String line = in nextLine();

  if (ntains(keyword))

  System out printf( %s:%d:%s%n file getPath() lineNumber

  line);

  

  in close();

  

lishixinzhi/Article/program/Java/hx/201311/26657

Java核心技术读书笔记10-2 阻塞队列线程安全集合类Callable与Future线程池与任务组同步框架

3. 阻塞队列

阻塞队列是一种队列,当线程想满队中添加元素,或对空队取元素时会阻塞该线程。这种结构可以解决一些同步问题,如生产者与消费者。
对阻塞队列BlockingQueue一些操作方法和结果

当使用阻塞队列管理线程时,常用的就是put和take方法。
BlockingQueue的一些实现类:
LinkedBlockingQueue使用链表实现,因此容量没有上边界,但可以选择指定最大容量(若不指定,则容量为Integer.MAX_VALUE)。
LinkedBlockingDeque是一个双端队列
ArrayBlockingQueue在构造时需要指定容量,且可以用一个可选参数来指定是否需要公平性。若设置了公平性则等待了最长时间的线程将会优先得到处理,但这样会降低性能。
PriorityBlockingQueue是一个使用堆实现的优先级队列。元素会按照优先级的顺序被取出。
DelayQueue该队列存放实现了Delayed接口的元素,每个元素存在一个延迟,只有当延迟为负时才能从该队列中移除,该队列有序,队头元素的延迟最长,该队列可用于对超时元素做出某些处理的场景,如订单超时自动取消:

interface Delayed extends Comparable<Delayed>{
  long getDelay(TimeUnit unit);
}

在Java 7中,增加了一个TransferQueue接口(继承了BlockingQueue),其主要作用是生产者通过transfer方法将一个元素放置到队列中后将被阻塞,直到消费者从队列中取出这个元素。

void transfer(E e) throws InterruptedException;

4.线程安全的集合

由于对某些集合进行存取时并不是一个简单地一步操作,如对散列表添加元素后,散列表会需要内部调整各个桶的关系,这时假如存在其他线程抢占到了处理机对没有调整完的散列表进行操作,就会破坏集合的内部结构产生混乱导致抛出异常或陷入死循环。因此面对这种场景可以选择一些java.util.concurrent包中线程安全的集合类如:concurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet和ConcurrentLinkedQueue。
并发的散列映射可高效的支持大量的读者和一定数量的写者。
对于含有映射比较庞大的集合可以使用mappingCount方法返回其long类型的大小。
集合可以返回弱一致性的迭代器。迭代器在多线程环境下工作,不一定能返回所有对集合进行的修改,但同一个值不会出现两次,也不会抛出ConcurrentModificationException异常。

java.util中的迭代器构造之后集合发生改变会抛出上述异常。
线程安全的集合可以保证集合内部的操作具有原子性,但仍然不能保证使用集合时的线程安全问题,如使用一个映射完成转账操作。对于这种场景需要自行编写原子性的代码,如果对集合的值更新比较简单也可以使用集合提供的一些方法比如compute与merge方法。

4.1 同步包装器
Vector与HashTable是较早提供了线程安全的动态数组与散列表实现。现在对于其替代者ArrayList和HashMap类也可以使用同步包装器变为线程安全的,实际上任何集合类都可以通过同步包装器变为线程安全的。实现原理为为集合的方法使用锁加以保护,并返回包装后的集合。

但是在包装后的多线程环境下,应确保没有任何线程通过原始的非同步方法访问数据结构,也就是说,作为一种引用类型,不要有原始变量引用集合并对集合进行操作。对集合的操作也必须要使用客户端锁定,包括使用迭代器遍历集合时(构造迭代器后修改集合会抛出ConcurrentModificationException异常)。

实际上最好还是使用线程安全的集合而不是同步包装器返回的集合。

5.Callable与Future

Runnable封装一个异步运行的任务,可以将其想象为一个没有参数和返回值的异步方法。Callable与Runnable类似,但是有返回值。

public interface Callable<V>{
  V call() throws Exception; //call与run方法类似,都是封装一个任务。类型参数V即返回值的类型
}

获取结果使用Future对象,通过将Callable转换成Future,然后使用Future对象传入Thread构造器中进行计算。之后使用Future的方法返回结果。
转换类型使用FutureTask包装器,它可以将Callable转换成Future与Runnable。
public interface Future{
V get() throws..; //获取结果,获取时被阻塞,计算完成后返回结果。
V get(long timeout, TimeUnit unit) throws..; //时间内未返回结果抛出InterruptedException异常,同时运行计算的线程被中断,两个方法都会抛出InterruptedException异常。
void cancel(boolean mayInterrupt); //取消计算,计算未开始进行取消且不会再开始,否则对其进行中断
boolean isCancelled(); //若在计算完成前被取消返回true,否则返回false
boolean isDone(); //计算未完成返回false,计算完成返回true
}

6.执行器与线程池

频繁开辟新线程涉及和操作系统的交互,如果程序中创建了大量生命周期很短的线程应该使用线程池。将Runnable交给线程池,就会有一个线程调用run方法,run方法退出时方法不会死亡,而是在池中准备为下一个请求提供服务。
此外,一个系统采用线程池可以控制并发线程的数目,创建大量线程会大大降低性能甚至使JVM崩溃。
执行器(Executor)有许多静态工厂方法用来构建线程池:

6.1 常用线程池
下面为常用线程池的用法:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //线程池返回一个ExecutorService可以接受任务并为其分配线程
        ExecutorService executorService = Executors.newCachedThreadPool(); //对于每个任务,有空闲线程立即让他执行任务,否则创建线程,空闲线程会保留60秒
//        Executors.newFixedThreadPool(20); //创建一个固定大小线程池,若任务数多于线程数则进入队列中等待空闲线程
//        Executors.newSingleThreadExecutor(); //相当于容量为1的固定大小线程池

        //若使用submit提交对象为Runnable则会自动包装为FutureTask对象
        Future<?> f1 = executorService.submit(() -> System.out.println("执行任务"));//提交一个Runnable对象返回一个Future对象
        f1.get(); //可以用该返回的Future调用isDone、cancel或isCancelled但调用get方法会返回null

        Future<Object> f2 = executorService.submit(() -> System.out.println("执行任务"), new Employee(100, "测试")); //返回的Future对象调用get方法会得到传入的Employee对象
        System.out.println(f2.get()); //Employee{name=测试 salary=100.0}

        Future<Integer> f3 = executorService.submit(() -> {
            System.out.println("这是一个call()方法");
            return 100;
        }); //传入一个Callable对象
        System.out.println(f3.get()); //call方法的返回结果 100
        
        //确定不会再使用线程池时需要对其进行关闭
        executorService.shutdown(); //不在接收新任务,所有任务都完成后,线程池中的线程死亡。
//        executorService.shutdownNow(); //取消尚未开始的所有任务并试图中断正在运行的任务。
    }

6.2 预定执行
使用newScheduledThreadPool和newSingleThreadScheduledThreadPool将返回实现了ScheduledExecutorService接口的对象。它是一种允许使用线程池机制的java.util.Timer的泛化,即对预定的Runnable或Callable在初始的延迟之后只运行一次或对Runnable对象周期性的运行。

6.3 控制任务组
批量提交Callable
如果有多个带有返回结果的任务想要批量提交给线程池该怎么做?ExecutorService提供了两个方法invokeAll和invokeAny两个方法,可以接受存储Callable元素的集合,两个方法的不同之处在于:
invokeAny:返回一个最先完成计算的任务结果,例如你利用了多线程寻找迷宫的出口,其实只要最先计算完成的线程告知出口位置就可以。
invokeAll:返回的是全部任务对应的Future组成的List。

ExecutorCompletionService
对于invokeAll方法返回的集合,当遍历它时,可能会因为该Future对应的任务还没有计算完毕而发生阻塞,这样,可能有其它任务计算完了我们也不能获取。这个问题可以用ExecutorCompletionService解决,使用ExecutorCompletionService类包装一下ExecutorService,返回一个ExecutorCompletionService对象。然后使用该对象多次提交任务后,这个类会按结果可获得的顺序保存起来,这样就不用怕存在多个已经计算完毕的结果,而被当前计算时间较长的Future阻塞的情况了。之后使用take方法获取结果。
示例代码如下:

public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Callable<Integer> c1 = () -> {
            System.out.println("执行任务1");
            Thread.sleep(1000); //模拟计算时间最长的任务
            return 1;
        };

        Callable<Integer> c2 = () -> {
                System.out.println("执行任务2");
                return 2;
        };

        Callable<Integer> c3 = () -> {
            System.out.println("执行任务3");
            return 3;
        };

        List<Callable<Integer>> list = Arrays.asList(c1, c2, c3);
        //invokeAny
//        Integer integer = executorService.invokeAny(list); 
//        System.out.println(integer); //输出:2 得到计算最快的结果时就停止
        
        //invokeAll
        List<Future<Integer>> futures = executorService.invokeAll(list);
        futures.iterator().forEachRemaining((Future f) -> {
            try {
                System.out.println(f.get()); //输出:1 2 3 会顺序获取任务的结果,但任务1执行最慢,因此会被任务1阻塞才能获得其余结果
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }); 

        //ExecutorCompletionService
        ExecutorCompletionService ecs = new ExecutorCompletionService(executorService);
        ecs.submit(c1);
        ecs.submit(c2);
        ecs.submit(c3);
        for (int i = 0; i < 3; i++) {
            System.out.println(ecs.take().get()); //2 3 1 不会被任务1阻塞
        }
        executorService.shutdown(); 
    }

6.4 Fork-Join框架
Fork-Join框架是Java 7引入的一个能够并行执行任务的框架,一般出现一个大问题能划分为小问题,小问题和大问题的解决方式一样且小问题的解可以应用回大问题这样的递归式的问题结构时就可以使用该框架并行解决子问题,比如典型的图像处理问题。采用该框架需要扩展RecursiveTask或RecursiveAction类,前者的任务会返回结果,而后者的任务不会。该框架的其一般形式如下(以处理二分处理数组元素为例):

class Counter extends RecursiveTask<Integer>{
  设定阈值 = ..;
  参数 = ...;
  public Counter(问题参数){
    设定当前问题的参数;
  }
  ...
  protected Integer compute(){ //必须覆盖该方法,该方法为实际处理问题的逻辑
    if(当前子问题规模 < 阈值){
      直接解决子问题 
    }
    else{
      分解子问题
      Counter first = new Counter(子问题1);  //递归分解子问题
      Counter second = new Counter(子问题2);
      ...
      invokeAll(first, second,..) //处理子问题
      return first.join() + second.join() ...; //join方法返回子问题结果
    }
  }

}

6.5 CompletableFurture
如果你有一个多线程异步处理的逻辑需求,如希望使用多线程的方式从url中获取网页文本,在对这个文本使用多线程的方式获取其中的url,那么就可以使用java 8提供的CompletableFurture类。该类以函数式编程的方式提供了提供了Future处理链,或者组合多个Future完成更复杂的任务,如:

CompletableFurture<String> contents = readPage(url); //CompletableFurture
CompletableFurture<List<URL>> links = contents.thenApply(Parser::getLinks); //使用thenApply,传入一个方法处理contents,然后再返回一个CompletableFurture


6.6 同步器
java.util.concurrent包中包含了几个能够帮助人们管理相互合作的线程集的类,这些机制具有为线程之间的共用集结点模式提供的“预置功能”。如果一个相互合作的线程集满足这些行为模式之一,那么应该直接重用合适的类库而不要手动编写锁与条件的集合代码。

以上是关于java阻塞队列 线程同步合作的主要内容,如果未能解决你的问题,请参考以下文章

同步/异步 异步回调 协成 线程队列

Java AQS源码阅读

Linux多线程_(线程同步,基于阻塞队列的生产者消费者模型)

Java多线程_阻塞队列

死锁,线程协作(同步,阻塞队列,Condition,管道流)

Java_线程同步和死锁