JUC并发编程 多线程设计模式 -- 同步模式之保护性暂停(定义 & 实现 & 带超时版 GuardedObject)

Posted Z && Y

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC并发编程 多线程设计模式 -- 同步模式之保护性暂停(定义 & 实现 & 带超时版 GuardedObject)相关的知识,希望对你有一定的参考价值。

1. 定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点:

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式


2. 实现

class GuardedObject {
    private Object response;
    private final Object lock = new Object();

    // 获取结果
    public Object get() {
        synchronized (lock) {
// 条件不满足则等待
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }

// 赋值
    public void complete(Object response) {
        synchronized (lock) {
// 条件满足,通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}

3. 实际运用 & 优点分析

这里有一个下载网页源码的类:

package tian;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * 下载网页的源代码
 */
public class Downloader {
    public static List<String> download() throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}

主线程等待t1线程的下载结果:

package tian;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;

import static tian.Downloader.download;

@Slf4j(topic = "c.Test21")
public class Test21 {
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            try {
// 子线程执行下载
                List<String> response = download();
                log.debug("download complete...");
                guardedObject.complete(response);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "t1").start();
        log.debug("waiting...");
        // 主线程阻塞等待
        Object response = guardedObject.get();
        log.debug("get response: [{}] lines", ((List<String>) response).size());
    }

    static class GuardedObject {
        private Object response;
        private final Object lock = new Object();

        // 获取结果
        public Object get() {
            synchronized (lock) {
// 条件不满足则等待
                while (response == null) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return response;
            }
        }

        // 赋值
        public void complete(Object response) {
            synchronized (lock) {
// 条件满足,通知等待线程
                this.response = response;
                lock.notifyAll();
            }
        }
    }
}

执行结果:

优点分析:

  • join方法需要等到线程运行结束,而wait方法不需要。
  • 使用join方法,等待结果的变量必须设置为全局变量。

4. 带超时版 GuardedObject

增加超时效果,等待超时直接退出等待:

// 增加超时效果
class GuardedObject {
    // 结果
    private Object response;

    // 获取结果
    // timeout 表示要等待多久 2000
    public Object get(long timeout) {
        synchronized (this) {
            // 开始时间 15:00:00
            long begin = System.currentTimeMillis();
            // 经历的时间
            long passedTime = 0;
            while (response == null) {
                // 这一轮循环应该等待的时间
                long waitTime = timeout - passedTime;
                // 经历的时间超过了最大等待时间时,退出循环
                if (timeout - passedTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime); 
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 求得经历时间
                passedTime = System.currentTimeMillis() - begin; // 15:00:02  1s
            }
            return response;
        }
    }

    // 产生结果
    public void complete(Object response) {
        synchronized (this) {
            // 给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

5. 测试超时等待效果

隔1s赋值:

@Slf4j(topic = "c.Test20")
public class Test20 {
    public static void main(String[] args) throws InterruptedException {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            log.debug("begin...");
            Object response = guardedObject.get(2000);
            log.debug("结果是:{}", response);
        }, "t1").start();

        new Thread(() -> {
            log.debug("begin...");
            try {
                // 隔1s赋值
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            guardedObject.complete("RESPONSE");
        }, "t2").start();
    }
}

运行结果:

隔3s赋值,最大等待时间为2s。

@Slf4j(topic = "c.Test20")
public class Test20 {
    public static void main(String[] args) throws InterruptedException {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            log.debug("begin...");
            Object response = guardedObject.get(2000);
            log.debug("结果是:{}", response);
        }, "t1").start();

        new Thread(() -> {
            log.debug("begin...");
            try {
                // 隔3s赋值
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            guardedObject.complete("RESPONSE");
        }, "t2").start();
    }
}

运行结果:

隔1s赋值null:

@Slf4j(topic = "c.Test20")
public class Test20 {
    public static void main(String[] args) throws InterruptedException {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            log.debug("begin...");
            Object response = guardedObject.get(2000);
            log.debug("结果是:{}", response);
        }, "t1").start();

        new Thread(() -> {
            log.debug("begin...");
            try {
                // 隔1s赋值
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            guardedObject.complete(null);
        }, "t2").start();
    }
}

运行结果:



以上是关于JUC并发编程 多线程设计模式 -- 同步模式之保护性暂停(定义 & 实现 & 带超时版 GuardedObject)的主要内容,如果未能解决你的问题,请参考以下文章

JUC并发编程 多线程设计模式 -- 同步模式之保护性暂停(定义 & 实现 & 带超时版 GuardedObject)

JUC并发编程 多线程设计模式 -- 同步模式之保护性暂停(join方法原理 & 保护性暂停-扩展-解耦等待和生产)

JUC并发编程 多线程设计模式 -- 异步模式之生产者/消费者

JUC并发编程 共享模式之工具 ThreadPoolExecutor 多线程设计模式 -- 异步模式之工作线程(定义饥饿 & 解决饥饿 & 线程池创建多少线程数目合适)

JUC并发编程 共享模式之工具 JUC 线程安全的集合类 -- 线程安全的集合类概述

JUC并发编程 共享模式之工具 JUC CountdownLatch(倒计时锁) -- CountdownLatch(使用CountdownLatch原理改进: 配合线程池使用)