JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(提交任务: submitexecute invokAllinvokeAny)

Posted Z && Y

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(提交任务: submitexecute invokAllinvokeAny)相关的知识,希望对你有一定的参考价值。

1. ThreadPoolExecutor 提交任务

// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
 throws InterruptedException, ExecutionException;
 // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException;

1.1 execute 执行任务

示例代码:

package com.tian;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j(topic = "TestThreadPoolExecutors")
public class TestThreadPoolExecutors {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
            // 自增的原子整数
            private AtomicInteger number = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "my_pool_thread: " + number.getAndIncrement());
            }
        });

        pool.execute(() -> {
            log.debug("线程01执行了.");
        });

        pool.execute(() -> {
            log.debug("线程02执行了.");
        });

        pool.execute(() -> {
            log.debug("线程03执行了.");
        });

        pool.execute(() -> {
            log.debug("线程04执行了.");
        });
    }
}

运行结果:


1.2 submit 提交任务 获得任务执行结果

package com.tian;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j(topic = "c.TestSubmit")
public class TestSubmit {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // 主线程获取 future 的返回结果
        method1(pool);

    }

    private static void method1(ExecutorService pool) throws InterruptedException, ExecutionException {
        Future<String> future = pool.submit(() -> {
            log.debug("running");
            Thread.sleep(1000);
            return "成功获取future的返回结果";
        });

        log.debug("{}", future.get());
    }
}

运行结果:


1.3 invokeAll 提交所有任务 获取返回结果

示例代码:

package com.tian;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j(topic = "c.TestSubmit")
public class TestSubmit {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        method2(pool);

    }

    private static void method2(ExecutorService pool) throws InterruptedException {
        List<Future<String>> futures = pool.invokeAll(Arrays.asList(
                () -> {
                    log.debug("begin");
                    Thread.sleep(1000);
                    return "成功获取第一个线程返回的结果";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(500);
                    return "成功获取第二个线程返回的结果";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(2000);
                    return "成功获取第三个线程返回的结果";
                }
        ));

        futures.forEach(task -> {
            try {
                log.debug("{}", task.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}

运行结果:


1.4 invokeAny 提交 所有任务 一旦有任务先成功执行完毕,则返回此任务执行结果,其它任务取消

示例代码:

package com.tian;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j(topic = "c.TestSubmit")
public class TestSubmit {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        method3(pool);
    }

    private static void method3(ExecutorService pool) throws InterruptedException, ExecutionException {
        String result = pool.invokeAny(Arrays.asList(
                () -> {
                    Thread.sleep(500);
                    log.debug("begin 1");
                    log.debug("end 1");
                    return "成功获取第一个线程执行的结果";
                },
                () -> {
                    Thread.sleep(1000);
                    log.debug("begin 2");
                    log.debug("end 2");
                    return "成功获取第二个线程执行的结果";
                },
                () -> {
                    Thread.sleep(1500);
                    log.debug("begin 3");
                    log.debug("end 3");
                    return "成功获取第三个线程执行的结果";
                }
        ));
        log.debug("{}", result);
    }
}

运行结果:



以上是关于JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(提交任务: submitexecute invokAllinvokeAny)的主要内容,如果未能解决你的问题,请参考以下文章

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(线程池状态构造方法)

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(提交任务: submitexecute invokAllinvokeAny)

JUC并发编程 共享模式之工具 线程池 -- Fork / Join 框架(JDK1.7 新加入的线程池实现)

JUC并发编程 共享模式之工具 线程池 -- 自定义线程池(阻塞队列)

JUC并发编程 共享模式之工具 ThreadPoolExecutor -- 正确处理线程池异常

JUC并发编程 共享模式之工具 ThreadPoolExecutor -- 线程池应用之定时任务(在每周周四执行定时任务)