使用CompletableFuture.runAsync()来完成异步任务

Posted 涂作权的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用CompletableFuture.runAsync()来完成异步任务相关的知识,希望对你有一定的参考价值。

转自:https://blog.csdn.net/m0_46539364/article/details/122529287

1.配置线程池

/**
     * int corePoolSize,
     * int maximumPoolSize,
     * long keepAliveTime,
     * TimeUnit unit,
     * BlockingQueue<Runnable> workQueue,
     * ThreadFactory threadFactory,
     * RejectedExecutionHandler handler
     *
     * @return
     */
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) 
        return new ThreadPoolExecutor(pool.getCoreSize(),
                pool.getMaxSize(),
                pool.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
                );
    

2.线程池参数配置类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
	my.thread.core-size=20
	my.thread.max-size=200
	my.thread.keep-alive-time=10
*/
@ConfigurationProperties(prefix = "my.thread")
@Component
@Data
public class ThreadPoolConfigProperties 
	//核心线程数
    private Integer coreSize;
    //最大线程数
    private Integer maxSize;
    //空余线程的存活时间
    private Integer keepAliveTime;

3.测试异步任务

	@Autowired
    private ThreadPoolExecutor executor;
	
	//在这里开启一个异步任务,提交给线程池,runAsync()方法没有返回值,需要有返回值的可使用supplyAsync()方法
    @Test
    void testCompletableFuture() 
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> 
            int result = 0;
            for (int i = 0; i <= 100; i++) 
                result += i;
            
            System.out.println(result);
        , executor);
    

4.关于CompletableFuture的其他相关用法

4.1 CompletableFuture的get()方法可以获取异步的结果,get方法是一个阻塞式等待的方法,也即get方法会等待异步任务的完成

CompletableFuture<AtomicInteger> completableFuture2 = CompletableFuture.supplyAsync(() -> 
            for (int i = 0; i <= 100; i++) 
                sum2.addAndGet(i);
            
            return sum2;
     , executor);
        //获取异步结果
 AtomicInteger integer = completableFuture2.get();

4.2 allOf : 等待所有任务完成完成

AtomicInteger sum = new AtomicInteger();
AtomicInteger sum2 = new AtomicInteger();
CompletableFuture<AtomicInteger> completableFuture1 = CompletableFuture.supplyAsync(() -> 
   for (int i = 0; i <= 100; i++) 
       sum.addAndGet(i);
   
   return sum;
, executor);


CompletableFuture<AtomicInteger> completableFuture2 = CompletableFuture.supplyAsync(() -> 
   for (int i = 0; i <= 100; i++) 
       sum2.addAndGet(i);
   
   return sum2;
, executor);
AtomicInteger integer = completableFuture2.get();

//allOf : 等待所有任务完成完成,注意get方法,是阻塞式等待,等待上面的异步任务都完成
CompletableFuture.allOf(completableFuture1,completableFuture2).get();

//获取异步结果
 AtomicInteger atomicInteger1 = completableFuture1.get();
 AtomicInteger atomicInteger2 = completableFuture2.get();

 System.out.println("结果是--->"+atomicInteger1.addAndGet(atomicInteger2.intValue()));

4.3 异步任务完成时,whenComplete,exceptionally

CompletableFuture<AtomicInteger> completableFuture3 = CompletableFuture.supplyAsync(() -> 
        for (int i = 0; i <= 10; i++) 
            sum2.addAndGet(i);
        
        return sum2;
    , executor).whenComplete((res, exception) -> 
        //当出现异常,可以拿到异常信息,但是无法修改返回数据
        System.out.println("结果是:" + res + ",异常:" + exception);
    ).exceptionally(throwable -> 
        //可以感知异常,同时返回默认值
        return new AtomicInteger(10);
    );

4.4 handle,方法完成后的后续处理

CompletableFuture<Integer> completableFuture4 = CompletableFuture.supplyAsync(() -> 
      int i = 10 / 2;
       return i;
   , executor).handle((res, throwable) -> 
       //res 为结果,throwable 为异常
       if (res != null) 
           return res * 2;
       
       if (throwable != null) 
           return -1;
       
       return 0;
   );
   System.out.println("completableFuture4--结果是:"+completableFuture4.get());

4.5 异步任务串行化

/**
* 异步任务串行化
*      thenAcceptAsync  可以接收上一步获取的结果,但是无返回值
*      thenApplyAsync   可以接收上一步获取的结果,有返回值
*/
   CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> 
       int i = 10 / 2;
       return i;
   , executor).thenApplyAsync(res -> 
       //res为上一步的结果
       return res * 2;
   , executor).thenAcceptAsync((res) -> 
       System.out.println("hello ...thenAcceptAsync");
   , executor);

以上是关于使用CompletableFuture.runAsync()来完成异步任务的主要内容,如果未能解决你的问题,请参考以下文章

在使用加载数据流步骤的猪中,使用(使用 PigStorage)和不使用它有啥区别?

今目标使用教程 今目标任务使用篇

Qt静态编译时使用OpenSSL有三种方式(不使用,动态使用,静态使用,默认是动态使用)

MySQL db 在按日期排序时使用“使用位置;使用临时;使用文件排序”

使用“使用严格”作为“使用强”的备份

Kettle java脚本组件的使用说明(简单使用升级使用)