RAC篇(中) - 信号的各种转换和操作
Posted diyigechengxu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RAC篇(中) - 信号的各种转换和操作相关的知识,希望对你有一定的参考价值。
bind函数会返回一个新的信号N。整体思路是对原信号O进行订阅,每当信号O产生一个值就将其转变成一个中间信号M,并马上订阅M, 之后将信号M的输出作为新信号N的输出。
- (RACSignal *)bind:(RACSignalBindBlock (^)(void))block { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { RACSignalBindBlock bindingBlock = block(); [self subscribeNext:^(id x) { BOOL stop = NO; id signal = bindingBlock(x, &stop); [signal subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [subscriber sendCompleted]; }]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [subscriber sendCompleted]; }]; return nil; }]; }
flattenMap是对bind的包装,为bind提供bindBlock。因此flattenMap与bind操作实质上是一样的,都是将原信号传出的值转换成中间信号,同时马上去订阅这个中间信号,之后将中间信号的输出作为新信号的输出。如果原信号是一个信号中的信号(sendNext:一个信号),那么这个原信号的输出值就是一个信号
- (instancetype)flattenMap:(RACStream* (^)(id value))block { Class class =self.class; return[self bind:^{ return^(id value,BOOL*stop) { id stream = block(value) ?: [class empty]; return stream; }; }]; }
map操作可将原信号输出的数据通过自定义的方法转换成所需的数据, 同时将变化后的数据作为新信号的输出。它实际调用了flattenMap, 只不过中间信号是直接将mapBlock处理的值返回
常用的filter内部也是使用了flattenMap。与map相同,它也是将filter后的结果使用中间信号进行包装并对其进行订阅,之后将中间信号的输出作为新信号的输出,以此来达到输出filter结果的目的。
- (instancetype)map:(id(^)(id value))block { Class class = self.class; return[self flattenMap:^(id value) { return[class return:block(value)]; // (1) }; }
flatten: 该操作主要作用于信号的信号。原信号作为信号的信号,在被订阅时输出的数据必然也是个信号(signalValue),这往往不是我们想要的。当我们执行[O flatten]操作时,因为flatten内部调用了flattenMap,flattenMap里对应的中间信号就是原信号输出signalValue。因此在flatten操作中新信号被订阅时输出的值就是原信号O的各个子信号输出值的集合。 主要用来打平信号的信号。
- (instancetype)flatten { return [self flattenMap:^(RACSignal *signalValue) { // (1) return [signalValue]; // (2) }; }
switchToLatest:与flatten相同,其主要目的也是用于”压平”信号的信号。但与flatten不同的是,flatten是在多管线汇聚后,将原信号O的各子信号输出作为新信号N的输出,但switchToLatest仅仅只是将O输出的最新信号L的输出作为N的输出。
- (RACSignal*)switchToLatest { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { RACMulticastConnection *connection = [self publish]; // connection.signal 在这里就是一个RACSubject
[[connection.signal flattenMap:^(RACSignal *signalValue) {// 如果connection的sourceSignal产生了新的值,那么flattenMap中的bind函数的subsribe()就会回调,这里的signalValue就是sourceSignal产生的信号值
return [signalValue takeUntil:[connection.signal concat:[RACSignal never]]];
}] subscribe:subscriber]; // 将takeUtil的信号产生的值绑定到subscriber上
RACDisposable *connectionDisposable = [connection connect];
return [RACDisposable disposableWithBlock:^{ }];
}];
}
# RACSignal class - (RACMulticastConnection *)publish { RACSubject *subject = [RACSubject subject]]; RACMulticastConnection *connection = [self multicast:subject]; return connection; } - (RACMulticastConnection *)multicast:(RACSubject *)subject { RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject]; return connection; } # RACMulticastConnection class - (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject { self = [super init]; _sourceSignal = source; _signal = subject; return self; } - (RACDisposable *)connect { self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];// 原信号添加一个信号订阅者_signal
return self.serialDisposable;
}
takeUntil: 如果有新的信号进来,那么原来的信号就会dispose
- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {// 如果有新的信号进来,那么原来的信号就会dispose return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { [signalTrigger subscribeNext:^(id _) { [subscriber sendCompleted]; } completed:^{ [subscriber sendCompleted]; }]; [self subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [subscriber sendCompleted]; }]; return nil; }]; }
scanWithStart : 该操作可将上次reduceBlock处理后输出的结果作为参数,传入当次reduceBlock操作,往往用于信号输出值的聚合处理。scanWithStart内部仍然用到了核心操作bind。它会在bindBlock中对value进行操作,同时将上次操作得到的结果running作为参数带入,一旦本次reduceBlock执行完,就将结果保存在running中,以便下次处理时使用,最后再将本次得出的结果用信号包装后,传递出去。
- (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id(^)(id,id,NSUInteger))reduceBlock { Class class =self.class; return [self bind:^{ __block id running = startingValue; __block NSUIntegerindex = 0; return^(id value, BOOL*stop) { running = reduceBlock(running, value, index++); // (1) return [class return:running]; // (2) }; }]; }
throttle:这个操作接收一个时间间隔interval作为参数,如果Signal发出的next事件之后interval时间内不再发出next事件,那么它返回的Signal会将这个next事件发出。
take: 只接收原有信号前几个信号值,信号发送数量大于指定序号的会被忽略
- (__kindof RACStream *)take:(NSUInteger)count { Class class = self.class; if (count == 0) return class.empty; return [[self bind:^{ __block NSUInteger taken = 0; return ^ id (id value, BOOL *stop) { if (taken < count) { ++taken; if (taken == count) *stop = YES; return [class return:value]; } else { return nil; } }; }]]; }
skip: 过滤原有信号前几个信号值,从指定序号位置开始接收序号
- (__kindof RACStream *)skip:(NSUInteger)skipCount { Class class = self.class; return [[self bind:^{ __block NSUInteger skipped = 0; return ^(id value, BOOL *stop) { if (skipped >= skipCount) return [class return:value]; skipped++; return class.empty; }; }]]; }
startWith: 除了接收原有信号的值,还会在最前面接收一个指定的信号值。
- (__kindof RACStream *)startWith:(id)value { return [[self.class return:value] concat:self]; }
repeat: 对原有信号发送了complete前的所有信号值反复接收
- (RACSignal *)repeat { return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) { return subscribeForever(self, ^(id x) { [subscriber sendNext:x]; }, ^(NSError *error, RACDisposable *disposable) { [disposable dispose]; [subscriber sendError:error]; }, ^(RACDisposable *disposable) { // 不会处理complete信号 }); }] setNameWithFormat:@"[%@] -repeat", self.name]; }
static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) { RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) { [signal subscribeNext:next error:^(NSError *e) { @autoreleasepool { error(e, nil); } recurse();//[[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
} completed:^{ @autoreleasepool { completed(nil);
}
recurse();//[[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
}];
};
recursiveBlock(^{
[[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
});
return nil;
}
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable { [self schedule:^{ void (^reallyReschedule)(void) = ^{ [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable]; }; __block NSLock *lock = [[NSLock alloc] init]; lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)]; __block NSUInteger rescheduleCount = 0; __block BOOL rescheduleImmediately = NO; @autoreleasepool { recursiveBlock(^{// 这里会触发信号的重复订阅,并触发[signal sendNext:next error:error complete:complete] [lock lock]; BOOL immediate = rescheduleImmediately; if (!immediate) ++rescheduleCount; [lock unlock]; if (immediate) reallyReschedule(); }); } [lock lock]; NSUInteger synchronousCount = rescheduleCount; rescheduleImmediately = YES; [lock unlock]; for (NSUInteger i = 0; i < synchronousCount; i++) { reallyReschedule(); } }]; }
retry: 如果原有信号发送了一个error,那么这里会尝试重新订阅原有信号,在重复指定次数后发送失败的信号.常用于网络请求的失败重试机制
- (RACSignal *)retry:(NSInteger)retryCount { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { __block NSInteger currentRetryCount = 0; return subscribeForever(self, ^(id x) { [subscriber sendNext:x]; }, ^(NSError *error, RACDisposable *disposable) { if (retryCount == 0 || currentRetryCount < retryCount) { currentRetryCount++; return; } [disposable dispose]; [subscriber sendError:error]; }, ^(RACDisposable *disposable) {// 会触发完成信号,不会无限订阅 [disposable dispose]; [subscriber sendCompleted]; }); }]; }
aggregateWithStart: reduceBlock: 设一个初始值,对原信号的值进行累加并输出最终值
- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock { return [[[self scanWithStart:start reduceWithIndex:reduceBlock] startWith:start] takeLast:1]; }
- (__kindof RACStream *)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock { Class class = self.class; return [self bind:^{ __block id running = startingValue; __block NSUInteger index = 0; return ^(id value, BOOL *stop) { running = reduceBlock(running, value, index++); return [class return:running]; }; }]; }
- (RACSignal *)takeLast:(NSUInteger)count { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count]; return [self subscribeNext:^(id x) { [valuesTaken addObject:x ? : RACTupleNil.tupleNil]; while (valuesTaken.count > count) { [valuesTaken removeObjectAtIndex:0];// 移除之前的所有信号值 } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ for (id value in valuesTaken) { [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value]; } [subscriber sendCompleted]; }]; }]; }
delay: 对原信号值延迟输出
throttle: 指定原有信息的输出间隔时间,时间内的信号值会被过滤
concat: 拼接两个信号的输出值为一个整体
- (RACSignal *)concat:(RACSignal *)signal { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { [self subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [signal subscribe:subscriber]; }]; return nil; }]; }
zip: 拉链式输出两个信号的值为一个元祖.例如a信号输出[1,2,3,4,5],b信号输出[6,7,8,9],那么zip后输出为[[1,6],[2,7],[3,8],[4,9]]
- (RACSignal *)zipWith:(RACSignal *)signal { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { __block BOOL selfCompleted = NO; NSMutableArray *selfValues = [NSMutableArray array]; __block BOOL otherCompleted = NO; NSMutableArray *otherValues = [NSMutableArray array]; void (^sendCompletedIfNecessary)(void) = ^{ @synchronized (selfValues) { BOOL selfEmpty = (selfCompleted && selfValues.count == 0); BOOL otherEmpty = (otherCompleted && otherValues.count == 0); if (selfEmpty || otherEmpty) [subscriber sendCompleted]; } }; void (^sendNext)(void) = ^{ @synchronized (selfValues) { if (selfValues.count == 0) return; if (otherValues.count == 0) return; RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]); [selfValues removeObjectAtIndex:0]; [otherValues removeObjectAtIndex:0]; [subscriber sendNext:tuple]; sendCompletedIfNecessary(); } }; RACDisposable *selfDisposable = [self subscribeNext:^(id x) { @synchronized (selfValues) { [selfValues addObject:x ?: RACTupleNil.tupleNil]; sendNext(); } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ @synchronized (selfValues) { selfCompleted = YES; sendCompletedIfNecessary(); } }]; RACDisposable *otherDisposable = [signal subscribeNext:^(id x) { @synchronized (selfValues) { [otherValues addObject:x ?: RACTupleNil.tupleNil]; sendNext(); } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ @synchronized (selfValues) { otherCompleted = YES; sendCompletedIfNecessary(); } }]; return [RACDisposable disposableWithBlock:^{ [selfDisposable dispose]; [otherDisposable dispose]; }]; }]; }
combineLatest: 打包两个信号的输出值,任何一个信号输出变化,都会取出最近两个信号的输出值。例如a信号输出[1,2,3,4,5],b信号输出[6,7,8,9],那么zip后输出为[[1,6],[2,6],[2,7],[3,7],[3,8],[4,8][4,9],[5,9]].(这里可能会有更多的输出)
- (RACSignal *)combineLatestWith:(RACSignal *)signal { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { __block id lastSelfValue = nil; __block BOOL selfCompleted = NO; __block id lastOtherValue = nil; __block BOOL otherCompleted = NO; void (^sendNext)(void) = ^{ if (lastSelfValue == nil || lastOtherValue == nil) return; [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)]; }; RACDisposable *selfDisposable = [self subscribeNext:^(id x) { lastSelfValue = x ?: RACTupleNil.tupleNil; sendNext(); } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ selfCompleted = YES; if (otherCompleted) [subscriber sendCompleted]; }]; [signal subscribeNext:^(id x) { lastOtherValue = x ?: RACTupleNil.tupleNil; sendNext(); } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ otherCompleted = YES; if (selfCompleted) [subscriber sendCompleted]; }]; return nil; }]; }
RACCommand是RAC很重要的组成部分,通常用来表示某个action的执行。RACCommand提供executionSignals、 executing、 error等一连串公开的信号,方便外界对action执行过程与执行结果进行观察。executionSignals是signal of signals,如果外部直接订阅executionSignals,得到的输出是当前执行的信号,而不是执行信号输出的数据,所以一般会配合flatten或switchToLatest使用。 errors,RACCommand的错误不是通过sendError来实现的,而是通过errors属性传递出来的。 executing,表示该command当前是否正在执行。它常用于监听按钮点击、网络请求等。
以上是关于RAC篇(中) - 信号的各种转换和操作的主要内容,如果未能解决你的问题,请参考以下文章
iOS Reactivecocoa(RAC)知其所以然(源码分析,一篇足以)
RAC篇(上) - RACSignal & RACSubject