RAC篇(中) - 信号的各种转换和操作

Posted diyigechengxu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RAC篇(中) - 信号的各种转换和操作相关的知识,希望对你有一定的参考价值。

bind函数会返回一个新的信号N。整体思路是对原信号O进行订阅,每当信号O产生一个值就将其转变成一个中间信号M,并马上订阅M, 之后将信号M的输出作为新信号N的输出。

- (RACSignal *)bind:(RACSignalBindBlock (^)(void))block {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSignalBindBlock bindingBlock = block();
        [self subscribeNext:^(id x) {
            BOOL stop = NO;
            id signal = bindingBlock(x, &stop);
            [signal subscribeNext:^(id x) {
                [subscriber sendNext:x];
            } error:^(NSError *error) {
                [subscriber sendError:error];
            } completed:^{
                [subscriber sendCompleted];
            }];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];
        return nil;
    }];
}

flattenMap是对bind的包装,为bind提供bindBlock。因此flattenMap与bind操作实质上是一样的,都是将原信号传出的值转换成中间信号,同时马上去订阅这个中间信号,之后将中间信号的输出作为新信号的输出。如果原信号是一个信号中的信号(sendNext:一个信号),那么这个原信号的输出值就是一个信号

- (instancetype)flattenMap:(RACStream* (^)(id value))block 
{
    Class class =self.class;
    return[self bind:^{
        return^(id value,BOOL*stop) {
            id stream = block(value) ?: [class empty];
            return stream;
        };
    }];
}

map操作可将原信号输出的数据通过自定义的方法转换成所需的数据, 同时将变化后的数据作为新信号的输出。它实际调用了flattenMap, 只不过中间信号是直接将mapBlock处理的值返回

常用的filter内部也是使用了flattenMap。与map相同,它也是将filter后的结果使用中间信号进行包装并对其进行订阅,之后将中间信号的输出作为新信号的输出,以此来达到输出filter结果的目的。

- (instancetype)map:(id(^)(id value))block
{
    Class class = self.class;

    return[self flattenMap:^(id value) {
        return[class return:block(value)]; // (1)
    };
}

flatten: 该操作主要作用于信号的信号。原信号作为信号的信号,在被订阅时输出的数据必然也是个信号(signalValue),这往往不是我们想要的。当我们执行[O flatten]操作时,因为flatten内部调用了flattenMap,flattenMap里对应的中间信号就是原信号输出signalValue。因此在flatten操作中新信号被订阅时输出的值就是原信号O的各个子信号输出值的集合。 主要用来打平信号的信号。

- (instancetype)flatten
{
    return [self flattenMap:^(RACSignal *signalValue) { // (1)
        return [signalValue]; // (2)
    };
}

 switchToLatest:与flatten相同,其主要目的也是用于”压平”信号的信号。但与flatten不同的是,flatten是在多管线汇聚后,将原信号O的各子信号输出作为新信号N的输出,但switchToLatest仅仅只是将O输出的最新信号L的输出作为N的输出。

- (RACSignal*)switchToLatest 
{
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACMulticastConnection *connection = [self publish];
      // connection.signal 在这里就是一个RACSubject
        [[connection.signal flattenMap:^(RACSignal *signalValue) {// 如果connection的sourceSignal产生了新的值,那么flattenMap中的bind函数的subsribe()就会回调,这里的signalValue就是sourceSignal产生的信号值
       return [signalValue takeUntil:[connection.signal concat:[RACSignal never]]]; 
     }] subscribe:subscriber]; // 将takeUtil的信号产生的值绑定到subscriber上
     RACDisposable *connectionDisposable = [connection connect]; 
     return [RACDisposable disposableWithBlock:^{ }];
  }];
}
# RACSignal class
- (RACMulticastConnection *)publish {
    RACSubject *subject = [RACSubject subject]];
    RACMulticastConnection *connection = [self multicast:subject];
    return connection;
}

- (RACMulticastConnection *)multicast:(RACSubject *)subject {
    RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
    return connection;
}
# RACMulticastConnection class
- (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    self = [super init];
    _sourceSignal = source;
    _signal = subject;
    return self;
}
- (RACDisposable *)connect {
    self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];// 原信号添加一个信号订阅者_signal
  return self.serialDisposable; 
}

 takeUntil: 如果有新的信号进来,那么原来的信号就会dispose

- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {// 如果有新的信号进来,那么原来的信号就会dispose
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        [signalTrigger subscribeNext:^(id _) {
            [subscriber sendCompleted];
        } completed:^{
            [subscriber sendCompleted];
        }];

        [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];
        return nil;
    }];
}

 scanWithStart : 该操作可将上次reduceBlock处理后输出的结果作为参数,传入当次reduceBlock操作,往往用于信号输出值的聚合处理。scanWithStart内部仍然用到了核心操作bind。它会在bindBlock中对value进行操作,同时将上次操作得到的结果running作为参数带入,一旦本次reduceBlock执行完,就将结果保存在running中,以便下次处理时使用,最后再将本次得出的结果用信号包装后,传递出去。

- (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id(^)(id,id,NSUInteger))reduceBlock {
    Class class =self.class;
    return [self bind:^{
        __block id running = startingValue;
        __block NSUIntegerindex = 0;
        return^(id value, BOOL*stop) {
            running = reduceBlock(running, value, index++); // (1)
            return [class return:running]; // (2)
        };
    }];
}

throttle:这个操作接收一个时间间隔interval作为参数,如果Signal发出的next事件之后interval时间内不再发出next事件,那么它返回的Signal会将这个next事件发出。

 

take: 只接收原有信号前几个信号值,信号发送数量大于指定序号的会被忽略

- (__kindof RACStream *)take:(NSUInteger)count {
    Class class = self.class;
    if (count == 0) return class.empty;
    return [[self bind:^{
        __block NSUInteger taken = 0;
        return ^ id (id value, BOOL *stop) {
            if (taken < count) {
                ++taken;
                if (taken == count) *stop = YES;
                return [class return:value];
            } else {
                return nil;
            }
        };
    }]];
}

skip: 过滤原有信号前几个信号值,从指定序号位置开始接收序号

- (__kindof RACStream *)skip:(NSUInteger)skipCount {
    Class class = self.class;
    return [[self bind:^{
        __block NSUInteger skipped = 0;
        return ^(id value, BOOL *stop) {
            if (skipped >= skipCount) return [class return:value];
            skipped++;
            return class.empty;
        };
    }]];
}

startWith: 除了接收原有信号的值,还会在最前面接收一个指定的信号值。

- (__kindof RACStream *)startWith:(id)value {
    return [[self.class return:value] concat:self];
}

repeat: 对原有信号发送了complete前的所有信号值反复接收

- (RACSignal *)repeat {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return subscribeForever(self,
            ^(id x) {
                [subscriber sendNext:x];
            },
            ^(NSError *error, RACDisposable *disposable) {
                [disposable dispose];
                [subscriber sendError:error];
            },
            ^(RACDisposable *disposable) {
                                // 不会处理complete信号
            });
    }] setNameWithFormat:@"[%@] -repeat", self.name];
}

 

static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
    RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
        [signal subscribeNext:next error:^(NSError *e) {
            @autoreleasepool {
                error(e, nil);
            }
            recurse();//[[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
        } completed:^{
            @autoreleasepool {
                completed(nil);
            }
            recurse();//[[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
    }]; 
  };
  recursiveBlock(
^{
    [[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
  });
  return nil;
}

 

- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
    [self schedule:^{
        void (^reallyReschedule)(void) = ^{
            [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable];
        };
        __block NSLock *lock = [[NSLock alloc] init];
        lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)];
        __block NSUInteger rescheduleCount = 0;
        __block BOOL rescheduleImmediately = NO;
        @autoreleasepool {
            recursiveBlock(^{// 这里会触发信号的重复订阅,并触发[signal sendNext:next error:error complete:complete]
                [lock lock];
                BOOL immediate = rescheduleImmediately;
                if (!immediate) ++rescheduleCount;
                [lock unlock];
                if (immediate) reallyReschedule();
            });
        }
        [lock lock];
        NSUInteger synchronousCount = rescheduleCount;
        rescheduleImmediately = YES;
        [lock unlock];
        for (NSUInteger i = 0; i < synchronousCount; i++) {
            reallyReschedule();
        }
    }];
}

 

retry: 如果原有信号发送了一个error,那么这里会尝试重新订阅原有信号,在重复指定次数后发送失败的信号.常用于网络请求的失败重试机制

- (RACSignal *)retry:(NSInteger)retryCount {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block NSInteger currentRetryCount = 0;
        return subscribeForever(self,
            ^(id x) {
                [subscriber sendNext:x];
            },
            ^(NSError *error, RACDisposable *disposable) {
                if (retryCount == 0 || currentRetryCount < retryCount) {
                    currentRetryCount++;
                    return;
                }

                [disposable dispose];
                [subscriber sendError:error];
            },
            ^(RACDisposable *disposable) {// 会触发完成信号,不会无限订阅
                [disposable dispose];
                [subscriber sendCompleted];
            });
    }];
}

 

aggregateWithStart: reduceBlock: 设一个初始值,对原信号的值进行累加并输出最终值

- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
    return [[[self
        scanWithStart:start reduceWithIndex:reduceBlock]
        startWith:start]
        takeLast:1];
}
- (__kindof RACStream *)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
    Class class = self.class;

    return [self bind:^{
        __block id running = startingValue;
        __block NSUInteger index = 0;

        return ^(id value, BOOL *stop) {
            running = reduceBlock(running, value, index++);
            return [class return:running];
        };
    }];
}
- (RACSignal *)takeLast:(NSUInteger)count {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
        return [self subscribeNext:^(id x) {
            [valuesTaken addObject:x ? : RACTupleNil.tupleNil];

            while (valuesTaken.count > count) {
                [valuesTaken removeObjectAtIndex:0];// 移除之前的所有信号值
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            for (id value in valuesTaken) {
                [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
            }
            [subscriber sendCompleted];
        }];
    }];
}

 delay: 对原信号值延迟输出

throttle: 指定原有信息的输出间隔时间,时间内的信号值会被过滤

concat: 拼接两个信号的输出值为一个整体

- (RACSignal *)concat:(RACSignal *)signal {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [signal subscribe:subscriber];
        }];
        return nil;
    }];
}

zip: 拉链式输出两个信号的值为一个元祖.例如a信号输出[1,2,3,4,5],b信号输出[6,7,8,9],那么zip后输出为[[1,6],[2,7],[3,8],[4,9]]

- (RACSignal *)zipWith:(RACSignal *)signal {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block BOOL selfCompleted = NO;
        NSMutableArray *selfValues = [NSMutableArray array];
        __block BOOL otherCompleted = NO;
        NSMutableArray *otherValues = [NSMutableArray array];
        void (^sendCompletedIfNecessary)(void) = ^{
            @synchronized (selfValues) {
                BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
                BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
                if (selfEmpty || otherEmpty) [subscriber sendCompleted];
            }
        };
        void (^sendNext)(void) = ^{
            @synchronized (selfValues) {
                if (selfValues.count == 0) return;
                if (otherValues.count == 0) return;
                RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]);
                [selfValues removeObjectAtIndex:0];
                [otherValues removeObjectAtIndex:0];
                [subscriber sendNext:tuple];
                sendCompletedIfNecessary();
            }
        };
        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            @synchronized (selfValues) {
                [selfValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {
                selfCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];
        RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
            @synchronized (selfValues) {
                [otherValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {
                otherCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];
        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            [otherDisposable dispose];
        }];
    }];
}

 

 combineLatest: 打包两个信号的输出值,任何一个信号输出变化,都会取出最近两个信号的输出值。例如a信号输出[1,2,3,4,5],b信号输出[6,7,8,9],那么zip后输出为[[1,6],[2,6],[2,7],[3,7],[3,8],[4,8][4,9],[5,9]].(这里可能会有更多的输出)

- (RACSignal *)combineLatestWith:(RACSignal *)signal {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block id lastSelfValue = nil;
        __block BOOL selfCompleted = NO;

        __block id lastOtherValue = nil;
        __block BOOL otherCompleted = NO;
        void (^sendNext)(void) = ^{
            if (lastSelfValue == nil || lastOtherValue == nil) return;
            [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
        };
        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            lastSelfValue = x ?: RACTupleNil.tupleNil;
            sendNext();
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            selfCompleted = YES;
            if (otherCompleted) [subscriber sendCompleted];
        }];
        [signal subscribeNext:^(id x) {
            lastOtherValue = x ?: RACTupleNil.tupleNil;
            sendNext();
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            otherCompleted = YES;
            if (selfCompleted) [subscriber sendCompleted];
        }];
        return nil;
    }];
}

 

RACCommand是RAC很重要的组成部分,通常用来表示某个action的执行。RACCommand提供executionSignals、 executing、 error等一连串公开的信号,方便外界对action执行过程与执行结果进行观察。executionSignals是signal of signals,如果外部直接订阅executionSignals,得到的输出是当前执行的信号,而不是执行信号输出的数据,所以一般会配合flatten或switchToLatest使用。 errors,RACCommand的错误不是通过sendError来实现的,而是通过errors属性传递出来的。 executing,表示该command当前是否正在执行。它常用于监听按钮点击、网络请求等。

以上是关于RAC篇(中) - 信号的各种转换和操作的主要内容,如果未能解决你的问题,请参考以下文章

iOS Reactivecocoa(RAC)知其所以然(源码分析,一篇足以)

RAC篇(上) - RACSignal & RACSubject

iOS开发进阶篇——FRP与ReactiveCocoa的介绍

Arduino: AD模数转换详解和电路搭建以及示例代码

OC + RAC 信号中的信号

最快让你上手ReactiveCocoa之基础篇(简称RAC)