Java线程池在运行后的结果反查

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java线程池在运行后的结果反查相关的知识,希望对你有一定的参考价值。

参考技术A

  Java线程池需要不断的学习 在学习的时候我们就要注意不少的问题 下面我们就来看看具体的语言运作环境如何才能满足Java线程池相关程序的运行 希望大家有所收获

  无论是接收Runnable型参数 还是接收Callable型参数的submit()方法 都会返回一个Future(也是一个接口)类型的对象 该对象中包含了任务的执行情况以及结果 调用Future的boolean isDone()方法可以获知任务是否执行完毕 调用Object get()方法可以获得任务执行后的返回结果 如果此时任务还没有执行完 get()方法会保持等待 直到相应的任务执行完毕后 才会将结果返回

  我们用下面的一个例子来演示Java 中Java线程池的使用

  Java代码

   import ncurrent *;

   public class ExecutorTest

   public static void main(String[] args) throws

  InterruptedException

   ExecutionException

   ExecutorService es = Executors newSingleThreadExecutor();

   Future fr = es submit(new RunnableTest());// 提交任务

   Future fc = es submit(new CallableTest());// 提交任务

   // 取得返回值并输出

   System out println((String) fc get());

   // 检查任务是否执行完毕

   if (fr isDone())

   System out println( 执行完毕 RunnableTest run() );

   else

   System out println( 未执行完 RunnableTest run() );

  

   // 检查任务是否执行完毕

   if (fc isDone())

   System out println( 执行完毕 CallableTest run() );

   else

   System out println( 未执行完 CallableTest run() );

  

   // 停止线程池服务

   es shutdown();

  

  

   class RunnableTest implements Runnable

   public void run()

   System out println( 已经执行 RunnableTest run() );

  

  

   class CallableTest implements Callable

   public Object call()

   System out println( 已经执行 CallableTest call() );

   return 返回值 CallableTest call() ;

  

  

   import ncurrent *;

   public class ExecutorTest

   public static void main(String[] args) throws

  InterruptedException

   ExecutionException

   ExecutorService es = Executors newSingleThreadExecutor();

   Future fr = es submit(new RunnableTest());// 提交任务

   Future fc = es submit(new CallableTest());// 提交任务

   // 取得返回值并输出

   System out println((String) fc get());

   // 检查任务是否执行完毕

   if (fr isDone())

   System out println( 执行完毕 RunnableTest run() );

   else

   System out println( 未执行完 RunnableTest run() );

  

   // 检查任务是否执行完毕

   if (fc isDone())

   System out println( 执行完毕 CallableTest run() );

   else

   System out println( 未执行完 CallableTest run() );

  

   // 停止线程池服务

   es shutdown();

  

  

   class RunnableTest implements Runnable

   public void run()

   System out println( 已经执行 RunnableTest run() );

  

  

   class CallableTest implements Callable

   public Object call()

   System out println( 已经执行 CallableTest call() );

   return 返回值 CallableTest call() ;

  

  

  运行结果

  已经执行 RunnableTest run()

  已经执行 CallableTest call()

  返回值 CallableTest call()

  执行完毕 RunnableTest run()

  执行完毕 CallableTest run()

lishixinzhi/Article/program/Java/gj/201311/27283

使用阻塞队列的对象池在死锁中运行

我有一个在payara 5中运行的java应用程序。

我需要汇集我的bean将使用的一些引擎对象(来自库)。创建引擎需要在单独的线程中完成。

为此我想出了我的EnginePool和我的EngineProducer。想法是EnginePool管理两个BlockingQueues。一个用于可用引擎,另一个用于bean使用的引擎,需要再次使用。 EnginePool应该只提供一次,因此它是一个单例。

@Singleton
@Startup
@LocalBean
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class AbbyyEnginePool implements OcrEnginePool<IEngine> {
private static final Logger logger = LoggerFactory.getLogger(AbbyyEnginePool.class);

@Resource(lookup = "java:comp/DefaultManagedThreadFactory")
private ManagedThreadFactory threadFactory;

private static final int DEFAULT_ENGINE_COUNT = 3;
private BlockingQueue<EngineMetaInfo> availableEngines = new ArrayBlockingQueue<>(DEFAULT_ENGINE_COUNT);
private BlockingQueue<IEngine> enginesToRelease = new ArrayBlockingQueue<>(DEFAULT_ENGINE_COUNT);
private Map<IEngine, IEngine> proxiesMapping = new ConcurrentHashMap<>(DEFAULT_ENGINE_COUNT);
private int poolSize;

public AbbyyEnginePool() {
    this(DEFAULT_ENGINE_COUNT);
}

public AbbyyEnginePool(int poolSize) {
    this.poolSize = poolSize;
    availableEngines = new ArrayBlockingQueue<>(poolSize);
    enginesToRelease = new ArrayBlockingQueue<>(poolSize);
    proxiesMapping = new ConcurrentHashMap<>(poolSize);
}

void setThreadFactory(ManagedThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
}

@PostConstruct
void init() {
    EngineProducer engineProducer = new EngineProducer(availableEngines, enginesToRelease, poolSize);       
    Thread engineProducerThread = threadFactory.newThread(engineProducer);

    engineProducerThread.setName("engineProducer");
    engineProducerThread.start();

}

@Override
public  IEngine get() throws EngineException {
    try {
        EngineMetaInfo engineMetaInfo = availableEngines.take();
        IEngine engineProxy = IEngine.UnmarshalInterface(engineMetaInfo.engineHandle);
        proxiesMapping.put(engineProxy, engineMetaInfo.engine);
        return engineProxy;
    } catch (InterruptedException e) {
        throw new EngineException("Could not retrieve engine", e);
    }
}

@Override
public void release(IEngine engineProxy) throws EngineException {
    if (engineProxy != null) {
        synchronized (proxiesMapping) {
            if (proxiesMapping.containsKey(engineProxy)) {
                try {
                    IEngine engine = proxiesMapping.remove(engineProxy);
                    enginesToRelease.put(engine);
                } catch (Exception e) {
                    throw new EngineException("Could not release engine proxy.");
                }
            } else {
                logger.warn("Engine proxy was not registered. Could not release proxy.");
            }
        }
    }
}

static class EngineMetaInfo {
    long engineHandle;
    IEngine engine;

     EngineMetaInfo(long engineHandle, IEngine engine) {
        this.engineHandle = engineHandle;
        this.engine = engine;
    }
}

}

EngineProducer看起来像这样:

public class EngineProducer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(EngineProducer.class);
private static final String PROJECT_ID = "someId";

private final Integer MAX_ENGINE_COUNT;
private final BlockingQueue<AbbyyEnginePool.EngineMetaInfo> availableEngines;
private final BlockingQueue<IEngine> enginesToRelease;

private Boolean isRunning = Boolean.FALSE;
private List<EngineHolder> enginesHolder;

public EngineProducer(BlockingQueue<AbbyyEnginePool.EngineMetaInfo> availableEngines,
                      BlockingQueue<IEngine> enginesToRelease,
                      Integer maxEnginesCount) {
    this.availableEngines = availableEngines;
    this.enginesToRelease = enginesToRelease;
    this.MAX_ENGINE_COUNT = maxEnginesCount;
    this.enginesHolder = new ArrayList<>(MAX_ENGINE_COUNT);
}

private void initEngines() {
    synchronized (availableEngines) {
        if (availableEngines.size() == 0) {
            try {
                for (int i = 0; i < MAX_ENGINE_COUNT; i++) {
                    EngineHolder engineHolder = new EngineHolder(PROJECT_ID);
                    enginesHolder.add(engineHolder);
                    IEngine engine = engineHolder.getAndLockEngine();
                    long engineHandle = engine.MarshalInterface();

                    AbbyyEnginePool.EngineMetaInfo engineMetaInfo = new AbbyyEnginePool.EngineMetaInfo(engineHandle, engine);
                    availableEngines.put(engineMetaInfo);
                }

                logger.info("{} abbyy engines prepared for processing", MAX_ENGINE_COUNT);
                EnginePool.setInitialized(Boolean.TRUE);
                isRunning = Boolean.TRUE;
            } catch (Exception e) {
                logger.error("Could not instantiate engines.", e);
            }
        }
    }
}

@Override
public void run() {
    try {
        initEngines();

        while(isRunning) {
            IEngine engineProxyToRelease = enginesToRelease.take();
            releaseEngine(engineProxyToRelease);
        }

        availableEngines.clear();
        for(int i = 0; i < enginesHolder.size(); i++) {
            enginesHolder.get(i).unloadEngine();
        }

    } catch (Exception e) {
        logger.error("EngineProducer encounter a problem.", e);
    }
}

public void unloadEngines() {
    isRunning = Boolean.FALSE;
}

private void releaseEngine( IEngine engineToRelease ) {
    for (EngineHolder engineHolder : enginesHolder) {
        if (engineHolder.containsEngine(engineToRelease)) {
            engineHolder.unlockEngine();

            IEngine engine = engineHolder.getAndLockEngine();
            long engineHandle = engine.MarshalInterface();
            AbbyyEnginePool.EngineMetaInfo engineMetaInfo = new AbbyyEnginePool.EngineMetaInfo(engineHandle, engine);
            try {
                availableEngines.put(engineMetaInfo);
            } catch (InterruptedException e) {
                logger.warn("could not add free engine");
            }
            break;
        }
    }
}

}

当我在测试中运行它而不是在glassfish中运行它时没有问题。但是当我在玻璃鱼中运行时,豆子会陷入僵局。

bean使用此代码来获取和释放引擎:

        engine = enginePool.get();
    ProcessingResult processingResult = null;
    try {
        this.parameters = parameters;

        this.tmpDir = tmpDir;
        Path customProfileFile = loadProfiles(parameters);

        Instant processingStart = Instant.now();
        processingResult = processFile();
        Instant processingEnd = Instant.now();
        enginePool.release(engine);
        engine = null;

        processingResult.setProcessingStartTime(processingStart);
        processingResult.setProcessingEndTime(processingEnd);

        logger.info("Processing took about {} milliseconds.", processingResult.getProcessDurationInMilliseconds());
        customProfileFile.toFile().delete();
        this.tmpDir.toFile().delete();
    } catch (Exception e) {
        logger.error("Ocr of document failed ",e );
        enginePool.release(engine);
        throw new EngineException("Ocr of document failed.", e);
    }

在我的场景中有4个尝试获取引擎的bean。其中3个将获得一个,最后一个bean将等待engine = enginePool.get(); 3个获得引擎的bean将完成他们的工作并等待enginePool.release(engine);。我接受了一个线程转储,可以看到3个bean正在等待一个没有引擎的bean所持有的锁。所以他们无法释放引擎。

我的问题是我不明白。释放和获取引擎在不同的阻塞队列上工作,所以我想知道为什么等待获取引擎的最后一个bean阻止试图释放引擎的其他bean。

答案

问题是容器管理所有并发。如果是单例,则意味着对字段的所有访问都将获得写锁定。

解决方案是使用@ConcurrencyManagement(BEAN)注释。这意味着bean控制并发管理,并且必须确保完成同步。

详细解释可以在here找到。

以上是关于Java线程池在运行后的结果反查的主要内容,如果未能解决你的问题,请参考以下文章

java 如何获得线程池中正在执行的线程数?

java获取线程池执行完后的结果

关于java线程池

Java中涉及线程和并发相关的内容

Java 中的线程池

Java中的线程池