如何利用线程池解决调用第三接口出现 429 too many requests问题?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何利用线程池解决调用第三接口出现 429 too many requests问题?相关的知识,希望对你有一定的参考价值。

参考技术A

降低请求并发速度就行了,第三方接口可能有限流或者处理线程不够,请求并发过高就直接给你异常返回了。

跟你用不用线程池关系不大。只要你调的不那么频繁,就不会报错了。(其实也差不多就是削峰)

那你怎么控制调用频率呢?通常你的并发量多说明你的处理线程多了,再去调第三方接口它反而处理不过来,那么可以这么搞

    降低你的并发处理线程(这么搞可能不太好)

    延迟随机时间调用第三方,比如你本来10s调完的,延迟为1min内的随机时间,压力减轻6倍

    使用小点的线程池处理调用,把所有的请求丢到这个线程池的队列里排队,就能用这个线程池控制速度了。异步处理,可以同步等待

172SpringBoot2的一个利用CountDownLatch和线程池优化查询接口执行效率的例子

一些统计的页面,需要显示不同的统计数据。因此这些统计的接口在执行不同的统计代码的时候会出现执行速度慢的情况。为了解决这个问题,我用多线程同时执行不同的统计代码来提高代码执行速度。

虽然使用多线程能提高速度,但是有两个问题需要解决:

  1. 新的线程因为和 Controller 方法的线程同时执行,如何能保证 Controller 返回新线程中查询到的数据?
  2. 频繁的开启新线程会带来额外的开销,线程过多可能导致cpu频繁切换线程导致系统效率降低,如何避免这种情况?

这两个问题的答案如下:

  1. Controller的方法中使用 CountDownLatch做局部变量。利用CountDownLatch的countDown()方法和await()方法。
  2. 使用线程池来优化线程的开销。

下面是演示代码,分别是 IOBoundThreadPoolUtils.java、ExceptionUtils.java、ThreadStatsDTO.java 和 TestController.java

IOBoundThreadPoolUtils.java

package com.rzfk.common.utils;


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * IO密集情况下的线程池工具类
 * @author zhangchao
 */
public class IOBoundThreadPoolUtils 
    private static ThreadPoolExecutor threadPoolExecutor;

    static 
        // 具体参数的值需要程序员根据实际情况测试,以便得到最优解。
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(20);
        // 当任务队列已满,并且没有空余线程执行,并且线程数已经达到最大值的时候,执行拒绝策略 AbortPolicy。
        // 这个策略会导致 execute 方法抛出RejectedExecutionException异常。
        threadPoolExecutor = new ThreadPoolExecutor(8, 20, 0L,
                TimeUnit.MILLISECONDS, workQueue, new ThreadPoolExecutor.AbortPolicy());
    

    public static void execute(Runnable runnable) 
        threadPoolExecutor.execute(runnable);
    


ExceptionUtils.java

/**
 * 处理异常的工具类
 * @author zhangchao
 **/
public class ExceptionUtils 

    /**
     * 把异常处理成字符串
     * @param t 异常信息
     * @return 处理后的字符串
     */
    public static String excepToString(Throwable t) 
        StackTraceElement[] steArr = t.getStackTrace();
        String[] details = new String[steArr.length];
        for (int i = 0; i < details.length; i++) 
            details[i] = steArr[i].toString();
        
        String message = t.getMessage();
        StringBuilder content = new StringBuilder();
        for (String str : details) 
            content.append(str).append("\\n");
        
        content.append("\\n").append(message).append("\\n");
        return content.toString();
    

ThreadStatsDTO.java


/**
 * 保存线程执行结果的DTO
 * @author zhangchao
 */
public class ThreadStatsDTO 
    // 执行状态是否成功
    private boolean success = false;

    // 错误消息
    private String errorMsg;
    
    
    //   getters/setters
    

    public boolean isSuccess() 
        return success;
    

    public void setSuccess(boolean success) 
        this.success = success;
    

    public String getErrorMsg() 
        return errorMsg;
    

    public void setErrorMsg(String errorMsg) 
        this.errorMsg = errorMsg;
    


TestController.java


/**
 * 测试
 * @author zhangchao
 */
@RestController
@RequestMapping("/test")
public class TestController 
	private final Logger logger = LoggerFactory.getLogger(this.getClass());
	
    @GetMapping("/testapi")
    public R testapi() 
    	 TestapiDto dto = new TestapiDto();

        // 保存新线程的执行结果。
        ThreadStatsDTO threadStatsDTO = new ThreadStatsDTO();
        // 用于同步线程的倒数闩锁。因为只启动了一个线程,所以总数是1.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try 
            IOBoundThreadPoolUtils.execute(new Runnable() 
                @Override
                public void run() 
                    try 
                        // 执行业务逻辑,做一些查询操作,获取统计数据
                        // .... 具体代码忽略  .....
                        dto.setXxx(xxx); // 保存线程中查询获取的统计数据。
                        // 线程执行结果设定为成功。
                        threadStatsDTO.setSuccess(true); 
                     catch (Exception threadEx) 
                        // 线程执行结果设定为失败。
                        threadStatsDTO.setSuccess(false);
                        // 保存详细的错误信息。包含堆栈信息。
                        threadStatsDTO.setErrorMsg(ExceptionUtils.excepToString(threadEx));
                     finally 
                    	// 不论是否执行成功,都要确保减一。
                        countDownLatch.countDown();
                    
                
				
				// AbortPolicy 会执行 Runnable 的toString()方法,并保存toString()返回的字符串。
				// 这里重写 toString()方法为了区别其他业务逻辑方法。
                @Override
                public String toString() 
                    return "BigScreenController.stats(),ageGenderList";
                
            );
         catch (RejectedExecutionException ree) 
            logger.error("/environment/bigscreen/stats  RejectedExecutionException", ree);
            return R.error("线程池繁忙请稍后再试");
        

		// 同时执行其他的业务逻辑。使得线程池中的代码和这里的代码同时执行。
		// ......具体代码忽略.....

        boolean awaitResult = false;
        try 
		    // 分两种情况讨论
		    // 第一种:
		    //   如果controller线程先执行到这里,新线程还没有执行结束,controller线程会在这里等待新线程,
		    //   一直等到countDownLatch计数变成0,再执行后面的代码。
		    // 第二种:
		    //   如果新线程先执行结束,countDownLatch计数变成0,controller线程执行到这里就不会等待,直接
		    //   执行后面的代码。


            // 倒数闩锁的await(long timeout, TimeUnit unit) 方法一共有三种情况可以解除等待:
            // 1.总数减至0,返回true。
            // 2.超时,返回false。
            // 3.抛出InterruptedException异常。
            //
            // 推荐使用倒数闩的带超时时间的await方法。可以规避线程中其他错误引起的死锁。此处最多等待十秒钟。
            awaitResult = countDownLatch.await(10, TimeUnit.SECONDS);
         catch (InterruptedException e) 
            logger.error("countDownLatch.await() InterruptedException ", e);
        
        if (!awaitResult) 
            return R.error("系统超时");
        
        if (!threadStatsDTO.isSuccess()) 
            logger.error("New thread exception.  \\n" + threadStatsDTO.getErrorMsg());
            return R.error(threadStatsDTO.getErrorMsg());
        

        return R.success(dto);
    

以上是关于如何利用线程池解决调用第三接口出现 429 too many requests问题?的主要内容,如果未能解决你的问题,请参考以下文章

elastic 线程池设置解决logstash-429

172SpringBoot2的一个利用CountDownLatch和线程池优化查询接口执行效率的例子

172SpringBoot2的一个利用CountDownLatch和线程池优化查询接口执行效率的例子

生产环境出现的几次线程池被占满的问题分析

wordpress 429 Too Many Requests,提供wordpress最新版免费下载

Java线程池入门了解