RACSignal 所有变换操作底层实现分析
Posted ZH952016281
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RACSignal 所有变换操作底层实现分析相关的知识,希望对你有一定的参考价值。
前言
继续分析RACSignal的变换操作的底层实现。
目录
- 1.过滤操作
- 2.组合操作
一. 过滤操作
过滤操作也属于一种变换,根据过滤条件,过滤出符合条件的值。变换出来的新的信号是原信号的一个子集。
1. filter: (在父类RACStream中定义的)
这个filter:操作在any:的实现中用到过了。
- (instancetype)filter:(BOOL (^)(id value))block
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^ id (id value)
if (block(value))
return [class return:value];
else
return class.empty;
] setNameWithFormat:@"[%@] -filter:", self.name];
filter:中传入一个闭包,是用筛选的条件。如果满足筛选条件的即返回原信号的值,否则原信号的值被“吞”掉,返回空的信号。这个变换主要是用flattenMap的。
2. ignoreValues
- (RACSignal *)ignoreValues
return [[self filter:^(id _)
return NO;
] setNameWithFormat:@"[%@] -ignoreValues", self.name];
由上面filter的实现,这里把筛选判断条件永远的传入NO,那么原信号的值都会被变换成empty信号,故变换之后的信号为空信号。
3. ignore: (在父类RACStream中定义的)
- (instancetype)ignore:(id)value
return [[self filter:^ BOOL (id innerValue)
return innerValue != value && ![innerValue isEqual:value];
] setNameWithFormat:@"[%@] -ignore: %@", self.name, [value rac_description]];
ignore:的实现还是由filter:实现的。传入的筛选判断条件是一个值,当原信号发送的值中是这个值的时候,就替换成空信号。
4. distinctUntilChanged (在父类RACStream中定义的)
- (instancetype)distinctUntilChanged
Class class = self.class;
return [[self bind:^
__block id lastValue = nil;
__block BOOL initial = YES;
return ^(id x, BOOL *stop)
if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];
initial = NO;
lastValue = x;
return [class return:x];
;
] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];
distinctUntilChanged的实现是用bind来完成的。每次变换中都记录一下原信号上一次发送过来的值,并与这一次进行比较,如果是相同的值,就“吞”掉,返回empty信号。只有和原信号上一次发送的值不同,变换后的新信号才把这个值发送出来。
关于distinctUntilChanged,这里关注的是两两信号之间的值是否不同,有时候我们可能需要一个类似于NSSet的信号集,distinctUntilChanged就无法满足了。在ReactiveCocoa 2.5的这个版本也并没有向我们提供distinct的变换函数。
我们可以自己实现类似的变换。实现思路也不难,可以把之前每次发送过来的信号都用数组存起来,新来的信号都去数组里面查找一遍,如果找不到,就把这个值发送出去,如果找到了,就返回empty信号。效果如上图。
5. take: (在父类RACStream中定义的)
- (instancetype)take:(NSUInteger)count
Class class = self.class;
if (count == 0) return class.empty;
return [[self bind:^
__block NSUInteger taken = 0;
return ^ id (id value, BOOL *stop)
if (taken < count)
++taken;
if (taken == count) *stop = YES;
return [class return:value];
else
return nil;
;
] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
take:实现也非常简单,借助bind函数来实现的。入参的count是原信号取值的个数。在bind的闭包中,taken计数从0开始取原信号的值,当taken取到count个数的时候,就停止取值。
在take:的基础上我们还可以继续改造出新的变换方式。比如说,想取原信号中执行的第几个值。类似于elementAt的操作。这个操作在ReactiveCocoa 2.5的这个版本也并没有直接向我们提供出来。
其实实现很简单,只需要判断taken是否等于我们要取的那个位置就可以了,等于的时候把原信号的值发送出来,并*stop = YES。
// 我自己增加实现的方法
- (instancetype)elementAt:(NSUInteger)index
Class class = self.class;
return [[self bind:^
__block NSUInteger taken = 0;
return ^ id (id value, BOOL *stop)
if (index == 0)
*stop = YES;
return [class return:value];
if (taken == index)
*stop = YES;
return [class return:value];
else if (taken < index)
taken ++;
return [class empty];
else
return nil;
;
] setNameWithFormat:@"[%@] -elementAt: %lu", self.name, (unsigned long)index];
6. takeLast:
- (RACSignal *)takeLast:(NSUInteger)count
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber)
NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
return [self subscribeNext:^(id x)
[valuesTaken addObject:x ? : RACTupleNil.tupleNil];
while (valuesTaken.count > count)
[valuesTaken removeObjectAtIndex:0];
error:^(NSError *error)
[subscriber sendError:error];
completed:^
for (id value in valuesTaken)
[subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
[subscriber sendCompleted];
];
] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
takeLast:的实现也是按照套路来。先创建一个新信号,return的时候订阅原信号。在函数内部用一个valuesTaken来保存原信号发送过来的值,原信号发多少,就存多少,直到个数溢出入参给定的count,就溢出数组第0位。这样能保证数组里面始终都装着最后count个原信号的值。
当原信号发送completed信号的时候,把数组里面存的值都sendNext出去。这里要注意的也是该变换发送信号的时机。如果原信号一直没有completed,那么takeLast:就一直没法发出任何信号来。
7. takeUntilBlock: (在父类RACStream中定义的)
- (instancetype)takeUntilBlock:(BOOL (^)(id x))predicate
NSCParameterAssert(predicate != nil);
Class class = self.class;
return [[self bind:^
return ^ id (id value, BOOL *stop)
if (predicate(value)) return nil;
return [class return:value];
;
] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];
takeUntilBlock:是根据传入的predicate闭包作为筛选条件的。一旦predicate( )闭包满足条件,那么新信号停止发送新信号,因为它被置为nil了。和函数名的意思是一样的,take原信号的值,Until直到闭包满足条件。
8. takeWhileBlock: (在父类RACStream中定义的)
- (instancetype)takeWhileBlock:(BOOL (^)(id x))predicate
NSCParameterAssert(predicate != nil);
return [[self takeUntilBlock:^ BOOL (id x)
return !predicate(x);
] setNameWithFormat:@"[%@] -takeWhileBlock:", self.name];
takeWhileBlock:的信号集是takeUntilBlock:的信号集的补集。全集是原信号。takeWhileBlock:底层还是调用takeUntilBlock:,只不过判断条件的是不满足predicate( )闭包的集合。
9. takeUntil:
- (RACSignal *)takeUntil:(RACSignal *)signalTrigger
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber)
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
void (^triggerCompletion)(void) = ^
[disposable dispose];
[subscriber sendCompleted];
;
RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _)
triggerCompletion();
completed:^
triggerCompletion();
];
[disposable addDisposable:triggerDisposable];
if (!disposable.disposed)
RACDisposable *selfDisposable = [self subscribeNext:^(id x)
[subscriber sendNext:x];
error:^(NSError *error)
[subscriber sendError:error];
completed:^
[disposable dispose];
[subscriber sendCompleted];
];
[disposable addDisposable:selfDisposable];
return disposable;
] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
takeUntil:的实现也是“经典套路”——return一个新信号,在新信号中订阅原信号。入参是一个信号signalTrigger,这个信号是一个Trigger。一旦signalTrigger发出第一个信号,就会触发triggerCompletion( )闭包,在这个闭包中,会调用triggerCompletion( )闭包。
void (^triggerCompletion)(void) = ^
[disposable dispose];
[subscriber sendCompleted];
;
一旦调用了triggerCompletion( )闭包,就会把原信号取消订阅,并给变换的新的信号订阅者sendCompleted。
如果入参signalTrigger一直没有sendNext,那么原信号就会一直sendNext:。
10. takeUntilReplacement:
- (RACSignal *)takeUntilReplacement:(RACSignal *)replacement
return [RACSignal createSignal:^(id<RACSubscriber> subscriber)
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x)
[selfDisposable dispose];
[subscriber sendNext:x];
error:^(NSError *error)
[selfDisposable dispose];
[subscriber sendError:error];
completed:^
[selfDisposable dispose];
[subscriber sendCompleted];
];
if (!selfDisposable.disposed)
selfDisposable.disposable = [[self
concat:[RACSignal never]]
subscribe:subscriber];
return [RACDisposable disposableWithBlock:^
[selfDisposable dispose];
[replacementDisposable dispose];
];
];
- 原信号concat:了一个[RACSignal never]信号,这样原信号就一直不会disposed,会一直等待replacement信号的到来。
- 控制selfDisposable是否被dispose,控制权来自于入参的replacement信号,一旦replacement信号sendNext,那么原信号就会取消订阅,接下来的事情就会交给replacement信号了。
- 变换后的新信号sendNext,sendError,sendCompleted全部都由replacement信号来发送,最终新信号完成的时刻也是replacement信号完成的时刻。
11. skip: (在父类RACStream中定义的)
- (instancetype)skip:(NSUInteger)skipCount
Class class = self.class;
return [[self bind:^
__block NSUInteger skipped = 0;
return ^(id value, BOOL *stop)
if (skipped >= skipCount) return [class return:value];
skipped++;
return class.empty;
;
] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
skip:信号集和take:信号集是补集关系,全集是原信号。take:是取原信号的前count个信号,而skip:是从原信号第count + 1位开始取信号。
skipped是一个游标,每次原信号发送一个值,就比较它和入参skipCount的大小。如果不比skipCount大,说明还需要跳过,所以就返回empty信号,否则就把原信号的值发送出来。
通过类比take系列方法,可以发现在ReactiveCocoa 2.5的这个版本也并没有向我们提供skipLast:的变换函数。这个变换函数的实现过程也不难,我们可以类比takeLast:来实现。
实现的思路也不难,原信号每次发送过来的值,都用一个数组存储起来。skipLast:是想去掉原信号最末尾的count个信号。
我们先来分析一下:假设原信号有n个信号,从0 - (n-1),去掉最后的count个,前面还剩n - count个信号。那么从 原信号的第 count + 1位的信号开始发送,到原信号结束,这样中间就正好是发送了 n - count 个信号。
分析清楚后,代码就很容易了:
// 我自己增加实现的方法
- (RACSignal *)skipLast:(NSUInteger)count
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber)
NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
return [self subscribeNext:^(id x)
[valuesTaken addObject:x ? : RACTupleNil.tupleNil];
while (valuesTaken.count > count)
[subscriber sendNext:valuesTaken[0] == RACTupleNil.tupleNil ? nil : valuesTaken[0]];
[valuesTaken removeObjectAtIndex:0];
error:^(NSError *error)
[subscriber sendError:error];
completed:^
[subscriber sendCompleted];
];
] setNameWithFormat:@"[%@] -skipLast: %lu", self.name, (unsigned long)count];
原信号每发送过来一个信号就存入数组,当数组里面的个数大于count的时候,就是需要我们发送信号的时候,这个时候每次都把数组里面第0位发送出去即可,数组维护了一个FIFO的队列。这样就实现了skipLast:的效果了。
12. skipUntilBlock: (在父类RACStream中定义的)
- (instancetype)skipUntilBlock:(BOOL (^)(id x))predicate
NSCParameterAssert(predicate != nil);
Class class = self.class;
return [[self bind:^
__block BOOL skipping = YES;
return ^ id (id value, BOOL *stop)
if (skipping)
if (predicate(value))
skipping = NO;
else
return class.empty;
return [class return:value];
;
] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];
skipUntilBlock: 的实现可以类比takeUntilBlock: 的实现。
skipUntilBlock: 是根据传入的predicate闭包作为筛选条件的。一旦predicate( )闭包满足条件,那么skipping = NO。skipping为NO,以后原信号发送的每个值都原封不动的发送出去。predicate( )闭包不满足条件的时候,即会一直skip原信号的值。和函数名的意思是一样的,skip原信号的值,Until直到闭包满足条件,就不再skip了。
13. skipWhileBlock: (在父类RACStream中定义的)
- (instancetype)skipWhileBlock:(BOOL (^)(id x))predicate
NSCParameterAssert(predicate != nil);
return [[self skipUntilBlock:^ BOOL (id x)
return !predicate(x);
] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];
skipWhileBlock:的信号集是skipUntilBlock:的信号集的补集。全集是原信号。skipWhileBlock:底层还是调用skipUntilBlock:,只不过判断条件的是不满足predicate( )闭包的集合。
到这里skip系列方法就结束了,对比take系列的方法,少了2个方法,在ReactiveCocoa 2.5的这个版本中 takeUntil: 和 takeUntilReplacement:这两个方法没有与之对应的skip方法。
// 我自己增加实现的方法
- (RACSignal *)skipUntil:(RACSignal *)signalTrigger
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber)
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
__block BOOL sendTrigger = NO;
void (^triggerCompletion)(void) = ^
sendTrigger = YES;
;
RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _)
triggerCompletion();
completed:^
triggerCompletion();
];
[disposable addDisposable:triggerDisposable];
if (!disposable.disposed)
RACDisposable *selfDisposable = [self subscribeNext:^(id x)
if (sendTrigger)
[subscriber sendNext:x];
error:^(NSError *error)
[subscriber sendError:error];
completed:^
[disposable dispose];
[subscriber sendCompleted];
];
[disposable addDisposable:selfDisposable];
return disposable;
] setNameWithFormat:@"[%@] -skipUntil: %@", self.name, signalTrigger];
skipUntil实现方法也很简单,当入参的signalTrigger开发发送信号的时候,就让原信号sendNext把值发送出来,否则就把原信号的值“吞”掉。
skipUntilReplacement:就没什么意义了,把原信号经过skipUntilReplacement:变换之后得到的新的信号就是Replacement信号。所以说这个操作也就没意义了。
14. groupBy:transform:
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock
NSCParameterAssert(keyBlock != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber)
NSMutableDictionary *groups = [NSMutableDictionary dictionary];
NSMutableArray *orderedGroups = [NSMutableArray array];
return [self subscribeNext:^(id x)
id<NSCopying> key = keyBlock(x);
RACGroupedSignal *groupSubject = nil;
@synchronized(groups)
groupSubject = groups[key];
if (groupSubject == nil)
groupSubject = [RACGroupedSignal signalWithKey:key];
groups[key] = groupSubject;
[orderedGroups addObject:groupSubject];
[subscriber sendNext:groupSubject];
[groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
error:^(NSError *error)
[subscriber sendError:error];
[orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
completed:^
[subscriber sendCompleted];
[orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
];
] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
看groupBy:transform:的实现,依旧是老“套路”。return 一个新的RACSignal,在新的信号里面订阅原信号。
groupBy:transform:的重点就在subscribeNext中了。
- 首先解释一下两个入参。两个入参都是闭包,keyBlock返回值是要作为字典的key,transformBlock的返回值是对原信号发出来的值x进行变换。
- 先创建一个NSMutableDictionary字典groups,和NSMutableArray数组orderedGroups。
- 从字典里面取出key对应的value,这里的key对应着keyBlock返回值。value的值是一个RACGroupedSignal信号。如果找不到对应的key值,就新建一个RACGroupedSignal信号,并存入字典对应的key值,与之对应。
- 新变换之后的信号,订阅之后,RACGroupedSignal进行sendNext,这是一个信号,如果transformBlock不为空,就发送transformBlock变换之后的值。
- sendError和sendCompleted都要分别对数组orderedGroups里面每个RACGroupedSignal都要进行sendError或者sendCompleted。因为要对数组里面每个信号都执行一个操作,所以需要调用makeObjectsPerformSelector:withObject:方法。
经过groupBy:transform:变换之后,原信号会根据keyBlock进行分组。
写出测试代码,来看看平时应该怎么用。
RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber)
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^
NSLog(@"signal dispose");
];
];
RACSignal *signalGroup = [signalA groupBy:^id<NSCopying>(NSNumber *object)
return object.integerValue > 3 ? @"good" : @"bad";
transform:^id(NSNumber * object)
return @(object.integerValue * 10);
];
[[[signalGroup filter:^BOOL(RACGroupedSignal *value)
return [(NSString *)value.key isEqualToString:@"good"];
] flatten]subscribeNext:^(id x)
NSLog(@"subscribeNext: %@", x);
];
假设原信号发送的1,2,3,4,5是代表的成绩的5个等级。当成绩大于3的都算“good”,小于3的都算“bad”。
signalGroup是原信号signalA经过groupBy:transform:得到的新的信号,这个信号是一个高阶的信号,因为它里面并不是直接装的是值,signalGroup这个信号里面装的还是信号。signalGroup里面有两个分组,分别是“good”分组和“bad”分组。
想从中取出这两个分组里面的值,需要进行一次filter:筛选。筛选之后得到对应分组的高阶信号。这时还要再进行一个flatten操作,把高阶信号变成低阶信号,再次订阅才能取到其中的值。
订阅新信号的值,输出如下:
subscribeNext: 40
subscribeNext: 50
关于flatten的实现:
- (instancetype)flatten
__weak RACStream *stream __attribute__((unused)) = self;
return [[self flattenMap:^(id value)
return value;
] setNameWithFormat:@"[%@] -flatten", self.name];
flatten操作就是调用了flattenMap:把值传进去了。
- (instancetype)flattenMap:(RACStream * (^)(id value))block
Class class = self.class;
return [[self bind:^
return ^(id value, BOOL *stop)
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
;
] setNameWithFormat:@"[%@] -flattenMap:", self.name];
flatten是把高阶信号变换成低阶信号的常用操作。flattenMap:具体实现上篇文章分析过了,这里不再赘述。
15. groupBy:
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock
return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
groupBy:操作就是groupBy:transform:的缩减版,transform传入的为nil。
关于groupBy:可以干的事情很多,可以进行很高级的分组操作。这里可以举一个例子:
// 简单算法题,分离数组中的相同的元素,如果元素个数大于2,则组成一个新的数组,结果得到多个包含相同元素的数组,
// 例如[1,2,3,1,2,3]分离成[1,1],[2,2],[3,3]
RACSignal *signal = @[@1, @2, @3, @4,@2,@3,@3,@4,@4,@4].rac_sequence.signal;
NSArray * array = [[[[signal groupBy:^NSString *(NSNumber *object)
return [NSString stringWithFormat:@"%@",object];
] map:^id(RACGroupedSignal *value)
return [value sequence];
] sequence] map:^id(RACSignalSequence * value)
return value.array;
].array;
for (NSNumber * num in array)
NSLog(@"最后的数组%@",num);
// 最后输出 [1,2,3,4,2,3,3,4,4,4]变成[1],[2,2],[3,3,3],[4,4,4,4]
二. 组合操作
1. startWith: (在父类RACStream中定义的)
- (instancetype)startWith:(id)value
return [[[self.class return:value]
concat:self]
setNameWithFormat:@"[%@] -startWith: %@", self.name, [value rac_description]];
startWith:的实现很简单,就是先构造一个只发送一个value的信号,然后这个信号发送完毕之后接上原信号。得到的新的信号就是在原信号前面新加了一个值。
2. concat: (在父类RACStream中定义的)
这里说的concat:是在父类RACStream中定义的。
- (instancetype)concat:(RACStream *)stream
return nil;
父类中定义的这个方法就返回一个nil,具体的实现还要子类去重写。
3. concat: (在父类RACStream中定义的)
+ (instancetype)concat:(id<NSFastEnumeration>)streams
RACStream *result = self.empty;
for (RACStream *stream in streams)
result = [result concat:stream];
return [result setNameWithFormat:@"+concat: %@", streams];
这个concat:后面跟着一个数组,数组里面包含这很多信号,concat:依次把这些信号concat:连接串起来。
4. merge:
+ (RACSignal *)merge:(id<NSFastEnumeration>)signals
NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
for (RACSignal *signal in signals)
[copiedSignals addObject:signal];
return [[[RACSignal
createSignal:^ RACDisposable * (id<RACSubscriber> subscriber)
for (RACSignal *signal in copiedSignals)
[subscriber sendNext:signal];
[subscriber sendCompleted];
return nil;
]
flatten]
setNameWithFormat:@"+merge: %@", copiedSignals];
merge:后面跟一个数组。先会新建一个数组copiedSignals,把传入的信号都装到数组里。然后依次发送数组里面的信号。由于新信号也是一个高阶信号,因为sendNext会把各个信号都依次发送出去,所以需要flatten操作把这个信号转换成值发送出去。
从上图上看,上下两个信号就像被拍扁了一样,就成了新信号的发送顺序。
5. merge:
- (RACSignal *)merge:(RACSignal *)signal
return [[RACSignal
merge:@[ self, signal ]]
setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
merge:后面参数也可以跟一个信号,那么merge:就是合并这两个信号。具体实现和merge:多个信号是一样的原理。
6. zip: (在父类RACStream中定义的)
+ (instancetype)zip:(id<NSFastEnumeration>)streams
return [[self join:streams block:^(RACStream *left, RACStream *right)
return [left zipWith:right];
] setNameWithFormat:@"+zip: %@", streams];
zip:后面可以跟一个数组,数组里面装的是各种信号流。
它的实现是调用了join: block: 实现的。
+ (instancetype)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block
RACStream *current = nil;
// 第一步
for (RACStream *stream in streams)
if (current == nil)
current = [stream map:^(id x)
return RACTuplePack(x);
];
continue;
current = block(current, stream);
// 第二步
if (current == nil) return [self empty];
return [current map:^(RACTuple *xs)
NSMutableArray *values = [[NSMutableArray alloc] init];
// 第三步
while (xs != nil)
[values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
xs = (xs.count > 1 ? xs.first : nil);
// 第四步
return [RACTuple tupleWithObjectsFromArray:values];
];
join: block: 的实现可以分为4步:
-
依次打包各个信号流,把每个信号流都打包成元组RACTuple。首先第一个信号流打包成一个元组,这个元组里面就一个信号。接着把第一个元组和第二个信号执行block( )闭包里面的操作。传入的block( )闭包执行的是zipWith:的操作。这个操作是把两个信号“压”在一起。得到第二个元组,里面装着是第一个元组和第二个信号。之后每次循环都执行类似的操作,再把第二个元组和第三个信号进行zipWith:操作,以此类推下去,直到所有的信号流都循环一遍。
-
经过第一步的循环操作之后,还是nil,那么肯定就是空信号了,就返回empty信号。
-
这一步是把之前第一步打包出来的结果,还原回原信号的过程。经过第一步的循环之后,current会是类似这个样子,(((1), 2), 3),第三步就是为了把这种多重元组解出来,每个信号流都依次按照顺序放在数组里。注意观察current的特点,最外层的元组,是一个值和一个元组,所以从最外层的元组开始,一层一层往里“剥”。while循环每次都取最外层元组的last,即那个单独的值,插入到数组的第0号位置,然后取出first即是里面一层的元组。然后依次循环。由于每次都插入到数组0号的位置,类似于链表的头插法,最终数组里面的顺序肯定也保证是原信号的顺序。
-
第四步就是把还原成原信号的顺序的数组包装成元组,返回给map操作的闭包。
+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array
return [self tupleWithObjectsFromArray:array convertNullsToNils:NO];
+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array convertNullsToNils:(BOOL)convert
RACTuple *tuple = [[self alloc] init];
if (convert)
NSMutableArray *newArray = [NSMutableArray arrayWithCapacity:array.count];
for (id object in array)
[newArray addObject:(object == NSNull.null ? RACTupleNil.tupleNil : object)];
tuple.backingArray = newArray;
else
tuple.backingArray = [array copy];
return tuple;
在转换过程中,入参convertNullsToNils的含义是,是否把数组里面的NSNull转换成RACTupleNil。
这里转换传入的是NO,所以就是把数组原封不动的copy一份。
测试代码:
RACSignal *signalD = [RACSignal interval:3 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalO = [RACSignal interval:1 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalE = [RACSignal interval:4 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalB = [RACStream zip:@[signalD,signalO,signalE]];
[signalB subscribeNext:^(id x)
NSLog(@"最后接收到的值 = %@",x);
];
打印输出:
2016-11-29 13:07:57.349 最后接收到的值 = <RACTuple: 0x608000011440> (
"2016-11-29 05:07:56 +0000",
"2016-11-29 05:07:54 +0000",
"2016-11-29 05:07:57 +0000"
)
2016-11-29 13:08:01.350 最后接收到的值 = <RACTuple: 0x608000010c60> (
"2016-11-29 05:07:59 +0000",
"2016-11-29 05:07:55 +0000",
"2016-11-29 05:08:01 +0000"
)
2016-11-29 13:08:05.352 最后接收到的值 = <RACTuple: 0x60000001a350> (
"2016-11-29 05:08:02 +0000",
"2016-11-29 05:07:56 +0000",
"2016-11-29 05:08:05 +0000"
)
最后输出的信号以时间最长的为主,最后接到的信号是一个元组,里面依次包含zip:数组里每个信号在一次“压”缩周期里面的值。
7. zip: reduce: (在父类RACStream中定义的)
+ (instancetype)zip:(id<NSFastEnumeration>)streams reduce:(id (^)())reduceBlock
NSCParameterAssert(reduceBlock != nil);
RACStream *result = [self zip:streams];
if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
return [result setNameWithFormat:@"+zip: %@ reduce:", streams];
zip: reduce:是一个组合的方法。具体实现可以拆分成两部分,第一部分是先执行zip:,把数组里面的信号流依次都进行组合。这一过程的实现在上一个变换实现中分析过了。zip:完成之后,紧接着进行reduceEach:操作。
这里有一个判断reduceBlock是否为nil的判断,这个判断是针对老版本的“历史遗留问题”。在ReactiveCocoa 2.5之前的版本,是允许reduceBlock传入nil,这里为了防止崩溃,所以加上了这个reduceBlock是否为nil的判断。
- (instancetype)reduceEach:(id (^)())reduceBlock
NSCParameterAssert(reduceBlock != nil);
__weak RACStream *stream __attribute__((unused)) = self;
return [[self map:^(RACTuple *t)
NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
] setNameWithFormat:@"[%@] -reduceEach:", self.name];
reduceEach:操作在已经分析过了。它会动态的构造闭包,对原信号每个元组,执行reduceBlock( )闭包里面的方法。具体分析见上篇。一般用法如下:
[RACStream zip:@[ stringSignal, intSignal ] reduce:^(NSString *string, NSNumber *number)
return [NSString stringWithFormat:@"%@: %@", string, number];
];
8. zipWith: (在父类RACStream中定义的)
- (instancetype)zipWith:(RACStream *)stream
return nil;
这个方法就是在父类的RACStream中定义了,具体实现还要看RACStream各个子类的实现。
它就可以类比concat:在父类中的实现,也是直接返回一个nil。
- (instancetype)concat:(RACStream *)stream return nil;
之前分析了concat:和zipWith:在RACSignal子类中具体实现。忘记了具体实现的可以回去看看。
9. combineLatestWith:
- (RACSignal *)combineLatestWith:(RACSignal *)signal
NSCParameterAssert(signal != nil);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber)
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// 初始化第一个信号的一些标志变量
__block id lastSelfValue = nil;
__block BOOL selfCompleted = NO;
// 初始化第二个信号的一些标志变量
__block id lastOtherValue = nil;
__block BOOL otherCompleted = NO;
// 这里是一个判断是否sendNext的闭包
void (^sendNext)(void) = ^ ;
// 订阅第一个信号
RACDisposable *selfDisposable = [self subscribeNext:^(id x) ];
[disposable addDisposable:selfDisposable];
// 订阅第二个信号
RACDisposable *otherDisposable = [signal subscribeNext:^(id x) ];
[disposable addDisposable:otherDisposable];
return disposable;
] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
大体实现思路比较简单,在新信号里面分别订阅原信号和入参signal信号。
RACDisposable *selfDisposable = [self subscribeNext:^(id x)
@synchronized (disposable)
lastSelfValue = x ?: RACTupleNil.tupleNil;
sendNext();
error:^(NSError *error)
[subscriber sendError:error];
completed:^
@synchronized (disposable)
selfCompleted = YES;
if (otherCompleted) [subscriber sendCompleted];
];
先来看看原信号订阅的具体实现:
在subscribeNext闭包中,记录下原信号最新发送的x值,并保存到lastSelfValue中。从此lastSelfValue变量每次都保存原信号发送过来的最新的值。然后再调用sendNext( )闭包。
在completed闭包中,selfCompleted中记录下
以上是关于RACSignal 所有变换操作底层实现分析的主要内容,如果未能解决你的问题,请参考以下文章