异步框架asyn4j的原理
Posted 大数据从业者
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步框架asyn4j的原理相关的知识,希望对你有一定的参考价值。
启动时调用init方法
- public void init(){
- if (!run){
- run = true;
- //工作队列
- workQueue = newPriorityBlockingQueue(maxCacheWork);
- //是否存在工作队列满处理类
- if (this.workQueueFullHandler != null) {
- //自定义线程池
- workExecutor = newAsynThreadPoolExecutor(work_thread_num, work_thread_num, 0L,TimeUnit.MILLISECONDS,
- workQueue, new AsynThreadFactory(),new AsynWorkRejectedExecutionHandler(this.workQueueFullHandler),
- executeWorkNum);
- } else {
- workExecutor = newAsynThreadPoolExecutor(work_thread_num, work_thread_num, 0L,TimeUnit.MILLISECONDS,
- workQueue, new AsynThreadFactory(),executeWorkNum);
- }
- //回调函数工作队列
- callBackQueue = newLinkedBlockingQueue();
- //自定义回调函数线程池
- callBackExecutor = newCallBackThreadPoolExecutor(callback_thread_num, callback_thread_num, 0L,
- TimeUnit.MILLISECONDS, callBackQueue,new CallBackThreadFactory(),
- new CallBackRejectedExecutionHandler(),callBackNum);
- //service启动和关闭处理器
- if (this.serviceHandler != null) {
- this.serviceHandler.setServiceStat(0);
- this.serviceHandler.setAsynService(this);
- this.serviceHandler.process();
- }
- //启动工作队列满处理器
- if (this.workQueueFullHandler != null) {
- this.workQueueFullHandler.process();
- }
- //程序关闭后的处理
- Runtime.getRuntime().addShutdownHook(newThread(){
- public void run()
- {
- AsynServiceImpl.this.close(AsynServiceImpl.closeServiceWaitTime);
- }
- });
- }
- }
AsynServiceImpl主要有两个线程池和三个处理器组成
工作线程池AsynThreadPoolExecutor
- public classAsynThreadPoolExecutor
- extends ThreadPoolExecutor
- {
- private AtomicLong executeWorkNum;
回调函数线程池CallbackThreadPoolExecutor
- public classCallBackThreadPoolExecutor
- extends ThreadPoolExecutor
- {
- private AtomicLong callBackNum;
AsynThreadPoolExecutor的executeWorkNum,统计任务的执行数
AsynThreadPoolExecutor的callBackNum,统计回调函数的执行数
ServiceHandler处理器(启动或关闭)
WorkQueueFullHandler处理器(任务数超过maxCacheWork)
ErrorAsynWorkHandler处理器(任务执行异常)
执行时调用addWork方法
- public void addWork(Object tagerObject, Stringmethod, Object[] params, AsynCallBack asynCallBack, WorkWeight weight, booleancache)
- {
- if ((tagerObject == null) || (method ==null)) {
- throw newIllegalArgumentException("target name is null or target method name is null");
- }
- Object target = null;
- //如果对象是String
- if(tagerObject.getClass().isAssignableFrom(String.class)){
- addWorkWithSpring(tagerObject.toString(),method, params, asynCallBack, weight);
- return;
- }
- //如果对象是class
- if ((tagerObject instanceof Class)) {
- String classKey =((Class)tagerObject).getSimpleName();
- //是否缓存,一个class缓存一个默认的对象,防止重复创建
- if (cache)
- {
- target = targetCacheMap.get(classKey);
- if (target == null)
- {
- target =newObject((Class)tagerObject);
- targetCacheMap.put(classKey, target);
- }
- }
- else
- {
- target = newObject((Class)tagerObject);
- }
- }
- else
- {
- target = tagerObject;
- }
- if (target == null) {
- throw newIllegalArgumentException("target object is null");
- }
- //封装成异步任务
- AsynWork anycWork = newAsynWorkEntity(target, method, params, asynCallBack, weight);
- addAsynWork(anycWork);
- }
- public voidaddAsynWork(AsynWork asynWork){
- if (!run) {
- throw new Asyn4jException("asynservice is stop or no start!");
- }
- if (asynWork == null) {
- throw newIllegalArgumentException("asynWork is null");
- }
- try<span style="font-family: Arial, Helvetica, sans-serif;">{</span>
- //通过semaphore来获取许可权限
- if(this.semaphore.tryAcquire(addWorkWaitTime, TimeUnit.MILLISECONDS))
- {
- WorkProcessor workProcessor = newWorkProcessor(asynWork, this);
- workExecutor.execute(workProcessor);
- totalWork.incrementAndGet();
- }
- else{
- log.warn("work queue is full,addwork to cache queue");
- this.workQueueFullHandler.addAsynWork(asynWork);
- }
- }
- catch (InterruptedException e)
- {
- log.error(e);
- }
- }
通过semaphore来控制最大的访问线程数,maxCacheWork就是semaphore的数量,也是工作队列的数量
这里的目的是控制工作队列的数量不能超过maxCacheWork(也可以通过用上限的queue来代替)
如果超过队列maxCacheWork,就用workQueueFullHandler去处理,处理方式同线程池的拒绝策略处理器(
newAsynWorkRejectedExecutionHandler(this.workQueueFullHandler))不过线程池的拒绝策略没用到(前提是队列的有上限的队列),
工作任务WorkProcessor(内含AsynWorkEntity)
- public void run() {
- Thread currentThread =Thread.currentThread();
- if (this.asynWork.getThreadName() != null){
- setName(currentThread,this.asynWork.getThreadName());
- }
- AsynCallBack result = null;
- try{
- result = this.asynWork.call();
- if (result != null) {
- ApplicationContext.getCallBackExecutor().execute(result);
- }
- }
- catch (Throwable throwable){
- if(this.applicationContext.getErrorAsynWorkHandler() != null) {
- this.applicationContext.getErrorAsynWorkHandler().addErrorWork(this.asynWork,throwable);
- }
- }
- finally{
- this.applicationContext.getSemaphore().release();
- }
- }
首先修改线程名称,然后调用asynWork的call方法,如果有回调函数,就执行,如果有异常就执行ErrorAsynWorkHandler,最后Semaphore释放一个许可
AsynWorkEntity
- public AsynCallBack call()
- throws Exception {
- if (this.target == null) {
- throw new RuntimeException("targetobject is null");
- }
- Class clazz = this.target.getClass();
- String methodKey =MethodUtil.getClassMethodKey(clazz, this.params, this.method);
- Method targetMethod =(Method)methodCacheMap.get(methodKey);
- if (targetMethod == null){
- targetMethod =MethodUtil.getTargetMethod(clazz, this.params, this.method);
- if (targetMethod != null) {
- methodCacheMap.put(methodKey,targetMethod);
- }
- }
- if (targetMethod == null) {
- throw newIllegalArgumentException("target method is null");
- }
- Object result =targetMethod.invoke(this.target, this.params);
- if (this.anycResult != null) {
- this.anycResult.setInokeResult(result);
- }
- return this.anycResult;
- }
通过反射技术调用用具体方法,也用缓存技术,anycResult就是你定义的回调函数,没有就返回null
关闭时调用close方法
- publicvoid close(long waitTime){
- if (run){
- run = false;
- try {
- workExecutor.awaitTermination(waitTime,TimeUnit.MILLISECONDS);
- callBackExecutor.awaitTermination(0L,TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- log.error(e);
- }
- //关闭工作线程和回调函数线程
- workExecutor.shutdown();
- callBackExecutor.shutdown();
- //service关闭处理器
- if (this.serviceHandler != null)
- {
- this.serviceHandler.setAsynWorkQueue(workQueue);
- this.serviceHandler.setCallBackQueue(callBackQueue);
- this.serviceHandler.setServiceStat(1);
- this.serviceHandler.process();
- }
- }
- }
以上是关于异步框架asyn4j的原理的主要内容,如果未能解决你的问题,请参考以下文章
360四面:说说Spring Boot程序启动中Netty异步架构的原理!