JVM 并发性: 阻塞还是不阻塞?

Posted huzhigenlaohu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JVM 并发性: 阻塞还是不阻塞?相关的知识,希望对你有一定的参考价值。

在任何并发性应用程序中,异步事件处理都至关重要。事件来源可能是不同的计算任务、I/O 操作或与外部系统的交互。无论来源是什么,应用程序代码都必须跟踪事件,协调为响应事件而采取的操作。Java 应用程序可采用两种基本的异步事件处理方法:该应用程序有一个协调线程等待事件,然后采取操作,或者事件可在完成时直接执行某项操作(通常采取执行应用程序所提供的代码的方式)。让线程等待事件的方法被称为阻塞 方法。让事件执行操作、线程无需显式等待事件的方法被称为非阻塞 方法。

旧的 java.util.concurrent.Future 类提供了一种简单方式来处理预期的事件完成,但仅通过轮询或等待完成的方法。Java 8 中添加的 java.util.concurrent.CompletableFuture 类扩展了此功能,添加了大量合成或处理事件的方法。(请查阅本系列的前一篇文章 “Java 8 并发性基础”,了解 CompletableFuture 的介绍。)具体地讲,CompletableFuture 提供了在事件完成时执行应用程序代码的标准技术,包括各种组合任务(由 future 表示)的方式。这种组合技术使得编写非阻塞代码来处理事件变得很容易(或者至少比过去容易)。本文将介绍如何使用 CompletableFuture 执行阻塞和非阻塞事件处理。您将获得一些线索,通过这些线索您可以了解为什么非阻塞方法值得花费更多的精力来实现。(可以从作者的 GitHub 存储库获取 完整的示例代码。)

阻塞 和非阻塞

在计算中,根据具体上下文,阻塞 和非阻塞 这两个词的使用通常会有所不同。举例而言,共享数据结构的非阻塞算法不需要线程等待访问数据结构。在非阻塞 I/O 中,应用程序线程可以启动一个 I/O 操作,然后离开执行其他事情,同时该操作会异步地执行。在本文中,非阻塞 指的是在无需等待线程的情况下完成某个执行操作的事件。这些用法中的一个共同概念是,阻塞操作需要一个线程来等待某个结果,而非阻塞操作不需要。

合成事件

等待事件的完成很简单:您有一个线程等待该事件,线程恢复运行时,您就可以知道该事件已经完成。如果您的线程在此期间有其他事要做,它会做完这些事再等待。该线程甚至可以使用轮询方法,通过该方法中断它的其他活动,从而检查事件是否已完成。但基本原理是相同的:需要事件的结果时,您会让线程停靠 (park),以便等待事件完成。

阻塞很容易完成且相对简单,只要您有一个等待事件完成的单一主线程。使用多个因为彼此等待而阻塞的线程时,可能遇到一些问题,比如:

  • 死锁:两个或更多线程分别控制其他线程继续执行所需的资源。
  • 饥饿 (Starvation):一些线程可能无法继续执行,因为其他线程贪婪地消耗着共享资源。
  • 活锁:线程尝试针对彼此而调整,但最终没有进展。

非阻塞方法为创造力留出的空间要多得多。回调是非阻塞事件处理的一种常见技术。回调是灵活性的象征,因为您可以在发生事件时执行任何想要的代码。回调的缺点是,在使用回调处理许多事件时,您的代码会变得凌乱。而且回调特别难调试,因为控制流与应用程序中的代码顺序不匹配。

Java 8 CompletableFuture 同时支持阻塞和非阻塞的事件处理方法,包括常规回调。CompletableFuture 也提供了多种合成和组合事件的方式,实现了回调的灵活性以及干净、简单、可读的代码。在本节中,您将看到处理由 CompletableFuture 表示的事件的阻塞和非阻塞方法的示例。

任务和排序

应用程序在一个特定操作中通常必须执行多个处理步骤。例如,在向用户返回结果之前,Web 应用程序可能需要:

  1. 在一个数据库中查找用户的信息
  2. 使用查找到的信息来执行 Web 服务调用,并执行另一次数据库查询。
  3. 基于来自上一步的结果而执行数据库更新。

图 1 演示了这种结构类型。

图 1. 应用程序任务流
技术分享

图 1 将处理过程分解为 4 个不同的任务,它们通过表示顺序依赖关系的箭头相连接。任务 1 可直接执行,任务 2 和任务 3 都在任务 1 完成后执行,任务 4 在任务 2 和任务 3 都完成后执行。这是我在本文中用于演示异步事件处理的任务结构。真实应用程序(尤其是具有多个移动部分的服务器应用程序)可能要复杂得多,但这个简单的示例仅用于演示所涉及的原理。

建模异步事件

在真实系统中,异步事件的来源一般是并行计算或某种形式的 I/O 操作。但是,使用简单的时间延迟来建模这种系统会更容易,这也是本文所采用的方法。清单 1 显示了我用于生成事件的基本的赋时事件 (timed-event) 代码,这些事件采用了 CompletableFuture 格式。

清单 1. 赋时事件代码
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;

public class TimedEventSupport {
    private static final Timer timer = new Timer();
    
    /**
     * Build a future to return the value after a delay.
     * 
     * @param delay
     * @param value
     * @return future
     */
    public static <T> CompletableFuture<T> delayedSuccess(int delay, T value) {
        CompletableFuture<T> future = new CompletableFuture<T>();
        TimerTask task = new TimerTask() {
            public void run() {
                future.complete(value);
            }
        };
        timer.schedule(task, delay * 1000);
        return future;
    }

    /**
     * Build a future to return a throwable after a delay.
     * 
     * @param delay
     * @param t
     * @return future
     */
    public static <T> CompletableFuture<T> delayedFailure(int delay, Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<T>();
        TimerTask task = new TimerTask() {
            public void run() {
                future.completeExceptionally(t);
            }
        };
        timer.schedule(task, delay * 1000);
        return future;
    }
}

为什么不采用 lambda?

清单 1 中的 TimerTask 被实现为一个匿名内部类,仅包含一个 run() 方法。您可能认为这里可以使用 lambda代替内部类。但是,lambda 仅能用作接口的实例,而TimerTask 被定义为一种抽象类。除非 lambda 特性的 future 扩展添加了对抽象类的支持(有可能,但由于设计问题,未必行得通),或者为 TimerTask 等情形定义了并行接口,否则您必须继续使用 Java 内部类创建单一方法实现。

清单 1 的代码使用一个 java.util.Timer 来计划 java.util.TimerTask 在一定的延迟后执行。每个 TimerTask 在运行时完成一个有关联的 future。delayedSuccess() 计划一个任务来成功完成一个 CompletableFuture<T> 并将 future 返回调用方。delayedFailure() 计划了一个任务来完成一个 CompletableFuture<T> 并抛出异常,然后将 future 返回给调用方。

清单 2 展示了如何使用 清单 1 中的代码创建 CompletableFuture<Integer> 形式的事件,这些事件与 图 1 中的 4 个任务相匹配。(此代码来自示例代码中的 EventComposition类。)

清单 2. 示例任务的事件
// task definitions
private static CompletableFuture<Integer> task1(int input) {
    return TimedEventSupport.delayedSuccess(1, input + 1);
}
private static CompletableFuture<Integer> task2(int input) {
    return TimedEventSupport.delayedSuccess(2, input + 2);
}
private static CompletableFuture<Integer> task3(int input) {
    return TimedEventSupport.delayedSuccess(3, input + 3);
}
private static CompletableFuture<Integer> task4(int input) {
    return TimedEventSupport.delayedSuccess(1, input + 4);
}

清单 2 中 4 个任务方法中的每一个都为该任务的完成时刻使用了特定的延迟值:task1 为 1 秒,task2 为 2 秒,task3 为 3 秒,task4 重新变为 1 秒。每个任务还接受一个输入值,是该输入加上任务编号作为 future 的(最终)结果值。这些方法都使用了 future 的成功形式;稍后我们将会查看一些使用失败形式的例子。

这些任务要求您按 图 1 中所示的顺序运行它们,向每个任务传递上一个任务返回的结果值(或者对于 task4,传递前两个任务结果的和)。如果中间两个任务是同时执行的,那么总执行时间大约为 5 秒(1 秒 + (2 秒、3 秒中的最大值)+ 1 秒。如果 task1 的输入为 1,那么结果为 2。如果该结果传递给 task2 和 task3,结果将为 4 和 5。如果这两个结果的和 (9) 作为输入传递给 task4,最终结果将为 13。

阻塞等待

在设置了执行环境后,是时候设置一些操作了。协调 4 个任务的执行的最简单方式是使用阻塞等待:主要线程等待每个任务完成。清单 3(同样来自示例代码中的 EventComposition 类)给出了此方法。

清单 3. 阻塞等待任务执行
private static CompletableFuture<Integer> runBlocking() {
    Integer i1 = task1(1).join();
    CompletableFuture<Integer> future2 = task2(i1);
    CompletableFuture<Integer> future3 = task3(i1);
    Integer result = task4(future2.join() + future3.join()).join();
    return CompletableFuture.completedFuture(result);
}

清单 3 使用 CompletableFuture 的 join() 方法来完成阻塞等待。join() 等待任务完成,然后,如果成功完成任务,则返回结果值,或者如果失败或被取消,则抛出一个未经检查的异常。该代码首先等待 task1 的结果,然后同时启动 task2 和 task3,并等待两个任务依次返回 future,最后等待 task4 的结果。runBlocking() 返回一个 CompletableFuture,以便与我接下来将展示的非阻塞形式保持一致,但在本例中,future 实际上将在该方法返回之前完成。

合成和组合 future

清单 4(同样来自示例代码中的 EventComposition 类)展示了如何将 future 连接在一起,以便按正确顺序并使用正确的依赖关系执行任务,而不使用阻塞。

清单 4. 非阻塞的合成和组合
private static CompletableFuture<Integer> runNonblocking() {
    return task1(1).thenCompose(i1 -> ((CompletableFuture<Integer>)task2(i1)
        .thenCombine(task3(i1), (i2,i3) -> i2+i3)))
        .thenCompose(i4 -> task4(i4));
}

清单 4 中的代码基本上构造了一个执行计划,指定不同的任务如何执行和它们彼此有何关联。此代码精美而简洁,但是,如果您不熟悉CompletableFuture 方法,或许难以理解该代码。清单 5 通过将 task2 和 task3 部分分离到一个新方法 runTask2and3 中,将同样的代码重构为更容易理解的形式。

清单 5. 重构后的非阻塞的合成和组合
private static CompletableFuture<Integer> runTask2and3(Integer i1) {
    CompletableFuture<Integer> task2 = task2(i1);
    CompletableFuture<Integer> task3 = task3(i1);
    BiFunction<Integer, Integer, Integer> sum = (a, b) -> a + b;
    return task2.thenCombine(task3, sum);
}

private static CompletableFuture<Integer> runNonblockingAlt() {
    CompletableFuture<Integer> task1 = task1(1);
    CompletableFuture<Integer> comp123 = task1.thenCompose(EventComposition::runTask2and3);
    return comp123.thenCompose(EventComposition::task4);    }

在 清单 5 中,runTask2and3() 方法表示任务流的中间部分,其中 task2 和 task3 同时执行,然后将它们的结果值组合在一起。此顺序是使用一个 future 上的 thenCombine() 方法来编码的,该方法接受另一个 future 作为它的第一个参数,接受一个二进制函数实例(其输入类型与 future 的结果类型匹配)作为其第二个参数。thenCombine() 返回了第三个 future,表示应用到最初的两个 future 的结果上的函数的值。在本例中,两个 future 是 task2 和 task3,该函数将结果值求和。

runNonblockingAlt() 方法使用在一个 future 上调用了 thenCompose() 方法两次。thenCompose() 的参数是一个函数实例,它接收原始 future 的值类型作为输入,返回另一个 future 作为输出。thenCompose() 的结果是第三个 future,具有与该函数相同的结果类型。这个 future 用作在原始 future 完成后,该函数最终将返回的 future 的占位符。

对 task1.thenCompose() 的调用将会返回一个 future,表示对 task1 的结果应用 runTask2and3() 函数的结果,该结果被保存为 comp123。对comp123.thenCompose() 的调用返回一个 future,表示对第一个 henCompose() 的结果应用 task4() 函数的结果,这是执行所有任务的总体结果。

试用示例

示例代码包含一个 main() 方法,以便依次运行事件代码的每个版本,并显示完成事件(约 5 秒)和结果 (13) 是正确的。清单 6 显示了从一个控制台运行这个 main() 方法的结果。

清单 6. 运行 main() 方法
[email protected]:~/devworks/scala3/code/bin> java com.sosnoski.concur.article3.EventComposition
Starting runBlocking
runBlocking returned 13 in 5008 ms.
Starting runNonblocking
runNonblocking returned 13 in 5002 ms.
Starting runNonblockingAlt
runNonblockingAlt returned 13 in 5001 ms.

不顺利的道路

目前为止,您看到了以 future 形式协调事件的代码,这些代码总是能够成功完成。在真实应用程序中,不能寄希望于事情总是这么顺利。处理任务过程中将发生问题,而且在 Java 术语中,这些问题通常表示为 Throwable

更改 清单 2 中的任务定义很容易,只需使用 delayedFailure() 代替 delayedSuccess() 方法即可,如这里的 task4 所示:

private static CompletableFuture<Integer> task4(int input) {
    return TimedEventSupport.delayedFailure(1, new IllegalArgumentException("This won‘t work!"));
}

如果运行 清单 3 并且仅将 task4 修改为完成时抛出异常,那么您会得到 task4 上的 join() 调用所抛出的预期的IllegalArgumentException。如果在 runBlocking() 方法中没有捕获该问题,该异常会在调用链中一直传递,最终如果仍未捕获问题,则会终止执行线程。幸运的是,修改该代码很容易,因此,如果在任何任务完成时抛出异常,该异常会通过返回的 future 传递给调用方来处理。清单 7 展示了这一更改。

清单 7. 具有异常的阻塞等待
private static CompletableFuture<Integer> runBlocking() {
    try {
        Integer i1 = task1(1).join();
        CompletableFuture<Integer> future2 = task2(i1);
        CompletableFuture<Integer> future3 = task3(i1);
        Integer result = task4(future2.join() + future3.join()).join();
        return CompletableFuture.completedFuture(result);
    } catch (CompletionException e) {
        CompletableFuture<Integer> result = new CompletableFuture<Integer>();
        result.completeExceptionally(e.getCause());
        return result;
    }}

清单 7 非常浅显易懂。最初的代码包装在一个 try/catch 中,catch 在返回的 future 完成时传回异常。此方法稍微增加了一点复杂性,但任何 Java 开发人员应该仍然很容易理解它。

清单 4 中的非阻塞代码甚至不需要添加 try/catchCompletableFuture 合成和组合操作负责自动为您传递异常,以便依赖的 future 也会在完成时抛出异常。

阻塞还是不阻塞

您已经查看了由 CompletableFuture 表示的事件的阻塞和非阻塞处理方法。至少对于本文中建模的基本的任务流,两种方法都非常简单。对于更复杂的任务流,该代码也会更加复杂。

在阻塞情况下,增加的复杂性不是大问题,您仍然只需要等待事件完成。如果要在线程之间执行其他类型的同步,则会遇到线程饥饿问题,甚至是死锁问题。

在非阻塞情况下,事件的完成所触发的代码执行很难调试。在执行许多类型的事件且事件之间存在许多交互时,跟踪哪个事件触发了哪次执行就会变得很难。这种情形基本上就是回调的噩梦,无论是使用传统的回调还是 CompletableFuture 组合和合成操作。

总而言之,阻塞代码通常具有简单性优势。那么为什么有人希望使用非阻塞方法?本节将给出一些重要的理由。

切换的成本

一个线程阻塞时,以前执行该线程的处理器核心会转而执行另一个线程。以前执行的线程的执行状态必须保存到内存中,并加载新线程的状态。这种将核心从运行一个线程切换到运行另一个线程的操作称为上下文切换

除了直接的上下文切换性能成本,新线程一般会使用来自前一个线程的不同数据。内存访问比处理器时钟慢

以上是关于JVM 并发性: 阻塞还是不阻塞?的主要内容,如果未能解决你的问题,请参考以下文章

阻塞队列线程池原子性及并发工具类

25Java并发性和多线程-阻塞队列

JEP 尝鲜系列 3 - 使用虚线程进行同步网络 IO 的不阻塞原理

java并发编程11.原子变量与非阻塞同步机制

sql server 阻塞与锁

JEP 尝鲜系列 3 - 使用虚线程进行同步网络 IO 的不阻塞原理