如何基于涉及第一个元素的谓词转换可观察对象
Posted
技术标签:
【中文标题】如何基于涉及第一个元素的谓词转换可观察对象【英文标题】:How to transform observable based on predicate involving first element 【发布时间】:2021-12-18 00:01:23 【问题描述】:我正在尝试创建一个采用 Observable<string>
的 Rx.NET 运算符,并且:
"a"
,则转发每个元素不变
否则只发出完成信号
例如:
-a-b-c-d-|- --> -a-b-c-d-|-
-b-c-d-|- --> -|-
我该怎么做?
【问题讨论】:
【参考方案1】:这是一个绝对没有竞争条件的版本:
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
source
.Publish(published =>
from x in published.Take(1)
from y in
x.Equals(expectedFirstElement)
? published.StartWith(x)
: Observable.Empty<T>()
select y);
有方法语法版本:
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
source
.Publish(published =>
published
.Take(1)
.SelectMany(x =>
x.Equals(expectedFirstElement)
? published.StartWith(x)
: Observable.Empty<T>()));
我更喜欢查询语法,但是嘿...
【讨论】:
谢谢。输出序列是否有可能在开始时包含两次expectedFirstElement
(可能是因为第 7 行上的 published
observable 接收到第一个元素的通知有点晚或什么的)?或者这是不可能的?
@jackdry - 不,这是不可能的。由于在订阅published.StartWith(x)
之前,x
已经从published.Take(1)
中获取,因此不可能发生。更好的是,Rx 保证如果下一个元素紧跟在第一个元素之后,也不会错过它。
赞成方法语法版本。 ?
@jackdry The Observable Contract 包含基本内容,还有一个旧的但很好的 Rx Design Guidelines 文档更深入(可下载的 PDF)。还有无数未记录的细微差别使 Rx 库如此难以学习。
@TheodorZoulias - ......但如此强大的工具。【参考方案2】:
这是一种方法:
/// <summary>
/// If the first element has the expected value, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
T expectedFirstElement, IEqualityComparer<T> comparer = default)
comparer ??= EqualityComparer<T>.Default;
return source.Publish(published =>
published
.Where(x => !comparer.Equals(x, expectedFirstElement))
.Take(1)
.IgnoreElements()
.Amb(published)
);
此实现使用Amb
运算符(“歧义”的缩写),它采用两个序列并传播首先反应的序列。
-
如果第一个元素有想要的值,第一个序列(
published.Where
+Take
+IgnoreElements
)没有反应,所以传播第二个序列(published
,这是整个序列)。此时第一个序列已取消订阅,因此不会为后续元素调用 comparer.Equals
方法。
如果第一个元素没有所需的值,则第一个序列发出完成通知,该通知由 Amb
运算符传播,第二个序列(整个序列)被忽略。
使用示例:
IObservable<string> original = new string[] "a", "b", "c", "d" .ToObservable();
IObservable<string> transformed = original.IfFirstElement("a");
注意:此实现基于以下假设:当两个序列同时反应时,Amb
运算符始终选择第一个序列。文档中没有提到这一点,仅说明 “Amb
运算符使用并行处理来检测哪个序列产生第一个项目”。 source code 相当复杂,所以我无法通过阅读它得出这个保证。如果你想要更可靠的东西,你可以试试这个实现:
return Observable.Create<T>(observer =>
bool first = true;
return source.Subscribe(item =>
if (first)
first = false;
if (!comparer.Equals(item, expectedFirstElement))
observer.OnCompleted(); return;
observer.OnNext(item);
, observer.OnError, observer.OnCompleted);
);
【讨论】:
我不会像那样使用Amb
,但由于发布,它可以工作。我也会避免使用Create
,因为它会引入竞争条件。
@Enigmativity 能否描述一个会导致上述Observable.Create
实现中出现竞争条件的场景?
我的意思一般是Create
。我只是避免它,因为它可能会阻塞,具体取决于使用的调度程序。我几乎发现使用常规运算符会导致 observables 行为正常。
@Enigmativity 啊,好的。我问是因为有人可以阅读您的评论,并得出上述实现存在缺陷的结论。以上是关于如何基于涉及第一个元素的谓词转换可观察对象的主要内容,如果未能解决你的问题,请参考以下文章