RxJava 并行获取 Observables
Posted
技术标签:
【中文标题】RxJava 并行获取 Observables【英文标题】:RxJava Fetching Observables In Parallel 【发布时间】:2014-12-02 15:34:21 【问题描述】:在 RxJava 中实现并行异步调用时,我需要一些帮助。我选择了一个简单的用例,其中 FIRST 调用获取(相当搜索)要显示的产品列表(平铺)。随后的调用出去并获取(A)评论和(B)产品图像
经过几次尝试,我到达了这个地方。
1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
2 List<Tile> allTiles = new ArrayList<Tile>();
3 ClientResponse response = new ClientResponse();
4 searchTile.parallel(oTile ->
5 return oTile.flatMap(t ->
6 Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
7 Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());
8 return Observable.zip(reviews, imageUrl, (r, u) ->
9 t.setReviews(r);
10 t.setImageUrl(u);
11 return t;
12 );
13 );
14 ).subscribe(e ->
15 allTiles.add((Tile) e);
16 );
第 1 行:取出要显示的产品(Tile)
第 4 行:我们获取 Observable 的列表并对其进行 SHARD 以获取评论和 imageUrls
谎言 6,7:获取 Observable 评论和 Observable url
第 8 行:最后 2 个 observable 被压缩以返回更新后的 Observable
第 15 行:最后第 15 行整理了所有要显示在集合中的单个产品,该集合可以返回给调用层
虽然 Observable 已被分片并且在我们的测试中运行了 4 个不同的线程;获取评论和图像似乎是一个接一个。我怀疑第 8 行的 zip 步骤基本上导致了 2 个 observables(reviews 和 url)的顺序调用。
这个小组对并行获取评论和图片网址有什么建议吗?本质上,上面附加的瀑布图应该看起来更垂直堆叠。对评论和图像的调用应该是并行的
谢谢 阿南德拉曼
【问题讨论】:
如何生成传输时间线图表?它看起来很酷而且很有用。想自己用。 由于我的系统正在进行外部调用,我只是通过 fiddler 代理调用。 Fiddler 可以选择生成网络时间线。你基本上看到了这种观点。在为代理请求设置提琴手之后;只需选择您感兴趣的会话,然后单击右侧窗格上的时间线选项卡。谢谢阿南德 【参考方案1】:仍然是 @ JDK 7 的人,他们的 IDE 还没有自动检测 JDK 8 源代码,以及尝试@benjchristensen 上述出色响应(和解释)的内容,可以使用这个无耻的重构 JDK 7 代码。感谢@benjchristensen 的精彩解释和示例!
import java.util.List;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
public class ParallelExecutionExample
public static void main(String[] args)
final long startTime = System.currentTimeMillis();
Observable<Tile> searchTile = getSearchResults("search term")
.doOnSubscribe(new Action0()
@Override
public void call()
logTime("Search started ", startTime);
)
.doOnCompleted(new Action0()
@Override
public void call()
logTime("Search completed ", startTime);
);
Observable<TileResponse> populatedTiles = searchTile.flatMap(new Func1<Tile, Observable<TileResponse>>()
@Override
public Observable<TileResponse> call(final Tile t)
Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
.doOnCompleted(new Action0()
@Override
public void call()
logTime("getSellerReviews[" + t.id + "] completed ", startTime);
);
Observable<String> imageUrl = getProductImage(t.getProductId())
.doOnCompleted(new Action0()
@Override
public void call()
logTime("getProductImage[" + t.id + "] completed ", startTime);
);
return Observable.zip(reviews, imageUrl, new Func2<Reviews, String, TileResponse>()
@Override
public TileResponse call(Reviews r, String u)
return new TileResponse(t, r, u);
)
.doOnCompleted(new Action0()
@Override
public void call()
logTime("zip[" + t.id + "] completed ", startTime);
);
);
List<TileResponse> allTiles = populatedTiles
.toList()
.doOnCompleted(new Action0()
@Override
public void call()
logTime("All Tiles Completed ", startTime);
)
.toBlocking()
.single();
private static Observable<Tile> getSearchResults(String string)
return mockClient(new Tile(1), new Tile(2), new Tile(3));
private static Observable<Reviews> getSellerReviews(int id)
return mockClient(new Reviews());
private static Observable<String> getProductImage(int id)
return mockClient("image_" + id);
private static void logTime(String message, long startTime)
System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
private static <T> Observable<T> mockClient(final T... ts)
return Observable.create(new Observable.OnSubscribe<T>()
@Override
public void call(Subscriber<? super T> s)
try
Thread.sleep(1000);
catch (Exception e)
for (T t : ts)
s.onNext(t);
s.onCompleted();
)
.subscribeOn(Schedulers.io());
// note the use of subscribeOn to make an otherwise synchronous Observable async
public static class TileResponse
public TileResponse(Tile t, Reviews r, String u)
// store the values
public static class Tile
private final int id;
public Tile(int i)
this.id = i;
public int getSellerId()
return id;
public int getProductId()
return id;
public static class Reviews
【讨论】:
【参考方案2】:并行运算符被证明是几乎所有用例的问题,并且没有达到大多数人期望的效果,因此它在 1.0.0.rc.4 版本中被删除:https://github.com/ReactiveX/RxJava/pull/1716
here 有一个很好的例子来说明如何执行这种类型的行为并获得并行执行。
在您的示例代码中,不清楚searchServiceClient
是同步的还是异步的。它会稍微影响如何解决问题,就好像它已经是异步的,不需要额外的调度。如果需要同步额外调度。
首先这里有一些显示同步和异步行为的简单示例:
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecution
public static void main(String[] args)
System.out.println("------------ mergingAsync");
mergingAsync();
System.out.println("------------ mergingSync");
mergingSync();
System.out.println("------------ mergingSyncMadeAsync");
mergingSyncMadeAsync();
System.out.println("------------ flatMapExampleSync");
flatMapExampleSync();
System.out.println("------------ flatMapExampleAsync");
flatMapExampleAsync();
System.out.println("------------");
private static void mergingAsync()
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
private static void mergingSync()
// here you'll see the delay as each is executed synchronously
Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
private static void mergingSyncMadeAsync()
// if you have something synchronous and want to make it async, you can schedule it like this
// so here we see both executed concurrently
Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
private static void flatMapExampleAsync()
Observable.range(0, 5).flatMap(i ->
return getDataAsync(i);
).toBlocking().forEach(System.out::println);
private static void flatMapExampleSync()
Observable.range(0, 5).flatMap(i ->
return getDataSync(i);
).toBlocking().forEach(System.out::println);
// artificial representations of IO work
static Observable<Integer> getDataAsync(int i)
return getDataSync(i).subscribeOn(Schedulers.io());
static Observable<Integer> getDataSync(int i)
return Observable.create((Subscriber<? super Integer> s) ->
// simulate latency
try
Thread.sleep(1000);
catch (Exception e)
e.printStackTrace();
s.onNext(i);
s.onCompleted();
);
以下是尝试提供与您的代码更匹配的示例:
import java.util.List;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecutionExample
public static void main(String[] args)
final long startTime = System.currentTimeMillis();
Observable<Tile> searchTile = getSearchResults("search term")
.doOnSubscribe(() -> logTime("Search started ", startTime))
.doOnCompleted(() -> logTime("Search completed ", startTime));
Observable<TileResponse> populatedTiles = searchTile.flatMap(t ->
Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
.doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
Observable<String> imageUrl = getProductImage(t.getProductId())
.doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));
return Observable.zip(reviews, imageUrl, (r, u) ->
return new TileResponse(t, r, u);
).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
);
List<TileResponse> allTiles = populatedTiles.toList()
.doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
.toBlocking().single();
private static Observable<Tile> getSearchResults(String string)
return mockClient(new Tile(1), new Tile(2), new Tile(3));
private static Observable<Reviews> getSellerReviews(int id)
return mockClient(new Reviews());
private static Observable<String> getProductImage(int id)
return mockClient("image_" + id);
private static void logTime(String message, long startTime)
System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
private static <T> Observable<T> mockClient(T... ts)
return Observable.create((Subscriber<? super T> s) ->
// simulate latency
try
Thread.sleep(1000);
catch (Exception e)
for (T t : ts)
s.onNext(t);
s.onCompleted();
).subscribeOn(Schedulers.io());
// note the use of subscribeOn to make an otherwise synchronous Observable async
public static class TileResponse
public TileResponse(Tile t, Reviews r, String u)
// store the values
public static class Tile
private final int id;
public Tile(int i)
this.id = i;
public int getSellerId()
return id;
public int getProductId()
return id;
public static class Reviews
这个输出:
Search started => 65ms
Search completed => 1094ms
getProductImage[1] completed => 2095ms
getSellerReviews[2] completed => 2095ms
getProductImage[3] completed => 2095ms
zip[1] completed => 2096ms
zip[2] completed => 2096ms
getProductImage[2] completed => 2096ms
getSellerReviews[1] completed => 2096ms
zip[3] completed => 2096ms
All Tiles Completed => 2097ms
getSellerReviews[3] completed => 2097ms
我已将每个 IO 调用模拟为花费 1000 毫秒,因此很明显延迟在哪里并且它是并行发生的。它以经过的毫秒数打印出进度。
这里的技巧是 flatMap 会合并异步调用,所以只要被合并的 Observable 是异步的,它们都会被并发执行。
如果像getProductImage(t.getProductId())
这样的调用是同步的,则可以像这样使其异步:getProductImage(t.getProductId()).subscribeOn(Schedulers.io)。
这是上面示例的重要部分,没有所有的日志记录和样板类型:
Observable<Tile> searchTile = getSearchResults("search term");;
Observable<TileResponse> populatedTiles = searchTile.flatMap(t ->
Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
Observable<String> imageUrl = getProductImage(t.getProductId());
return Observable.zip(reviews, imageUrl, (r, u) ->
return new TileResponse(t, r, u);
);
);
List<TileResponse> allTiles = populatedTiles.toList()
.toBlocking().single();
我希望这会有所帮助。
【讨论】:
感谢@benjchristensen 的精彩回复。它提供了清晰度并解决了我的问题。感谢您还指出 [github.com/benjchristensen/ReactiveLab] 中的示例宝库。将在周末深入研究。 doOnXXX() 方法的目的是什么? @Pangea,我认为这些调用的目的是在事件发生时打印它,因此您可以看到它是并行工作的。 这是一个很好的答案! 我认为toBlocking
不需要像mergingSync
等同步调用。只有在我们进行一些异步调用时才需要它。以上是关于RxJava 并行获取 Observables的主要内容,如果未能解决你的问题,请参考以下文章