并行流是不是以线程安全的方式处理上游迭代器?
Posted
技术标签:
【中文标题】并行流是不是以线程安全的方式处理上游迭代器?【英文标题】:Do parallel streams treat upstream iterators in a thread safe way?并行流是否以线程安全的方式处理上游迭代器? 【发布时间】:2022-01-16 06:20:23 【问题描述】:但是,今天我使用了一个流,它在映射后执行parallel()
操作;底层源是一个非线程安全的迭代器,类似于BufferedReader.lines 实现。
我原本以为 trySplit 会在创建的线程上被调用,然而;我观察到对迭代器的访问来自多个线程。
例如,以下愚蠢的迭代器实现只是设置了足够的元素以导致拆分,并且还跟踪访问 hasNext
方法的唯一线程。
class SillyIterator implements Iterator<String>
private final ArrayDeque<String> src =
IntStream.range(1, 10000)
.mapToObj(Integer::toString)
.collect(toCollection(ArrayDeque::new));
private Map<String, String> ts = new ConcurrentHashMap<>();
public Set<String> threads() return ts.keySet();
private String nextRecord = null;
@Override
public boolean hasNext()
var n = Thread.currentThread().getName();
ts.put(n, n);
if (nextRecord != null)
return true;
else
nextRecord = src.poll();
return nextRecord != null;
@Override
public String next()
if (nextRecord != null || hasNext())
var rec = nextRecord;
nextRecord = null;
return rec;
throw new NoSuchElementException();
使用它来创建如下流:
var iter = new SillyIterator();
StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL
), false)
.map(n -> "value = " + n)
.parallel()
.collect(toList());
System.out.println(iter.threads());
这在我的系统上输出了两个fork join线程以及主线程,这让我有点害怕。
[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]
【问题讨论】:
【参考方案1】:线程安全并不一定意味着只能由一个线程访问。重要的方面是没有并发访问,即不能同时被多个线程访问。如果不同线程的访问是按时间排序的,并且这种排序也确保了必要的内存可见性,这是调用者的责任,它仍然是线程安全的使用。
The Spliterator
documentation 说:
尽管拆分器在并行算法中有明显的用途,但它们并不期望是线程安全的;相反,使用拆分器的并行算法的实现应确保拆分器一次仅由一个线程使用。这通常很容易通过串行线程限制实现,这通常是通过递归分解工作的典型并行算法的自然结果。
拆分器不需要在其整个生命周期内被限制在同一个线程中,但在调用方应该有一个明确的切换,以确保旧线程在新线程开始使用它之前停止使用它。
但重要的一点是,拆分器不需要是线程安全的,因此,被拆分器包裹的迭代器也不需要是线程安全的。
请注意,典型的行为是在开始遍历之前进行拆分和移交,但是由于普通的Iterator
不支持拆分,因此包装拆分器必须迭代和缓冲元素才能实现拆分。因此,从Stream
实现的角度来看,当尚未开始遍历时,Iterator
会经历不同线程(但一次一个)的遍历。
也就是说,BufferedReader
的 lines()
实现是一个你不应该遵循的坏例子。由于它以单个 readLine()
调用为中心,因此很自然地直接实现 Spliterator
而不是实现更复杂的 Iterator
并通过 spliteratorUnknownSize(…)
包装它。
由于您的示例同样以单个 poll()
调用为中心,因此直接实现 Spliterator
也很简单:
class SillySpliterator extends Spliterators.AbstractSpliterator<String>
private final ArrayDeque<String> src = IntStream.range(1, 10000)
.mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new));
SillySpliterator()
super(Long.MAX_VALUE, ORDERED | NONNULL);
@Override
public boolean tryAdvance(Consumer<? super String> action)
String nextRecord = src.poll();
if(nextRecord == null) return false;
action.accept(nextRecord);
return true;
根据您的实际情况,您还可以将实际的双端队列大小传递给构造函数并提供SIZED
特征。
那么,你可以像这样使用它
var result = StreamSupport.stream(new SillySpliterator(), true)
.map(n -> "value = " + n)
.collect(toList());
【讨论】:
这是一个很好的答案,谢谢。我并不经常需要实现 spliterator 并实际上在 JDK 中寻找示例。 deque 上的示例用于演示,有问题的 impl 与 hadoop 有关。再次感谢您的回答。 我认为这只是一个例子。这就是为什么我建议添加大小特征“取决于你的实际情况”,而不是仅仅将它添加到我的示例中。我还试图强调直接实现 spliterator 背后的一般模式。每当您有某种“获取下一个元素或告诉我没有更多”操作时,例如readLine()
、poll()
、fetchNext()
等,Spliterator
更简单实施。与Iterator
相比,Spliterator
甚至使用两种方法的 API 并不难。这只是熟悉它的问题。以上是关于并行流是不是以线程安全的方式处理上游迭代器?的主要内容,如果未能解决你的问题,请参考以下文章
std::map 访问线程是不是安全,如果它的迭代器永远不会失效