并行流是不是以线程安全的方式处理上游迭代器?

Posted

技术标签:

【中文标题】并行流是不是以线程安全的方式处理上游迭代器?【英文标题】:Do parallel streams treat upstream iterators in a thread safe way?并行流是否以线程安全的方式处理上游迭代器? 【发布时间】:2022-01-16 06:20:23 【问题描述】:

但是,今天我使用了一个流,它在映射后执行parallel() 操作;底层源是一个非线程安全的迭代器,类似于BufferedReader.lines 实现。

我原本以为 trySplit 会在创建的线程上被调用,然而;我观察到对迭代器的访问来自多个线程。

例如,以下愚蠢的迭代器实现只是设置了足够的元素以导致拆分,并且还跟踪访问 hasNext 方法的唯一线程。

class SillyIterator implements Iterator<String> 

    private final ArrayDeque<String> src =
        IntStream.range(1, 10000)
            .mapToObj(Integer::toString)
            .collect(toCollection(ArrayDeque::new));
    private Map<String, String> ts = new ConcurrentHashMap<>();
    public Set<String> threads()  return ts.keySet(); 
    private String nextRecord = null;

    @Override
    public boolean hasNext() 
        var n = Thread.currentThread().getName();
        ts.put(n, n);
        if (nextRecord != null) 
            return true;
         else 
            nextRecord = src.poll();
            return nextRecord != null;
        
    
    @Override
    public String next() 
        if (nextRecord != null || hasNext()) 
            var rec = nextRecord;
            nextRecord = null;
            return rec;
        
        throw new NoSuchElementException();
    


使用它来创建如下流:

var iter = new SillyIterator();
StreamSupport
    .stream(Spliterators.spliteratorUnknownSize(
        iter, Spliterator.ORDERED | Spliterator.NONNULL
    ), false)
    .map(n -> "value = " + n)
    .parallel()
    .collect(toList());

System.out.println(iter.threads());

这在我的系统上输出了两个fork join线程以及主线程,这让我有点害怕。

[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]

【问题讨论】:

【参考方案1】:

线程安全并不一定意味着只能由一个线程访问。重要的方面是没有并发访问,即不能同时被多个线程访问。如果不同线程的访问是按时间排序的,并且这种排序也确保了必要的内存可见性,这是​​调用者的责任,它仍然是线程安全的使用。

The Spliterator documentation 说:

尽管拆分器在并行算法中有明显的用途,但它们并不期望是线程安全的;相反,使用拆分器的并行算法的实现应确保拆分器一次仅由一个线程使用。这通常很容易通过串行线程限制实现,这通常是通过递归分解工作的典型并行算法的自然结果。

拆分器不需要在其整个生命周期内被限制在同一个线程中,但在调用方应该有一个明确的切换,以确保旧线程在新线程开始使用它之前停止使用它。

但重要的一点是,拆分器不需要是线程安全的,因此,被拆分器包裹的迭代器也不需要是线程安全的。

请注意,典型的行为是在开始遍历之前进行拆分和移交,但是由于普通的Iterator 不支持拆分,因此包装拆分器必须迭代和缓冲元素才能实现拆分。因此,从Stream 实现的角度来看,当尚未开始遍历时,Iterator 会经历不同线程(但一次一个)的遍历。


也就是说,BufferedReaderlines() 实现是一个你不应该遵循的坏例子。由于它以单个 readLine() 调用为中心,因此很自然地直接实现 Spliterator 而不是实现更复杂的 Iterator 并通过 spliteratorUnknownSize(…) 包装它。

由于您的示例同样以单个 poll() 调用为中心,因此直接实现 Spliterator 也很简单:

class SillySpliterator extends Spliterators.AbstractSpliterator<String> 
    private final ArrayDeque<String> src = IntStream.range(1, 10000)
        .mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new));

    SillySpliterator() 
        super(Long.MAX_VALUE, ORDERED | NONNULL);
    

    @Override
    public boolean tryAdvance(Consumer<? super String> action) 
        String nextRecord = src.poll();
        if(nextRecord == null) return false;
        action.accept(nextRecord);
        return true;
    

根据您的实际情况,您还可以将实际的双端队列大小传递给构造函数并提供SIZED 特征。

那么,你可以像这样使用它

var result = StreamSupport.stream(new SillySpliterator(), true)
    .map(n -> "value = " + n)
    .collect(toList());

【讨论】:

这是一个很好的答案,谢谢。我并不经常需要实现 spliterator 并实际上在 JDK 中寻找示例。 deque 上的示例用于演示,有问题的 impl 与 hadoop 有关。再次感谢您的回答。 我认为这只是一个例子。这就是为什么我建议添加大小特征“取决于你的实际情况”,而不是仅仅将它添加到我的示例中。我还试图强调直接实现 spliterator 背后的一般模式。每当您有某种“获取下一个元素或告诉我没有更多”操作时,例如readLine()poll()fetchNext() 等,Spliterator 更简单实施。与Iterator 相比,Spliterator 甚至使用两种方法的 API 并不难。这只是熟悉它的问题。

以上是关于并行流是不是以线程安全的方式处理上游迭代器?的主要内容,如果未能解决你的问题,请参考以下文章

Java8 Stream流方法

std::map 访问线程是不是安全,如果它的迭代器永远不会失效

Java8 第四章

为啥在Python里推荐使用多进程而不是多线程

python面试题之多线程好吗?列举一些让Python代码以并行方式运行的方法

Java并发-线程安全性