RACSignal 所有变换操作底层实现分析
Posted ZH952016281
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RACSignal 所有变换操作底层实现分析相关的知识,希望对你有一定的参考价值。
之前详细分析了RACSignal是创建和订阅的详细过程。看到底层源码实现后,就能发现,ReactiveCocoa这个FRP的库,实现响应式(RP)是用Block闭包来实现的,而并不是用KVC / KVO实现的。
在ReactiveCocoa整个库中,RACSignal占据着比较重要的位置,而RACSignal的变换操作更是整个RACStream流操作核心之一。在上篇文章中也详细分析了bind操作的实现。RACsignal很多变换操作都是基于bind操作来实现的。在开始本篇底层实现分析之前,先简单回顾一下上篇文章中分析的bind函数,这是这篇文章分析的基础。
bind函数可以简单的缩写成下面这样子。
- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block;
return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber)
RACStreamBindBlock bindBlock = block();
[self subscribeNext:^(id x) //(1)
BOOL stop = NO;
RACSignal *signal = (RACSignal *)bindBlock(x, &stop); //(2)
if (signal == nil || stop)
[subscriber sendCompleted];
else
[signal subscribeNext:^(id x)
[subscriber sendNext:x]; //(3)
error:^(NSError *error)
[subscriber sendError:error];
completed:^
];
error:^(NSError *error)
[subscriber sendError:error];
completed:^
[subscriber sendCompleted];
];
return nil;
];
当bind变换之后的信号被订阅,就开始执行bind函数中return的block闭包。
- 在bind闭包中,先订阅原先的信号A。
- 在订阅原信号A的didSubscribe闭包中进行信号变换,变换中用到的block闭包是外部传递进来的,也就是bind函数的入参。变换结束,得到新的信号B
- 订阅新的信号B,拿到bind变化之后的信号的订阅者subscriber,对其发送新的信号值。
简要的过程如上图,bind函数中进行了2次订阅的操作,第一次订阅是为了拿到signalA的值,第二次订阅是为了把signalB的新值发给bind变换之后得到的signalB的订阅者。
回顾完bind底层实现之后,就可以开始继续本篇文章的分析了。
目录
- 1.变换操作
- 2.时间操作
一.变换操作
我们都知道RACSignal是继承自RACStream的,而在底层的RACStream上也定义了一些基本的信号变换的操作,所以这些操作在RACSignal上同样适用。如果在RACsignal中没有重写这些方法,那么调用这些操作,实际是调用的父类RACStream的操作。下面分析的时候,会把实际调用父类RACStream的操作的地方都标注出来。
1.Map: (在父类RACStream中定义的)
map操作一般是用来信号变换的。
RACSignal *signalB = [signalA map:^id(NSNumber *value)
return @([value intValue] * 10);
];
来看看底层是如何实现的。
- (instancetype)map:(id (^)(id value))block
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^(id value)
return [class return:block(value)];
] setNameWithFormat:@"[%@] -map:", self.name];
这里实现代码比较严谨,先判断self的类型。因为RACStream的子类中会有一些子类会重写这些方法,所以需要判断self的类型,在回调中可以回调到原类型的方法中去。
由于本篇文章中我们都分析RACSignal的操作,所以这里的self.class是RACDynamicSignal类型的。相应的在return返回值中也返回class,即RACDynamicSignal类型的信号。
从map实现代码上来看,map实现是用了flattenMap函数来实现的。把map的入参闭包,放到了flattenMap的返回值中。
在来看看flattenMap的实现:
- (instancetype)flattenMap:(RACStream * (^)(id value))block
Class class = self.class;
return [[self bind:^
return ^(id value, BOOL *stop)
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
;
] setNameWithFormat:@"[%@] -flattenMap:", self.name];
flattenMap算是对bind函数的一种封装。bind函数的入参是一个RACStreamBindBlock类型的闭包。而flattenMap函数的入参是一个value,返回值RACStream类型的闭包。
在flattenMap中,返回block(value)的信号,如果信号为nil,则返回[class empty]。
先来看看为空的情况。当block(value)为空,返回[RACEmptySignal empty],empty就是创建了一个RACEmptySignal类型的信号:
+ (RACSignal *)empty
#ifdef DEBUG
// Create multiple instances of this class in DEBUG so users can set custom
// names on each.
return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
static id singleton;
static dispatch_once_t pred;
dispatch_once(&pred, ^
singleton = [[self alloc] init];
);
return singleton;
#endif
RACEmptySignal类型的信号又是什么呢?
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^
[subscriber sendCompleted];
];
RACEmptySignal是RACSignal的子类,一旦订阅它,它就会同步的发送completed完成信号给订阅者。
所以flattenMap返回一个信号,如果信号不存在,就返回一个completed完成信号给订阅者。
再来看看flattenMap返回的信号是怎么变换的。
block(value)会把原信号发送过来的value传递给flattenMap的入参。flattenMap的入参是一个闭包,闭包的参数也是value的:
^(id value) return [class return:block(value)];
这个闭包返回一个信号,信号类型和原信号的类型一样,即RACDynamicSignal类型的,值是block(value)。这里的闭包是外面map传进来的:
^id(NSNumber *value) return @([value intValue] * 10);
在这个闭包中把原信号的value值传进去进行变换,变换结束之后,包装成和原信号相同类型的信号,返回。返回的信号作为bind函数的闭包的返回值。这样订阅新的map之后的信号就会拿到变换之后的值。
2.MapReplace: (在父类RACStream中定义的)
一般用法如下:
RACSignal *signalB = [signalA mapReplace:@"A"];
效果是不管A信号发送什么值,都替换成@“A”。
- (instancetype)mapReplace:(id)object
return [[self map:^(id _)
return object;
] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, [object rac_description]];
看底层源码就知道,它并不去关心原信号发送的是什么值,原信号发送什么值,都返回入参object的值。
3.reduceEach: (在父类RACStream中定义的)
reduce是减少,聚合在一起的意思,reduceEach就是每个信号内部都聚合在一起。
RACSignal *signalB = [signalA reduceEach:^id(NSNumber *num1 , NSNumber *num2)
return @([num1 intValue] + [num2 intValue]);
];
reduceEach后面比如传入一个元组RACTuple类型,否则会报错。
- (instancetype)reduceEach:(id (^)())reduceBlock
NSCParameterAssert(reduceBlock != nil);
__weak RACStream *stream __attribute__((unused)) = self;
return [[self map:^(RACTuple *t)
NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
] setNameWithFormat:@"[%@] -reduceEach:", self.name];
这里有两个断言,一个是判断传入的reduceBlock闭包是否为空,另一个断言是判断闭包的入参是否是RACTuple类型的。
@interface RACBlockTrampoline : NSObject
@property (nonatomic, readonly, copy) id block;
+ (id)invokeBlock:(id)block withArguments:(RACTuple *)arguments;
@end
RACBlockTrampoline就是一个保存了一个block闭包的对象,它会根据传进来的参数,动态的构造一个NSInvocation,并执行。
reduceEach把入参reduceBlock作为RACBlockTrampoline的入参invokeBlock传进去,以及每个RACTuple也传到RACBlockTrampoline中。
- (id)invokeWithArguments:(RACTuple *)arguments
SEL selector = [self selectorForArgumentCount:arguments.count];
NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:[self methodSignatureForSelector:selector]];
invocation.selector = selector;
invocation.target = self;
for (NSUInteger i = 0; i < arguments.count; i++)
id arg = arguments[i];
NSInteger argIndex = (NSInteger)(i + 2);
[invocation setArgument:&arg atIndex:argIndex];
[invocation invoke];
__unsafe_unretained id returnVal;
[invocation getReturnValue:&returnVal];
return returnVal;
第一步就是先计算入参一个元组RACTuple里面元素的个数。
- (SEL)selectorForArgumentCount:(NSUInteger)count
NSCParameterAssert(count > 0);
switch (count)
case 0: return NULL;
case 1: return @selector(performWith:);
case 2: return @selector(performWith::);
case 3: return @selector(performWith:::);
case 4: return @selector(performWith::::);
case 5: return @selector(performWith:::::);
case 6: return @selector(performWith::::::);
case 7: return @selector(performWith:::::::);
case 8: return @selector(performWith::::::::);
case 9: return @selector(performWith:::::::::);
case 10: return @selector(performWith::::::::::);
case 11: return @selector(performWith:::::::::::);
case 12: return @selector(performWith::::::::::::);
case 13: return @selector(performWith:::::::::::::);
case 14: return @selector(performWith::::::::::::::);
case 15: return @selector(performWith:::::::::::::::);
NSCAssert(NO, @"The argument count is too damn high! Only blocks of up to 15 arguments are currently supported.");
return NULL;
可以看到最多支持元组内元素的个数是15个。
这里我们假设以元组里面有2个元素为例。
- (id)performWith:(id)obj1 :(id)obj2
id (^block)(id, id) = self.block;
return block(obj1, obj2);
对应的Type Encoding如下:
argument | return value | 0 | 1 | 2 | 3 |
---|---|---|---|---|---|
methodSignature | @ | @ | : | @ | @ |
argument0和argument1分别对应着隐藏参数self和_cmd,所以对应着的类型是@和:,从argument2开始,就是入参的Type Encoding了。
所以在构造invocation的参数的时候,argIndex是要偏移2个位置的。即从(i + 2)开始设置参数。
当动态构造了一个invocation方法之后,[invocation invoke]调用这个动态方法,也就是执行了外部的reduceBlock闭包,闭包里面是我们想要信号变换的规则。
闭包执行结束得到returnVal返回值。这个返回值就是整个RACBlockTrampoline的返回值了。这个返回值也作为了map闭包里面的返回值。
接下去的操作就完全转换成了map的操作了。上面已经分析过map操作了,这里就不赘述了。
4. reduceApply
举个例子:
RACSignal *signalA = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
id block = ^id(NSNumber *first,NSNumber *second,NSNumber *third)
return @(first.integerValue + second.integerValue * third.integerValue);
;
[subscriber sendNext:RACTuplePack(block,@2 , @3 , @8)];
[subscriber sendNext:RACTuplePack((id)(^id(NSNumber *x)return @(x.intValue * 10);),@9,@10,@30)];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^
NSLog(@"signal dispose");
];
];
RACSignal *signalB = [signalA reduceApply];
使用reduceApply的条件也是需要信号里面的值是元组RACTuple。不过这里和reduceEach不同的是,原信号中每个元祖RACTuple的第0位必须要为一个闭包,后面n位为这个闭包的入参,第0位的闭包有几个参数,后面就需要跟几个参数。
如上述例子中,第一个元组第0位的闭包有3个参数,所以第一个元组后面还要跟3个参数。第二个元组的第0位的闭包只有一个参数,所以后面只需要跟一个参数。
当然后面可以跟更多的参数,如第二个元组,闭包后面跟了3个参数,但是只有第一个参数是有效值,后面那2个参数是无效不起作用的。唯一需要注意的就是后面跟的参数个数一定不能少于第0位闭包入参的个数,否则就会报错。
上面例子输出
26 // 26 = 2 + 3 * 8;
90 // 90 = 9 * 10;
看看底层实现:
- (RACSignal *)reduceApply
return [[self map:^(RACTuple *tuple)
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-reduceApply must only be used on a signal of RACTuples. Instead, received: %@", tuple);
NSCAssert(tuple.count > 1, @"-reduceApply must only be used on a signal of RACTuples, with at least a block in tuple[0] and its first argument in tuple[1]");
// We can't use -array, because we need to preserve RACTupleNil
NSMutableArray *tupleArray = [NSMutableArray arrayWithCapacity:tuple.count];
for (id val in tuple)
[tupleArray addObject:val];
RACTuple *arguments = [RACTuple tupleWithObjectsFromArray:[tupleArray subarrayWithRange:NSMakeRange(1, tupleArray.count - 1)]];
return [RACBlockTrampoline invokeBlock:tuple[0] withArguments:arguments];
] setNameWithFormat:@"[%@] -reduceApply", self.name];
这里也有2个断言,第一个是确保传入的参数是RACTuple类型,第二个断言是确保元组RACTuple里面的元素各种至少是2个。因为下面取参数是直接从1号位开始取的。
reduceApply做的事情和reduceEach基本是等效的,只不过变换规则的block闭包一个是外部传进去的,一个是直接打包在每个信号元组RACTuple中第0位中。
5. materialize
这个方法会把信号包装成RACEvent类型。
- (RACSignal *)materialize
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber)
return [self subscribeNext:^(id x)
[subscriber sendNext:[RACEvent eventWithValue:x]];
error:^(NSError *error)
[subscriber sendNext:[RACEvent eventWithError:error]];
[subscriber sendCompleted];
completed:^
[subscriber sendNext:RACEvent.completedEvent];
[subscriber sendCompleted];
];
] setNameWithFormat:@"[%@] -materialize", self.name];
sendNext会包装成[RACEvent eventWithValue:x],error会包装成[RACEvent eventWithError:error],completed会被包装成RACEvent.completedEvent。注意,当原信号error和completed,新信号都会发送sendCompleted。
6. dematerialize
这个操作是materialize的逆向操作。它会把包装成RACEvent信号重新还原为正常的值信号。
- (RACSignal *)dematerialize
return [[self bind:^
return ^(RACEvent *event, BOOL *stop)
switch (event.eventType)
case RACEventTypeCompleted:
*stop = YES;
return [RACSignal empty];
case RACEventTypeError:
*stop = YES;
return [RACSignal error:event.error];
case RACEventTypeNext:
return [RACSignal return:event.value];
;
] setNameWithFormat:@"[%@] -dematerialize", self.name];
这里的实现也用到了bind函数,它会把原信号进行一个变换。新的信号会根据event.eventType进行转换。RACEventTypeCompleted被转换成[RACSignal empty],RACEventTypeError被转换成[RACSignal error:event.error],RACEventTypeNext被转换成[RACSignal return:event.value]。
7. not
- (RACSignal *)not
return [[self map:^(NSNumber *value)
NSCAssert([value isKindOfClass:NSNumber.class], @"-not must only be used on a signal of NSNumbers. Instead, got: %@", value);
return @(!value.boolValue);
] setNameWithFormat:@"[%@] -not", self.name];
not操作需要传入的值都是NSNumber类型的。不是NSNumber类型会报错。not操作会把每个NSNumber按照BOOL的规则,取非,当成新信号的值。
8. and
- (RACSignal *)and
return [[self map:^(RACTuple *tuple)
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
NSCAssert(tuple.count > 0, @"-and must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
return @([tuple.rac_sequence all:^(NSNumber *number)
NSCAssert([number isKindOfClass:NSNumber.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
return number.boolValue;
]);
] setNameWithFormat:@"[%@] -and", self.name];
and操作需要原信号的每个信号都是元组RACTuple类型的,因为只有这样,RACTuple类型里面的每个元素的值才能进行&运算。
and操作里面有3处断言。第一处,判断入参是不是元组RACTuple类型的。第二处,判断RACTuple类型里面至少包含一个NSNumber。第三处,判断RACTuple里面是否都是NSNumber类型,有一个不符合,都会报错。
- (RACSequence *)rac_sequence
return [RACTupleSequence sequenceWithTupleBackingArray:self.backingArray offset:0];
RACTuple类型先转换成RACTupleSequence。
+ (instancetype)sequenceWithTupleBackingArray:(NSArray *)backingArray offset:(NSUInteger)offset
NSCParameterAssert(offset <= backingArray.count);
if (offset == backingArray.count) return self.empty;
RACTupleSequence *seq = [[self alloc] init];
seq->_tupleBackingArray = backingArray;
seq->_offset = offset;
return seq;
backingArray是一个数组NSArry。这里关于RACTupleSequence和RACTuple会在以后的文章中详细分析,本篇以分析RACSignal为主。
RACTuple类型先转换成RACTupleSequence,即存成了一个数组。
- (BOOL)all:(BOOL (^)(id))block
NSCParameterAssert(block != NULL);
NSNumber *result = [self foldLeftWithStart:@YES reduce:^(NSNumber *accumulator, id value)
return @(accumulator.boolValue && block(value));
];
return result.boolValue;
- (id)foldLeftWithStart:(id)start reduce:(id (^)(id, id))reduce
NSCParameterAssert(reduce != NULL);
if (self.head == nil) return start;
for (id value in self)
start = reduce(start, value);
return start;
for会遍历RACSequence里面存的每一个值,分别都去调用reduce( )闭包。start的初始值为YES。reduce( )闭包是:
^(NSNumber *accumulator, id value) return @(accumulator.boolValue && block(value));
这里又会去调用block( )闭包:
^(NSNumber *number) return number.boolValue;
number是原信号RACTuple的第一个值。第一次循环reduce( )闭包是拿YES和原信号RACTuple的第一个值进行&计算。第二个循环reduce( )闭包是拿原信号RACTuple的第一个值和第二个值进行&计算,得到的值参与下一次循环,与第三个值进行&计算,如此下去。这也是折叠函数的意思,foldLeft从左边开始折叠。fold函数会从左至右,把RACTuple转换成的数组里面每个值都一个接着一个进行&计算。
每个RACTuple都map成这样的一个BOOL值。接下去信号就map成了一个新的信号。
9. or
- (RACSignal *)or
return [[self map:^(RACTuple *tuple)
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
return @([tuple.rac_sequence any:^(NSNumber *number)
NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
return number.boolValue;
]);
] setNameWithFormat:@"[%@] -or", self.name];
or操作的实现和and操作的实现大体类似。3处断言的作用和and操作完全一致,这里就不再赘述了。or操作的重点在any函数的实现上。or操作的入参也必须是RACTuple类型的。
- (BOOL)any:(BOOL (^)(id))block
NSCParameterAssert(block != NULL);
return [self objectPassingTest:block] != nil;
- (id)objectPassingTest:(BOOL (^)(id))block
NSCParameterAssert(block != NULL);
return [self filter:block].head;
- (instancetype)filter:(BOOL (^)(id value))block
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^ id (id value)
if (block(value))
return [class return:value];
else
return class.empty;
] setNameWithFormat:@"[%@] -filter:", self.name];
any会依次判断RACTupleSequence数组里面的值,依次每个进行filter。如果value对应的BOOL值是YES,就转换成一个RACTupleSequence信号。如果对应的是NO,则转换成一个empty信号。
只要RACTuple为NO,就一直返回empty信号,直到BOOL值为YES,就返回1。map变换信号后变成成1。找到了YES之后的值就不会再判断了。如果没有找到YES,中间都是NO的话,一直遍历到数组最后一个,信号只能返回0。
10. any:
- (RACSignal *)any:(BOOL (^)(id object))predicateBlock
NSCParameterAssert(predicateBlock != NULL);
return [[[self materialize] bind:^
return ^(RACEvent *event, BOOL *stop)
if (event.finished)
*stop = YES;
return [RACSignal return:@NO];
if (predicateBlock(event.value))
*stop = YES;
return [RACSignal return:@YES];
return [RACSignal empty];
;
] setNameWithFormat:@"[%@] -any:", self.name];
原信号会先经过materialize转换包装成RACEvent事件。依次判断predicateBlock(event.value)值的BOOL值,如果返回YES,就包装成RACSignal的新信号,发送YES出去,并且stop接下来的信号。如果返回MO,就返回[RACSignal empty]空信号。直到event.finished,返回[RACSignal return:@NO]。
所以any:操作的目的是找到第一个满足predicateBlock条件的值。找到了就返回YES的RACSignal的信号,如果没有找到,返回NO的RACSignal。
11. any
- (RACSignal *)any
return [[self any:^(id x)
return YES;
] setNameWithFormat:@"[%@] -any", self.name];
any操作是any:操作中的一种情况。即predicateBlock闭包永远都返回YES,所以any操作之后永远都只能得到一个只发送一个YES的新信号。
12. all:
- (RACSignal *)all:(BOOL (^)(id object))predicateBlock
NSCParameterAssert(predicateBlock != NULL);
return [[[self materialize] bind:^
return ^(RACEvent *event, BOOL *stop)
if (event.eventType == RACEventTypeCompleted)
*stop = YES;
return [RACSignal return:@YES];
if (event.eventType == RACEventTypeError || !predicateBlock(event.value))
*stop = YES;
return [RACSignal return:@NO];
return [RACSignal empty];
;
] setNameWithFormat:@"[%@] -all:", self.name];
all:操作和any:有点类似。原信号会先经过materialize转换包装成RACEvent事件。对原信号发送的每个信号都依次判断predicateBlock(event.value)是否是NO 或者event.eventType == RACEventTypeError。如果predicateBlock(event.value)返回NO或者出现了错误,新的信号都返回NO。如果一直都没出现问题,在RACEventTypeCompleted的时候发送YES。
all:可以用来判断整个原信号发送过程中是否有错误事件RACEventTypeError,或者是否存在predicateBlock为NO的情况。可以把predicateBlock设置成一个正确条件。如果原信号出现错误事件,或者不满足设置的错误条件,都会发送新信号返回NO。如果全过程都没有出错,或者都满足predicateBlock设置的条件,则一直到RACEventTypeCompleted,发送YES的新信号。
13. repeat
- (RACSignal *)repeat
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber)
return subscribeForever(self,
^(id x)
[subscriber sendNext:x];
,
^(NSError *error, RACDisposable *disposable)
[disposable dispose];
[subscriber sendError:error];
,
^(RACDisposable *disposable)
// Resubscribe.
);
] setNameWithFormat:@"[%@] -repeat", self.name];
repeat操作返回一个subscribeForever闭包,闭包里面要传入4个参数。
static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *))
next = [next copy];
error = [error copy];
completed = [completed copy];
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void))
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[compoundDisposable addDisposable:selfDisposable];
__weak RACDisposable *weakSelfDisposable = selfDisposable;
RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e)
@autoreleasepool
error(e, compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
recurse();
completed:^
@autoreleasepool
completed(compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
recurse();
];
[selfDisposable addDisposable:subscriptionDisposable];
;
// Subscribe once immediately, and then use recursive scheduling for any
// further resubscriptions.
recursiveBlock(^
RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
[compoundDisposable addDisposable:schedulingDisposable];
);
return compoundDisposable;
subscribeForever有4个参数,第一个参数是原信号,第二个是传入的next闭包,第三个是error闭包,最后一个是completed闭包。
subscribeForever一进入这个函数就会调用recursiveBlock( )闭包,闭包中有一个recurse( )的入参的参数。在recursiveBlock( )闭包中对原信号RACSignal进行订阅。next,error,completed分别会先调用传进来的闭包。然后error,completed执行完error( )和completed( )闭包之后,还会继续再执行recurse( ),recurse( )是recursiveBlock的入参。
- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
[self scheduleRecursiveBlock:[recursiveBlock copy] addingToDisposable:disposable];
return disposable;
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable
@autoreleasepool
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[disposable addDisposable:selfDisposable];
__weak RACDisposable *weakSelfDisposable = selfDisposable;
RACDisposable *schedulingDisposable = [self schedule:^ // 此处省略 ];
[selfDisposable addDisposable:schedulingDisposable];
先取到当前的currentScheduler,即recursiveScheduler,执行scheduleRecursiveBlock,在这个函数中,会调用schedule函数。这里的recursiveScheduler是RACQueueScheduler类型的。
- (RACDisposable *)schedule:(void (^)(void))block
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_async(self.queue, ^
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
);
return disposable;
如果原信号没有disposed,dispatch_async会继续执行block,在这个block中还会继续原信号的发送。所以原信号只要没有error信号,disposable.disposed就不会返回YES,就会一直调用block。所以在subscribeForever的error和completed的最后都会调用recurse( )闭包。error调用recurse( )闭包是为了结束调用block,结束所有的信号。completed调用recurse( )闭包是为了继续调用block( )闭包,也就是repeat的本质。原信号会继续发送信号,如此无限循环下去。
14. retry:
- (RACSignal *)retry:(NSInteger)retryCount
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber)
__block NSInteger currentRetryCount = 0;
return subscribeForever(self,
^(id x)
[subscriber sendNext:x];
,
^(NSError *error, RACDisposable *disposable)
if (retryCount == 0 || currentRetryCount < retryCount)
// Resubscribe.
currentRetryCount++;
return;
[disposable dispose];
[subscriber sendError:error];
,
^(RACDisposable *disposable)
[disposable dispose];
[subscriber sendCompleted];
);
] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
在retry:的实现中,和repeat实现的区别是中间加入了一个currentRetryCount值。如果currentRetryCount > retryCount的话,就会在error中调用[disposable dispose],这样subscribeForever就不会再无限循环下去了。
所以retry:操作的用途就是在原信号在出现error的时候,重试retryCount的次数,如果依旧error,那么就会停止重试。
如果原信号没有发生错误,那么原信号在发送结束,subscribeForever也就结束了。retry:操作对于没有任何error的信号相当于什么都没有发生。
15. retry
- (RACSignal *)retry
return [[self retry:0] setNameWithFormat:@"[%@] -retry", self.name];
这里的retry操作就是一个无限重试的操作。因为retryCount设置成0之后,在error的闭包中中,retryCount 永远等于 0,原信号永远都不会被dispose,所以subscribeForever会一直无限重试下去。
同样的,如果对一个没有error的信号调用retry操作,也是不起任何作用的。
16. scanWithStart: reduceWithIndex: (在父类RACStream中定义的)
先写出测试代码:
RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber)
[subscriber sendNext:@1];
[subscriber sendNext:@1];
[subscriber sendNext:@4];
return [RACDisposable disposableWithBlock:^
];
];
RACSignal *signalB = [signalA scanWithStart:@(2) reduceWithIndex:^id(NSNumber * running, NSNumber * next, NSUInteger index)
return @(running.intValue * next.intValue + index);
];
2 // 2 * 1 + 0 = 2
3 // 2 * 1 + 1 = 3
14 // 3 * 4 + 2 = 14
- (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock
NSCParameterAssert(reduceBlock != nil);
Class class = self.class;
return [[self bind:^
__block id running = startingValue;
__block NSUInteger index = 0;
return ^(id value, BOOL *stop)
running = reduceBlock(running, value, index++);
return [class return:running];
;
] setNameWithFormat:@"[%@] -scanWithStart: %@ reduceWithIndex:", self.name, [startingValue rac_description]];
scanWithStart这个变换由初始值,变换函数reduceBlock( ),和index步进的变量组成。原信号的每个信号都会由变换函数reduceBlock( )进行变换。index每次都是自增。变换的初始值是由入参startingValue传入的。
17. scanWithStart: reduce: (在父类RACStream中定义的)
- (instancetype)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))reduceBlock
NSCParameterAssert(reduceBlock != nil);
return [[self
scanWithStart:startingValue
reduceWithIndex:^(id running, id next, NSUInteger index)
return reduceBlock(running, next);
]
setNameWithFormat:@"[%@] -scanWithStart: %@ reduce:", self.name, [startingValue rac_description]];
scanWithStart: reduce:就是scanWithStart: reduceWithIndex: 的缩略版。变换函数也是外面闭包reduceBlock( )传进来的。只不过变换过程中不会使用index自增的这个变量。
18. aggregateWithStart: reduceWithIndex:
- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock
return [[[[self
scanWithStart:start reduceWithIndex:reduceBlock]
startWith:start]
takeLast:1]
setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduceWithIndex:", self.name, [start rac_description]];
aggregate是合计的意思。所以最后变换出来的信号只有最后一个值。
aggregateWithStart: reduceWithIndex:操作调用了scanWithStart: reduceWithIndex:,原理和它完全一致。不同的是多了两步额外的操作,一个是startWith:,一个是takeLast:1。startWith:是在scanWithStart: reduceWithIndex:变换之后的信号之前加上start信号。takeLast:1是取最后一个信号。takeLast:和startWith:的详细分析文章下面会详述。
值得注意的一点是,原信号如果没有发送complete信号,那么该函数就不会输出新的信号值。因为在一直等待结束。
19. aggregateWithStart: reduce:
- (RACSignal *)aggregateWithStart:(id)start reduce:(id (^)(id running, id next))reduceBlock
return以上是关于RACSignal 所有变换操作底层实现分析的主要内容,如果未能解决你的问题,请参考以下文章