java rxjava简化了Schedulers.io()中的List处理,并在主线程中等待结果

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java rxjava简化了Schedulers.io()中的List处理,并在主线程中等待结果相关的知识,希望对你有一定的参考价值。

package com.yuehui.web.util;

import rx.Observable;
import rx.schedulers.Schedulers;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Created by Administrator on 2016/12/16.
 */
public class RxIOUtil<T,R> {

    public List<R> getResultByMultiBlockIOByOrgOrder(List<T> emmitSrc, Function<T,R> map, Method methodIn, Method methodOut){
        Map<Object,R> sortingMethodReturns = new LinkedHashMap<>();

        //记录原order
        emmitSrc.forEach(s -> {
            try {
                sortingMethodReturns.put(methodIn.invoke(s), null);
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        });

        List<R> allList =  Observable.from(emmitSrc)
                .flatMap(item->Observable.just(item)
                        .map(map::apply)
                        .subscribeOn(Schedulers.io())).toList().toBlocking().first();

        //恢复原order
        allList.forEach(l -> {
            try {
                sortingMethodReturns.put(methodOut.invoke(l), l);
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        });

        return new ArrayList<>(sortingMethodReturns.values());
    }

    public List<R> getResultByMultiBlockIO(List<T> emmitSrc, Function<T,R> function){
        List<R> allList =  Observable.from(emmitSrc)
                .flatMap(item->Observable.just(item)
                        .map(function::apply)
                        .subscribeOn(Schedulers.io())).toList().toBlocking().first();
        return allList;
    }

    public void updateByNewThread(Supplier<T> obj, Consumer<T> consumer){
        Observable.just(obj.get()).observeOn(Schedulers.newThread()).subscribe(o->consumer.accept(o));
    }


}

以上是关于java rxjava简化了Schedulers.io()中的List处理,并在主线程中等待结果的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - Schedulers默认线程池

单元测试 RxJava 超时未订阅

RxJava使用过程中遇到线程相关的坑

RxJava使用过程中遇到线程相关的坑