[实践中的并发 7.2.5] 中提到的竞态条件的年表是啥

Posted

技术标签:

【中文标题】[实践中的并发 7.2.5] 中提到的竞态条件的年表是啥【英文标题】:What's the chronology for a race condition mentioned in [Concurrency in practice 7.2.5][实践中的并发 7.2.5] 中提到的竞态条件的年表是什么 【发布时间】:2022-01-05 15:16:59 【问题描述】:

正如 Brian Goetz 所说:“TrackingExecutor 有一个不可避免的竞争条件,这可能使其产生误报:被标识为已取消但实际上已完成的任务。这是因为线程池可能在任务执行,并且池将任务记录为完成。”

TrackingExecutor:

/**
 * TrackingExecutor
 * <p/>
 * ExecutorService that keeps track of cancelled tasks after shutdown
 *
 * @author Brian Goetz and Tim Peierls
 */
public class TrackingExecutor extends AbstractExecutorService 
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService exec) 
        this.exec = exec;
    

    public void shutdown() 
        exec.shutdown();
    

    public List<Runnable> shutdownNow() 
        return exec.shutdownNow();
    

    public boolean isShutdown() 
        return exec.isShutdown();
    

    public boolean isTerminated() 
        return exec.isTerminated();
    

    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException 
        return exec.awaitTermination(timeout, unit);
    

    public List<Runnable> getCancelledTasks() 
        if (!exec.isTerminated())
            throw new IllegalStateException(/*...*/);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    

    public void execute(final Runnable runnable) 
        exec.execute(new Runnable() 
            public void run() 
                try 
                    runnable.run();
                 finally 
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                
            
        );
    

然后他创建了使用TrackingExecutor的Crawler:

爬虫:

/**
 * WebCrawler
 * <p/>
 * Using TrackingExecutorService to save unfinished tasks for later execution
 *
 * @author Brian Goetz and Tim Peierls
 */
public abstract class WebCrawler 
    private volatile TrackingExecutor exec;
    @GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>();

    private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
    private static final long TIMEOUT = 500;
    private static final TimeUnit UNIT = MILLISECONDS;

    public WebCrawler(URL startUrl) 
        urlsToCrawl.add(startUrl);
    

    public synchronized void start() 
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for (URL url : urlsToCrawl) submitCrawlTask(url);
        urlsToCrawl.clear();
    

    public synchronized void stop() throws InterruptedException 
        try 
            saveUncrawled(exec.shutdownNow());
            if (exec.awaitTermination(TIMEOUT, UNIT))
                saveUncrawled(exec.getCancelledTasks());
         finally 
            exec = null;
        
    

    protected abstract List<URL> processPage(URL url);

    private void saveUncrawled(List<Runnable> uncrawled) 
        for (Runnable task : uncrawled)
            urlsToCrawl.add(((CrawlTask) task).getPage());
    

    private void submitCrawlTask(URL u) 
        exec.execute(new CrawlTask(u));
    

    private class CrawlTask implements Runnable 
        private final URL url;

        CrawlTask(URL url) 
            this.url = url;
        

        private int count = 1;

        boolean alreadyCrawled() 
            return seen.putIfAbsent(url, true) != null;
        

        void markUncrawled() 
            seen.remove(url);
            System.out.printf("marking %s uncrawled%n", url);
        

        public void run() 
            for (URL link : processPage(url)) 
                if (Thread.currentThread().isInterrupted())
                    return;
                submitCrawlTask(link);
            
        

        public URL getPage() 
            return url;
        
    

但我不明白 runnable.run()、exec.shutdownNow()、exec.awaitTermination(...)、exec.getCancelledTasks()、taskCancelledAtShutdown.add(runnable) 调用的确切时间顺序,可运行的完成和线程交错,这会导致竞争条件。

【问题讨论】:

【参考方案1】:

我是这样理解的。比如TrackingExecutorCrawlTask退出之前正在关闭,这个任务也可能记录为taskCancelledAtShutdown,因为TrackingExecutor#execute中的if (isShutdown() &amp;&amp; Thread.currentThread().isInterrupted())可能是真的,但实际上这个任务已经完成了。

    private class CrawlTask implements Runnable 

        public void run() 
            for (URL link : processPage(url)) 
                if (Thread.currentThread().isInterrupted())
                    return;
                submitCrawlTask(link);
            
            // May be here, trackingExecutor is shutting down. 
            // Actually this task has completed now.But this method did not exit.
        

    

    public void execute(final Runnable runnable) 
        exec.execute(new Runnable() 
            public void run() 
                try 
                    runnable.run();
                 finally 
                    // isShutdown() && Thread.currentThread().isInterrupted() may be true
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                
            
        );
    

【讨论】:

是的,这似乎是事实。 1.shutdownNow() 在慢速核心上调用,它将.interrupt() 发送到所有正在运行的任务。 2. runnable.run() 是一个持久的方法,所以它不能在很短的时间内完成,但是 Thread.currentThread().isInterrupted() 状态变为 true。 3. runnable.run 完成并且它的线程在 TrackingExecutor.execute() 的 finally 之前切换出去,离开它的上下文。 4. WebCrawler.stop() 在慢速核心上运行,并调用 exec.awaitTermination(TIMEOUT, UNIT),返回 true,因为 runnable.run() 已完成。 5. TrackingExecutor.execute() 线程上下文再次激活并调用了tasksCancelledAtShutdown.add(runnable),这显然是错误的,因为runnable 已完成,而不是取消。最后, saveUncrawled(exec.getCancelledTasks()) 继续执行错误的任务列表。

以上是关于[实践中的并发 7.2.5] 中提到的竞态条件的年表是啥的主要内容,如果未能解决你的问题,请参考以下文章

linux设备驱动归纳总结:5.SMP下的竞态和并发

linux设备驱动归纳总结:5.SMP下的竞态和并发

Java并发编程实战的作品目录

Java并发编程入门

来自共享内存中的数据结构的竞态检查错误

[Java并发编程实战] 基础知识