如何基于另一个 Observable 重置 RXJS 扫描算子
Posted
技术标签:
【中文标题】如何基于另一个 Observable 重置 RXJS 扫描算子【英文标题】:How to reset a RXJS scan operator based on another Observable 【发布时间】:2019-10-13 06:44:20 【问题描述】:我有一个组件在呈现虚拟列表中的最后一项时触发onScrollEnd
事件。此事件将发出一个新的 API 请求以获取下一页并使用 scan
运算符将它们与以前的结果合并。
此组件还有一个搜索字段,可触发onSearch
事件。
触发搜索事件时,如何清除scan
算子之前累积的结果?还是我需要在这里重构我的逻辑?
const loading$ = new BehaviorSubject(false);
const offset$ = new BehaviorSubject(0);
const search$ = new BehaviorSubject(null);
const options$: Observable<any[]> = merge(offset$, search$).pipe(
// 1. Start the loading indicator.
tap(() => loading$.next(true)),
// 2. Fetch new items based on the offset.
switchMap(([offset, searchterm]) => userService.getUsers(offset, searchterm)),
// 3. Stop the loading indicator.
tap(() => loading$.next(false)),
// 4. Complete the Observable when there is no 'next' link.
takeWhile((response) => response.links.next),
// 5. Map the response.
map(( data ) =>
data.map((user) => (
label: user.name,
value: user.id
))
),
// 6. Accumulate the new options with the previous options.
scan((acc, curr) =>
// TODO: Dont merge on search$.next
return [...acc, ...curr]);
);
// Fetch next page
onScrollEnd: (offset: number) => offset$.next(offset);
// Fetch search results
onSearch: (term) =>
search$.next(term);
;
【问题讨论】:
【参考方案1】:我认为你可以通过重组你的链来实现你想要的(为了简单起见,我省略了触发加载的 tap
调用):
search$.pipe(
switchMap(searchterm =>
concat(
userService.getUsers(0, searchterm),
offset$.pipe(concatMap(offset => userService.getUsers(offset, searchterm)))),
).pipe(
map(( data ) => data.map((user) => (
label: user.name,
value: user.id
))),
scan((acc, curr) => [...acc, ...curr], []),
),
),
);
来自search$
的每个发射都将创建一个新的内部 Observable,该 Observable 具有自己的 scan
,该 scan
将从一个空累加器开始。
【讨论】:
感谢您抽出宝贵时间回复马丁。我没有像我想要使用您的解决方案那样让它完全工作,但它提供了一些灵感,我找到了一个可行的解决方案。您可以在这里查看,如果您认为可以改进,请立即告诉我:stackblitz.com/edit/rxjs-search-offset TBH 我相信这是最好的解决方案。它使所有内容都面向流,并且不依赖流本身以外的任何东西来重置扫描。 这是最好的解决方案,但我无法在我的应用程序中使用它。可观察的流从未被订阅并且没有调用被触发,我使用不同的方法实现。但这更干净,当截止日期不太灵活时,我可能不得不重试。【参考方案2】:要操作scan
的state
,您可以编写higher order functions 来获取旧状态和新更新。然后与merge 运算符结合使用。这样,您就可以坚持使用干净的面向流的解决方案,而不会产生任何副作用。
const Subject, merge = rxjs;
const scan, map = rxjs.operators;
add$ = new Subject();
clear$ = new Subject();
add = (value) => (state) => [...state, value];
clear = () => (state) => [];
const result$ = merge(
add$.pipe(map(add)),
clear$.pipe(map(clear))
).pipe(
scan((state, innerFn) => innerFn(state), [])
)
result$.subscribe(result => console.log(...result))
add$.next(1)
add$.next(2)
clear$.next()
add$.next(3)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
此方法可以轻松扩展和/或适用于 rxjs 中的其他 state
用例。
示例(删除最后一项)
removeLast$ = new Subject()
removeLast = () => (state) => state.slice(0, -1);
merge(
..
removeLast$.pipe(map(removeLast)),
..
)
【讨论】:
【参考方案3】:找到了一个可行的解决方案:我在 scan
运算符之前使用 withLatestFrom
检查当前偏移量,并在需要时根据此值重置累加器。
Stackblitz demo
【讨论】:
这确实是一个非常简洁、干净的解决方案。【参考方案4】:这是一个有趣的流。仔细想想,offset$ 和 search$ 确实是 2 个独立的流,但逻辑不同,因此应该在最后而不是开始时合并。
另外,在我看来,搜索应该将偏移量重置为 0,而我在当前逻辑中看不到这一点。
这是我的想法:
const offsettedOptions$ = offset$.pipe(
tap(() => loading$.next(true)),
withLatestFrom(search$),
concatMap(([offset, searchterm]) => userService.getUsers(offset, searchterm)),
tap(() => loading$.next(false)),
map(( data ) =>
data.map((user) => (
label: user.name,
value: user.id
)),
scan((acc, curr) => [...acc, ...curr])
);
const searchedOptions$ = search$.pipe(
tap(() => loading$.next(true)),
concatMap(searchTerm => userService.getUsers(0, searchterm)),
tap(() => loading$.next(false)),
map(( data ) =>
data.map((user) => (
label: user.name,
value: user.id
)),
);
const options$ = merge(offsettedOptions, searchedOptions);
看看这是否有效或有意义。我可能遗漏了一些上下文。
【讨论】:
感谢杰西的回复。我没有像我想要使用您的解决方案那样让它完全工作,但它提供了一些灵感,我找到了一个可行的解决方案。您可以在这里查看,如果您认为可以改进,请立即告诉我:stackblitz.com/edit/rxjs-search-offset以上是关于如何基于另一个 Observable 重置 RXJS 扫描算子的主要内容,如果未能解决你的问题,请参考以下文章
如何使用另一个Observable的值操作Observable中的项列表