Batch Job Tuning

Posted by Jensun

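Background:
In a batch job, every record depends on a lookup against a third-party query service, and that service is slow.
Goal:
Shorten the batch job's running time; support concurrency.
Approach:
Pool the third-party queries and process them on multiple threads to cut the elapsed time;
Map each result back to the thread that requested it. Because the same client thread issues the queries and later reads the results, and each result set is used only once, a ThreadLocal looks like a natural fit;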
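The first idea, pooling the slow lookups, can be sketched on its own before any thread-local bookkeeping is added. This is only a minimal illustration under assumptions: slowLookup, queryAll, the pool size and the card numbers are hypothetical names and values, and results are collected in an explicitly shared ConcurrentHashMap rather than in the thread-local used later in this post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PooledLookupSketch {

    // Hypothetical stand-in for the slow third-party query.
    static String slowLookup(String cardNo) throws InterruptedException {
        Thread.sleep(100);
        return "user-of-" + cardNo;
    }

    // Runs every lookup on a fixed-size pool and collects the results by card number.
    static Map<String, String> queryAll(List<String> cardNos, int poolSize) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Map<String, String> results = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(cardNos.size());
        try {
            for (String cardNo : cardNos) {
                pool.execute(() -> {
                    try {
                        results.put(cardNo, slowLookup(cardNo));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                });
            }
            // Block the caller until every lookup has finished.
            done.await();
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> cards = Arrays.asList("c1", "c2", "c3", "c4");
        long start = System.nanoTime();
        Map<String, String> results = queryAll(cards, 4);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results.size() + " results in about " + elapsedMs + " ms");
    }
}
```

With a pool as large as the batch, the lookups overlap, so the elapsed time approaches that of a single call instead of the sum of all calls.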
Solution:

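class DailyAccountLimitHelper{
// Pooled third-party query service
private final PoolHelper poolHelper;
// Result set keyed by credit card number, with an initializer for the InheritableThreadLocal.
// InheritableThreadLocal is used because the results are ultimately written by the pooled query threads, which inherit the value from the client query thread.
private static final InheritableThreadLocal<HashMap<String,UserInfo>> accoutByCreditCard=new InheritableThreadLocal<HashMap<String,UserInfo>>(){
    @Override
    protected HashMap<String,UserInfo> initialValue() {
        return new HashMap<>();
    }
};
// Request queue of the client query thread.
// InheritableThreadLocal is used because the queue has to be handed down to the worker thread.
private static InheritableThreadLocal<LinkedBlockingDeque<BaseReq<A100001Req>>> reqQueue= new InheritableThreadLocal<LinkedBlockingDeque<BaseReq<A100001Req>>>(){
    @Override
    protected LinkedBlockingDeque<BaseReq<A100001Req>> initialValue() {
        return new LinkedBlockingDeque<>();
    }
};

// Maps each client query thread to its worker thread. When a client thread is done, its worker thread must be stopped, because the worker otherwise blocks forever waiting for requests.
// Several client threads touch this map concurrently, so a ConcurrentHashMap is used.
private final Map<Thread,Thread> inworkingThreads = new ConcurrentHashMap<>();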
// Client call: dailyAccountLimitHelper.putToQuery(a1Req);
public void putToQuery(BaseReq<A100001Req> a1Req){
    // The client enqueues the request, then checks whether it already has a worker thread; if not, one is created and started.
    reqQueue.get().addLast(a1Req);
    if(!inworkingThreads.containsKey(Thread.currentThread())) {
        // Touch the result map so that it exists before the worker thread inherits it.
        accoutByCreditCard.get();
        Thread workThread = new Thread(() -> {
            while (true) {
                try {
                    BaseReq<A100001Req> req = reqQueue.get().takeFirst();
                    poolHelper.queryAcctAsync(req);
                } catch (InterruptedException e) {
                    // clearResult() interrupts this thread to stop it: restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        inworkingThreads.put(Thread.currentThread(),workThread);
        workThread.start();
    }
}
    
     
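    // Client call: dailyAccountLimitHelper.clearResult();
    // Once the client thread has finished reading its results, it must call clearResult() manually to stop its worker thread and clear the result set.
    public void clearResult() {
        // Remove the mapping so that a later putToQuery() from this thread starts a fresh worker.
        Thread workThread = inworkingThreads.remove(Thread.currentThread());
        if (workThread != null) {
            workThread.interrupt();
        }
        accoutByCreditCard.get().clear();
    }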
   
    
    // Internal pooled query service
    public static class PoolHelper{

        @Async(value = "DailyBatchPool")
        public void queryAcctAsync(BaseReq<A100001Req> a1Req){
            // (The third-party call that produces userInfo from a1Req is not shown.)
            log.info("DailyBatchPool found UserInfo:{}",GsonUtil.toGsonString(userInfo));

            // Write directly into the inherited thread-local result set.
            accoutByCreditCard.get().put(userInfo.getCreditCardNo(),userInfo);
        }

    }
}

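In practice, the result-clearing method public void clearResult() is flawed: it clears the variable held by the client query thread, but it cannot clear the variable held by the pooled query threads. An InheritableThreadLocal value is copied only once, when the child thread is created, so a pooled thread keeps whatever it inherited at that moment. As a result, when a later client thread reuses the existing thread pool, the query results are never propagated back to that client thread.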
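The stale-value behaviour described above can be reproduced with the JDK alone. The following is a minimal sketch, not the code from this post: the class name, the CONTEXT variable and the "client-1"/"client-2" values are made up for illustration, and a single-thread pool is assumed so that the second task is certain to reuse the first task's thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class StaleInheritableSketch {
    static final InheritableThreadLocal<String> CONTEXT = new InheritableThreadLocal<>();

    // Returns what the pooled thread sees during the first and the second task.
    static String[] observe() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<String> read = CONTEXT::get;
        try {
            CONTEXT.set("client-1");
            // The pool thread is created by this submit and inherits "client-1".
            String first = pool.submit(read).get();

            CONTEXT.set("client-2");
            // The same pool thread is reused, so no new copy is taken.
            String second = pool.submit(read).get();
            return new String[] {first, second};
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = observe();
        System.out.println("first task saw:  " + seen[0]);
        System.out.println("second task saw: " + seen[1]);
    }
}
```

Both tasks report "client-1": the submitting thread's later value never reaches the pooled thread, which is the same effect that keeps results from reaching a second client thread here.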

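Fix: replace InheritableThreadLocal with TransmittableThreadLocal. When a task is submitted through a TTL-wrapped executor, the submitting thread's value is copied to the pool thread again for that task, instead of only once at thread creation.

How TransmittableThreadLocal works
TransmittableThreadLocal extends InheritableThreadLocal and keeps a static class-level variable, holder, that tracks every TransmittableThreadLocal whose value has to be re-synchronized when a task is submitted to a thread pool.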
class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> {

private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder;
}
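The idea of re-synchronizing at submit time can be illustrated without the library. The sketch below is a simplified stand-in, not TransmittableThreadLocal's actual implementation: it handles one plain ThreadLocal instead of everything registered in holder, and withCapturedContext, CONTEXT and the sample values are hypothetical names. It captures the value in the submitting thread, replays it in the pool thread, and restores the pool thread's previous value afterwards.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CaptureReplaySketch {
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Wraps a task so that it runs with the submitter's current value of CONTEXT.
    static <T> Callable<T> withCapturedContext(Callable<T> task) {
        final String captured = CONTEXT.get();      // capture: runs in the submitting thread
        return () -> {
            final String backup = CONTEXT.get();    // the pool thread's own value, if any
            CONTEXT.set(captured);                  // replay inside the pool thread
            try {
                return task.call();
            } finally {
                // restore, so the pool thread does not leak this task's value
                if (backup == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(backup);
                }
            }
        };
    }

    // Returns what the pooled thread sees during the first and the second task.
    static String[] observe() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<String> read = CONTEXT::get;
        try {
            CONTEXT.set("client-1");
            String first = pool.submit(withCapturedContext(read)).get();
            CONTEXT.set("client-2");
            String second = pool.submit(withCapturedContext(read)).get();
            return new String[] {first, second};
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = observe();
        System.out.println("first task saw:  " + seen[0]);
        System.out.println("second task saw: " + seen[1]);
    }
}
```

Because the capture happens on every submit, the reused pool thread sees "client-1" for the first task and "client-2" for the second.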
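// Requires the transmittable-thread-local library (TtlExecutors and TransmittableThreadLocal).
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.threadpool.TtlExecutors;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TtlDemo {
    // The underlying pools are not shown in the original; single-thread pools are assumed here.
    private static final ExecutorService qexecutorService = Executors.newFixedThreadPool(1);
    private static final ExecutorService executorService = Executors.newFixedThreadPool(1);
    // Wrap the pools so that TransmittableThreadLocal values are captured again on every submit.
    public static ExecutorService qexecutorServiceTTL = TtlExecutors.getTtlExecutorService(qexecutorService);
    public static ExecutorService executorServiceTTL = TtlExecutors.getTtlExecutorService(executorService);
    public static InheritableThreadLocal<HashMap<String, String>> inheritableThreadLocal = new InheritableThreadLocal<>();
    public static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        inheritableThreadLocal.set(new HashMap<>());
        inheritableThreadLocal.get().put("k", "v");
        transmittableThreadLocal.set("transmittable-main1");
        Thread.sleep(1000);
        qexecutorServiceTTL.submit(() -> {
            executorServiceTTL.submit(() -> {
                System.out.println("child 1 - inheritable: " + inheritableThreadLocal.get().get("k"));
                System.out.println("child 1 - transmittable: " + transmittableThreadLocal.get());
                // Mutates the map that was inherited from the main thread's first value.
                inheritableThreadLocal.get().put("k", "2");
            });
        });
        Thread.sleep(1000);
        // The main thread switches to a new map and a new transmittable value.
        inheritableThreadLocal.set(new HashMap<>());
        inheritableThreadLocal.get().put("k", "v2");
        transmittableThreadLocal.set("transmittable-main2");

        qexecutorServiceTTL.submit(() -> {
            executorServiceTTL.submit(() -> {
                // The reused pool threads still hold the first map, so "v2" is not visible here,
                // while the transmittable value follows the submitting thread.
                System.out.println("child 2 - inheritable: " + inheritableThreadLocal.get().get("k"));
                System.out.println("child 2 - transmittable: " + transmittableThreadLocal.get());
            });
        });

        // Give the nested tasks time to finish, then let the JVM exit.
        Thread.sleep(1000);
        qexecutorServiceTTL.shutdown();
        executorServiceTTL.shutdown();
    }
}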
 
