java rxjava简化了Schedulers.io()中的List处理,并在主线程中等待结果
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java rxjava简化了Schedulers.io()中的List处理,并在主线程中等待结果相关的知识,希望对你有一定的参考价值。
package com.yuehui.web.util;
import rx.Observable;
import rx.schedulers.Schedulers;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* Created by Administrator on 2016/12/16.
*/
public class RxIOUtil<T,R> {
public List<R> getResultByMultiBlockIOByOrgOrder(List<T> emmitSrc, Function<T,R> map, Method methodIn, Method methodOut){
Map<Object,R> sortingMethodReturns = new LinkedHashMap<>();
//记录原order
emmitSrc.forEach(s -> {
try {
sortingMethodReturns.put(methodIn.invoke(s), null);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
});
List<R> allList = Observable.from(emmitSrc)
.flatMap(item->Observable.just(item)
.map(map::apply)
.subscribeOn(Schedulers.io())).toList().toBlocking().first();
//恢复原order
allList.forEach(l -> {
try {
sortingMethodReturns.put(methodOut.invoke(l), l);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
});
return new ArrayList<>(sortingMethodReturns.values());
}
public List<R> getResultByMultiBlockIO(List<T> emmitSrc, Function<T,R> function){
List<R> allList = Observable.from(emmitSrc)
.flatMap(item->Observable.just(item)
.map(function::apply)
.subscribeOn(Schedulers.io())).toList().toBlocking().first();
return allList;
}
public void updateByNewThread(Supplier<T> obj, Consumer<T> consumer){
Observable.just(obj.get()).observeOn(Schedulers.newThread()).subscribe(o->consumer.accept(o));
}
}
以上是关于java rxjava简化了Schedulers.io()中的List处理,并在主线程中等待结果的主要内容,如果未能解决你的问题,请参考以下文章
Rxjava 源码解析 - Schedulers默认线程池
Rxjava 源码解析 - Schedulers默认线程池
Rxjava 源码解析 - Schedulers默认线程池
单元测试 RxJava 超时未订阅
RxJava使用过程中遇到线程相关的坑
RxJava使用过程中遇到线程相关的坑