异步框架asyn4j的原理

Posted 大数据从业者

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步框架asyn4j的原理相关的知识,希望对你有一定的参考价值。

启动时调用init方法

 

 

[java] view plain copy
 
  1. public void init(){  
  2.     if (!run){  
  3.       run = true;  
  4.       //工作队列  
  5.       workQueue = newPriorityBlockingQueue(maxCacheWork);  
  6.       //是否存在工作队列满处理类  
  7.       if (this.workQueueFullHandler != null) {  
  8.         //自定义线程池  
  9.         workExecutor = newAsynThreadPoolExecutor(work_thread_num, work_thread_num, 0L,TimeUnit.MILLISECONDS,  
  10.           workQueue, new AsynThreadFactory(),new AsynWorkRejectedExecutionHandler(this.workQueueFullHandler),  
  11.           executeWorkNum);  
  12.       } else {  
  13.         workExecutor = newAsynThreadPoolExecutor(work_thread_num, work_thread_num, 0L,TimeUnit.MILLISECONDS,  
  14.           workQueue, new AsynThreadFactory(),executeWorkNum);  
  15.       }  
  16.       //回调函数工作队列  
  17.       callBackQueue = newLinkedBlockingQueue();  
  18.       //自定义回调函数线程池  
  19.       callBackExecutor = newCallBackThreadPoolExecutor(callback_thread_num, callback_thread_num, 0L,  
  20.         TimeUnit.MILLISECONDS, callBackQueue,new CallBackThreadFactory(),  
  21.         new CallBackRejectedExecutionHandler(),callBackNum);  
  22.       //service启动和关闭处理器  
  23.       if (this.serviceHandler != null) {  
  24.         this.serviceHandler.setServiceStat(0);  
  25.        this.serviceHandler.setAsynService(this);  
  26.         this.serviceHandler.process();  
  27.       }  
  28.       //启动工作队列满处理器  
  29.       if (this.workQueueFullHandler != null) {  
  30.         this.workQueueFullHandler.process();  
  31.       }  
  32.       //程序关闭后的处理  
  33.       Runtime.getRuntime().addShutdownHook(newThread(){  
  34.         public void run()  
  35.         {  
  36.          AsynServiceImpl.this.close(AsynServiceImpl.closeServiceWaitTime);  
  37.         }  
  38.       });  
  39.     }  
  40.   }  

 

 

AsynServiceImpl主要有两个线程池和三个处理器组成

 

工作线程池AsynThreadPoolExecutor

 

[java] view plain copy
 
  1. public classAsynThreadPoolExecutor  
  2.   extends ThreadPoolExecutor  
  3. {  
  4.   private AtomicLong executeWorkNum;  

 

 

回调函数线程池CallbackThreadPoolExecutor

 

[java] view plain copy
 
  1. public classCallBackThreadPoolExecutor  
  2.   extends ThreadPoolExecutor  
  3. {  
  4.   private AtomicLong callBackNum;  

 

 

AsynThreadPoolExecutor的executeWorkNum,统计任务的执行数

AsynThreadPoolExecutor的callBackNum,统计回调函数的执行数

 

ServiceHandler处理器(启动或关闭)

WorkQueueFullHandler处理器(任务数超过maxCacheWork)

ErrorAsynWorkHandler处理器(任务执行异常)

 

 

执行时调用addWork方法

 

  

[java] view plain copy
 
  1. public void addWork(Object tagerObject, Stringmethod, Object[] params, AsynCallBack asynCallBack, WorkWeight weight, booleancache)  
  2.   {  
  3.     if ((tagerObject == null) || (method ==null)) {  
  4.       throw newIllegalArgumentException("target name is null or  target method name is null");  
  5.     }  
  6.     Object target = null;  
  7.     //如果对象是String  
  8.     if(tagerObject.getClass().isAssignableFrom(String.class)){  
  9.       addWorkWithSpring(tagerObject.toString(),method, params, asynCallBack, weight);  
  10.       return;  
  11.     }  
  12.     //如果对象是class  
  13.     if ((tagerObject instanceof Class)) {  
  14.       String classKey =((Class)tagerObject).getSimpleName();  
  15.       //是否缓存,一个class缓存一个默认的对象,防止重复创建  
  16.          if (cache)  
  17.       {  
  18.         target = targetCacheMap.get(classKey);  
  19.         if (target == null)  
  20.         {  
  21.           target =newObject((Class)tagerObject);  
  22.           targetCacheMap.put(classKey, target);  
  23.         }  
  24.       }  
  25.       else  
  26.       {  
  27.         target = newObject((Class)tagerObject);  
  28.       }  
  29.     }  
  30.     else  
  31.     {  
  32.       target = tagerObject;  
  33.     }  
  34.     if (target == null) {  
  35.       throw newIllegalArgumentException("target object is null");  
  36.     }  
  37.     //封装成异步任务  
  38.     AsynWork anycWork = newAsynWorkEntity(target, method, params, asynCallBack, weight);  
  39.      
  40.     addAsynWork(anycWork);  
  41.   }  

 

 

 

[java] view plain copy
 
  1. public voidaddAsynWork(AsynWork asynWork){  
  2.     if (!run) {  
  3.       throw new Asyn4jException("asynservice is stop or no start!");  
  4.     }  
  5.     if (asynWork == null) {  
  6.       throw newIllegalArgumentException("asynWork is null");  
  7.     }  
  8.     try<span style="font-family: Arial, Helvetica, sans-serif;">{</span>  
[java] view plain copy
 
  1.     //通过semaphore来获取许可权限  
  2.     if(this.semaphore.tryAcquire(addWorkWaitTime, TimeUnit.MILLISECONDS))  
  3.     {  
  4.       WorkProcessor workProcessor = newWorkProcessor(asynWork, this);  
  5.        
  6.       workExecutor.execute(workProcessor);  
  7.       totalWork.incrementAndGet();  
  8.     }  
  9.     else{  
  10.       log.warn("work queue is full,addwork to cache queue");  
  11.      this.workQueueFullHandler.addAsynWork(asynWork);  
  12.     }  
  13.   }  
  14.   catch (InterruptedException e)  
  15.   {  
  16.     log.error(e);  
  17.   }  
  18. }  



 

 

通过semaphore来控制最大的访问线程数,maxCacheWork就是semaphore的数量,也是工作队列的数量

这里的目的是控制工作队列的数量不能超过maxCacheWork(也可以通过用上限的queue来代替)

如果超过队列maxCacheWork,就用workQueueFullHandler去处理,处理方式同线程池的拒绝策略处理器(

newAsynWorkRejectedExecutionHandler(this.workQueueFullHandler))不过线程池的拒绝策略没用到(前提是队列的有上限的队列),

工作任务WorkProcessor(内含AsynWorkEntity)

 

[java] view plain copy
 
  1. public void run() {  
  2.   Thread currentThread =Thread.currentThread();  
  3.   if (this.asynWork.getThreadName() != null){  
  4.     setName(currentThread,this.asynWork.getThreadName());  
  5.   }  
  6.   AsynCallBack result = null;  
  7.   try{  
  8.     result = this.asynWork.call();  
  9.     if (result != null) {  
  10.      ApplicationContext.getCallBackExecutor().execute(result);  
  11.     }  
  12.   }  
  13.   catch (Throwable throwable){  
  14.     if(this.applicationContext.getErrorAsynWorkHandler() != null) {  
  15.      this.applicationContext.getErrorAsynWorkHandler().addErrorWork(this.asynWork,throwable);  
  16.     }  
  17.   }  
  18.   finally{  
  19.    this.applicationContext.getSemaphore().release();  
  20.   }  
  21. }  



 

首先修改线程名称,然后调用asynWork的call方法,如果有回调函数,就执行,如果有异常就执行ErrorAsynWorkHandler,最后Semaphore释放一个许可

 

AsynWorkEntity

 

[java] view plain copy
 
  1. public AsynCallBack call()  
  2.   throws Exception {  
  3.   if (this.target == null) {  
  4.     throw new RuntimeException("targetobject is null");  
  5.   }  
  6.   Class clazz = this.target.getClass();  
  7.    
  8.   String methodKey =MethodUtil.getClassMethodKey(clazz, this.params, this.method);  
  9.    
  10.   Method targetMethod =(Method)methodCacheMap.get(methodKey);  
  11.   if (targetMethod == null){  
  12.     targetMethod =MethodUtil.getTargetMethod(clazz, this.params, this.method);  
  13.     if (targetMethod != null) {  
  14.       methodCacheMap.put(methodKey,targetMethod);  
  15.     }  
  16.   }  
  17.   if (targetMethod == null) {  
  18.     throw newIllegalArgumentException("target method is null");  
  19.   }  
  20.   Object result =targetMethod.invoke(this.target, this.params);  
  21.   if (this.anycResult != null) {  
  22.     this.anycResult.setInokeResult(result);  
  23.   }  
  24.   return this.anycResult;  
  25. }  



 

通过反射技术调用用具体方法,也用缓存技术,anycResult就是你定义的回调函数,没有就返回null

 

关闭时调用close方法

 

 

[java] view plain copy
 
    1. publicvoid close(long waitTime){  
    2.   if (run){  
    3.     run = false;  
    4.     try {  
    5.       workExecutor.awaitTermination(waitTime,TimeUnit.MILLISECONDS);  
    6.        
    7.       callBackExecutor.awaitTermination(0L,TimeUnit.MILLISECONDS);  
    8.     }  
    9.     catch (InterruptedException e)  
    10.     {  
    11.       log.error(e);  
    12.     }  
    13.     //关闭工作线程和回调函数线程  
    14.     workExecutor.shutdown();  
    15.     callBackExecutor.shutdown();  
    16.     //service关闭处理器  
    17.     if (this.serviceHandler != null)  
    18.     {  
    19.      this.serviceHandler.setAsynWorkQueue(workQueue);  
    20.       this.serviceHandler.setCallBackQueue(callBackQueue);  
    21.       this.serviceHandler.setServiceStat(1);  
    22.       this.serviceHandler.process();  
    23.     }  
    24.   }  
    25. }  

以上是关于异步框架asyn4j的原理的主要内容,如果未能解决你的问题,请参考以下文章

360四面:说说Spring Boot程序启动中Netty异步架构的原理!

Go语言异步服务器框架原理和实现

Android异步任务AsyncTask的使用与原理分析

360四面:说说Spring Boot程序启动中Netty异步架构的原理!

Java 异步任务执行服务:基本概念和原理

Tornado异步非阻塞的使用以及原理