在完成 Java 未来时释放资源 - 此处:对 Callable 及其外部变量的引用

Posted

技术标签:

【中文标题】在完成 Java 未来时释放资源 - 此处:对 Callable 及其外部变量的引用【英文标题】:Freeing up Resources in Completion of Java Future - Here: Reference to Callable and its outer Variables 【发布时间】:2013-08-20 00:25:41 【问题描述】:

在我的项目中,我经常使用 Java Futures 处理并发任务。在一个应用程序中,每个并发任务在完成期间都需要相当大的内存。由于其他一些设计选择,该内存是在线程外部创建的对象中创建和引用的(请参阅下面的更详细示例)。

令我惊讶的是,即使在未来任务(即它的计算线程)已经完成之后,future 仍然持有对这个对象的引用。也就是说:如果在其他地方没有对该对象的其他引用,则该对象将不会被释放,除非未来被释放 - 即使任务已经完成。

我的幼稚想法是限制并发线程的数量会自动限制任务持有的资源(内存)数量。这不是真的!

考虑下面的代码。在这个例子中,我创建了一些任务。在他们的计算过程中,一个 ArrayList(它是一个外部变量)的大小会增长。该方法返回一个Vector<Future>。即使任务已经完成,即使 ArrayList 的范围已经离开,Future 仍然持有对 ArrayList 的引用(通过FutureTask.sync.callable)。

总结一下:

FutureTask 持有对 Callable 的引用,即使 Callable 已完成。 Callable 包含对计算期间使用的最终外部变量的引用,即使计算已完成。

问题:释放 Future 持有的资源的最佳方式是什么? (当然,我知道可调用的局部变量会在线程完成时被释放——这不是我想要的)。

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christianfries.com.
 *
 * Created on 17.08.2013
 */

package net.finmath.experiments.concurrency;

import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author Christian Fries
 *
 */
public class ConcurrencyTest 

    private ExecutorService executor = Executors.newFixedThreadPool(10);
    private int numberOfDoubles = 1024*1024/8;      // 1 MB
    private int numberOfModels  = 100;              // 100 * 1 MB

    /**
     * @param args
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException 
        ConcurrencyTest ct = new ConcurrencyTest();
        ct.concurrencyTest();
    

    /**
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public void concurrencyTest() throws InterruptedException, ExecutionException 
        Vector<Double> results = getResults();

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only results): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
    


    private Vector<Double> getResults() throws InterruptedException, ExecutionException 
        Vector<Future<Double>> resultsFutures = getResultsConcurrently();


        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);

        /*
         * At this point, we expect that no reference to the models is held
         * and the memory is freed.
         * However, the Future still reference each "model". 
         */

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only futures): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

        Vector<Double> results = new Vector<Double>(resultsFutures.size());
        for(int i=0; i<resultsFutures.size(); i++) 
            results.add(i, resultsFutures.get(i).get());
               

        return results;
    

    private Vector<Future<Double>> getResultsConcurrently() 

        /*
         * At this point we allocate some model, which represents
         * something our workers work on.
         */
        Vector<ArrayList<Double>> models = new Vector<ArrayList<Double>>(numberOfModels);
        for(int i=0; i<numberOfModels; i++) 
            models.add(i, new ArrayList<Double>());
        

        /*
         * Work on the models concurrently
         */
        Vector<Future<Double>> results = calculateResults(models);


        /*
         * Return the futures.
         * Note: We expect that no more reference is held to a model
         * once we are running out scope of this function AND the respective worker
         * has completed.
         */
        return results;
    

    private Vector<Future<Double>> calculateResults(Vector<ArrayList<Double>> models) 
        Vector<Future<Double>> results = new Vector<Future<Double>>(models.size());
        for(int i=0; i<models.size(); i++) 
            final ArrayList<Double> model       = models.get(i);
            final int               modelNumber = i;

            Callable<Double> worker = new  Callable<Double>() 
                public Double call() throws InterruptedException 

                    /*
                     * The models will perform some thread safe lazy init,
                     * which we simulate here, via the following line
                     */
                    for(int j=0; j<numberOfDoubles; j++) model.add(Math.random());

                    /*
                     * Now the worker starts working on the model
                     */
                    double sum = 0.0;
                    for(Double value : model) sum += value.doubleValue();

                    Thread.sleep(1000);

                    Runtime.getRuntime().gc();
                    System.out.println("Model " + modelNumber + " completed. Allocated memory: " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

                    return sum;
                
            ;

            // The following line will add the future result of the calculation to the vector results               
            results.add(i, executor.submit(worker));
        

        return results;
    

这是调试器/分析器的屏幕截图(这是在另一个示例中完成的)。 FutureTask 已经完成(从结果中可以明显看出)。然而,FutureTask 持有对 Callable 的引用。在这种情况下,Callable 持有对包含一些“大”对象的外部最终变量参数的引用。

(这个例子更真实。这里Obba Server 使用并发创建和处理数据处理电子表格 - 取自我的一个项目)。

更新:

鉴于 allprog 和 sbat 的答案,我想添加一些 cmets:

我接受了allprog的答案,因为它是对原始问题如何在未来释放资源的答案。我不喜欢的是这个解决方案对外部库的依赖,但在这种情况下,这是一个很好的提示。

也就是说,我首选的解决方案是 sbat 解决方案,并且在下面我自己的回答中:避免在 call() 完成后在可调用对象中引用“更大”的对象。实际上,我首选的解决方案是避免匿名类实现 Callable。相反,我定义了一个实现 Callable 的内部类,它有一个构造函数,通过它的构造函数接收对其他对象的所有引用,并在 call() 实现结束时释放它们。

【问题讨论】:

可以进行的一项改进是在 Future 的调用方法中在 return sum; 之前调用 model.clear(); 这将是下面“玩具示例”中的解决方案。但是,在我的应用程序中,模型是从外部提供的,通常也可以被其他线程使用。出于这个原因,他们正在缓存一些数据。使用 model.clear() 相当于为生成的数据使用局部变量,不幸的是这不是我的设置。 一点OFF评论:尽量不要使用Vector。这是一个过时的 List 实现。请改用 ArrayList。 @John Vint:就是那个 ArrayList。请注意,代码只是一个玩具示例。在我的实际应用程序中,它是其他一些对象(如果这是混乱的话)。该 ArrayList 被炸毁到大约 1 MB。它被 Future 保留... 您找到问题的答案了吗? 【参考方案1】:

如果您担心未来的任务会保留对您模型的引用,您可以尝试替换

final ArrayList<Double> model = models.get(i);

final ArrayList<ArrayList<Double>> modelList = 
            new ArrayList<ArrayList<Double>>();
modelList.add(models.get(i));

然后在你的任务调用方法的开头,做

ArrayList<Double> model = modelList.get(0);

最后写上

model = null;
modelList.clear();

至少在没有强行清除用户提供的模型的情况下,您的“玩具示例”得到了改进。这是否可以应用于您的实际工作我不能说。

【讨论】:

【参考方案2】:

如果您使用默认的 ExecutionService 实现,Future 本身就是 FutureTask。 FutureTask 将保留对您提交的 Runnable 或 Callable 的引用,因此,这些分配的任何资源都将保留到未释放 Future 为止。

解决此类问题的最佳方法是将 transform 未来转换为仅保留结果并将其返回给调用者的另一个未来。这样您就不需要完成服务或任何额外的黑客攻击。

// Decorate the executor service with listening
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

// Submit the task
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() 
    public Integer call() throws Exception 
        return new Integer(1234);
    
);

// Transform the future to get rid of the memory consumption
// The transformation must happen asynchronously, thus the 
// Executor parameter is vital
Futures.transform(future, new AsyncFunction<Integer, Integer>() 
    public ListenableFuture<Integer> apply(Integer input) 
        return Futures.immediateFuture(input);
    
, service);

这有效地实现了您所描述的方案,但减轻了您的负担,并在以后提供了更高的灵活性。这带来了 Guava 作为依赖项,但我认为它值得付出代价。我一直使用这个方案。

【讨论】:

【参考方案3】:

我想到的解决方案是

在完成服务中使用期货(通过take(),它应该释放这些期货,然后在另一个线程上执行此操作,返回新的期货。外部代码仅引用完成服务中的期货。

除了其他答案:另一种可能性是避免在可调用对象中保留对模型的引用,就像在 sbat 答案中一样。以下以不同的方式做同样的事情(灵感来自Accessing constructor of an anonymous class 的答案):

通过 setter 将对象“模型”传递给匿名类的字段,完成后将该字段设置为零。

    for(int i=0; i<models.size(); i++) 
// REMOVED: final ArrayList<Double> model       = models.get(i);
        final int               modelNumber = i;

        Callable<Double> worker = new  Callable<Double>() 
/*ADDED*/   private ArrayList<Double> model;
/*ADDED*/   public Callable<Double> setModel(ArrayList<Double> model)  this.model = model; return this; ;

            public Double call() throws InterruptedException 

                /* ... */

                /*
                 * Now the worker starts working on the model
                 */
                double sum = 0.0;

                /* ... */

/*ADDED*/       model = null;
                return sum;
            
/*ADDED*/   .setModel(models.get(i));


        // The following line will add the future result of the calculation to the vector results
        results.add(i, executor.submit(worker));
    

【讨论】:

以上是关于在完成 Java 未来时释放资源 - 此处:对 Callable 及其外部变量的引用的主要内容,如果未能解决你的问题,请参考以下文章

C/C++开发之Windows资源释放管理

C/C++开发之Windows资源释放管理

C# kill线程正常释放资源?

java 啥资源需要手动释放

try()catch{}自动释放资源

try()catch{}自动释放资源