CKQueryOperation working with big batch

Posted: 2015-05-11 23:11:28

[Question]:

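I'm having trouble creating a CKQueryOperation that returns a large amount of data. My query works with 100 results, but with more results it fails with what looks like a thread dispatch error (libdispatch.dylib`dispatch_group_leave:). I'm lost... any ideas?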

+ (void) fetchAnswersWithRecordId:(CKRecordID *)recordId completionHandler:(CloudKitCompletionHandler)handler {
    NSPredicate *predicate = [NSPredicate predicateWithFormat:@"ANSrecordID == %@", recordId];
    CKQuery *query = [[CKQuery alloc] initWithRecordType:ckAnswers predicate:predicate];

    CKQueryOperation *operation = [[CKQueryOperation alloc] initWithQuery:query];
    CKQueryOperation * __weak weakSelf = operation;
    operation.resultsLimit = 300;
    NSMutableArray *tmp = [[NSMutableArray alloc] init];


    operation.recordFetchedBlock = ^(CKRecord *record) {
        [tmp addObject:record];
    };

    operation.queryCompletionBlock = ^(CKQueryCursor *cursor, NSError *error) {
        if (!handler)
            return;

        NSArray *array = [NSArray arrayWithArray:tmp];
        if(cursor != nil) {
            CKQueryOperation *newOperation = [[CKQueryOperation alloc] initWithCursor:cursor];
            newOperation.recordFetchedBlock = weakSelf.recordFetchedBlock;
            newOperation.completionBlock = weakSelf.completionBlock;
            [[self publicCloudDatabase] addOperation:newOperation];
        } else {
            NSLog(@"Results: %lu", [array count]);
            dispatch_async(dispatch_get_main_queue(), ^{
                handler(array, error);
            });
        }
    };

    [[self publicCloudDatabase] addOperation:operation];
}

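[Comments]:

[Answer 1]:

I think your problem lies in the __weak operation and in the way you create one operation inside another. Here is an example (in Swift) of how I do something similar, i.e. getting additional results, but in a fetch rather than a query. Note that the instance variables are initialized only on the first call, and that the semi-recursion goes through GCD dispatch_async: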

private func _fetchRecordChangesFromCloud() {
    if !_fetching {
        // this is the first and only time this code is called in the GCD recursion

        // we clean the caches we use to collect the results of the fetch
        // so we can then save the records in the correct order so references can be created
        _fetchedModifiedRecords = []
        _fetchedDeletedRecordIDs = []

        // mark that fetching has started
        _fetching = true
    }

    let operation = CKFetchRecordChangesOperation(recordZoneID: _customRecordZoneID, previousServerChangeToken: _serverChangeToken)

    operation.recordChangedBlock = { (record: CKRecord?) in
        if let record = record {
            println("Received record to save: \(record)")
            self._fetchedModifiedRecords.append(record)
        }
    }

    operation.recordWithIDWasDeletedBlock = { (recordID: CKRecordID?) in
        if let recordID = recordID {
            println("Received recordID to delete: \(recordID)")
            self._fetchedDeletedRecordIDs.append(recordID)
        }
    }

    operation.fetchRecordChangesCompletionBlock = {
        (serverChangeToken: CKServerChangeToken?, clientChangeToken: NSData?, error: NSError?) -> Void in

        if let error = error {
            println("Error in fetching record changes: \(error)")
            // try again next sync
            self._fetchAfterNextSuccessfullSync = true
            self._fetching = false
            return
        }

        // fetched records successfully
        println("fetched records successfully")

        if let serverChangeToken = serverChangeToken {
            self._serverChangeToken = serverChangeToken
        }

        if operation.moreComing {
            // we need to create another operation object and do it again
            dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) {
                self._fetchRecordChangesFromCloud()
            }
        } else {
            // we are finally done

            // process the fetched records
            self._processFetchedRecords()

            // save all changes back to persistent store
            self._saveBackgroundContext()

            // we are done
            self._fetching = false
        }
    }

    self._privateDatabase.addOperation(operation)
}

[Comments]:

[Answer 2]:

FYI, dispatch_async() is not necessary; this is a memory management issue. The following works for multi-batch fetches:

//
__block CKQueryOperation* enumerateOperationActive = nil;

//
NSPredicate* predicate = [NSPredicate predicateWithValue:TRUE];
CKQuery* query = [[[CKQuery alloc] initWithRecordType:@"myType" predicate:predicate] autorelease];

CKQueryOperation* enumerateOperation = [[[CKQueryOperation alloc] initWithQuery:query] autorelease];

// DEBUG: fetch only 1 record in order to "force" a nested CKQueryOperation cycle
enumerateOperation.resultsLimit = 1;

enumerateOperation.recordFetchedBlock = ^(CKRecord* recordFetched)
    {
        // ...
    };

enumerateOperation.queryCompletionBlock = ^(CKQueryCursor* cursor, NSError* error)
    {
        if (error)
        {
            // ...
        }
        else
        {
            if (cursor)
            {
                CKQueryOperation* enumerateOperationNested = [[[CKQueryOperation alloc] initWithCursor:cursor] autorelease];

                // DEBUG: fetch only 1 record in order to "force" a doubly-nested CKQueryOperation cycle
                enumerateOperationNested.resultsLimit = 1;

                enumerateOperationNested.recordFetchedBlock = /* re-used */ enumerateOperationActive.recordFetchedBlock;
                enumerateOperationNested.queryCompletionBlock = /* re-used */ enumerateOperationActive.queryCompletionBlock;

                // CRITICAL: keep track of the very last (active) operation
                enumerateOperationActive = enumerateOperationNested;
                [database addOperation:enumerateOperationNested];
            }
        }
    };

//

// CRITICAL: keep track of the very last (active) operation
enumerateOperationActive = enumerateOperation;
[database addOperation:enumerateOperation];

Note: if you read the (original) enumerateOperation.queryCompletionBlock instead of the (last) enumerateOperationActive.queryCompletionBlock, the operation will never complete.
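One way to picture why the block must be read from the most recent operation is the toy model below. It is only a sketch and does not use CloudKit: `ToyOperation` and `runChain` are hypothetical names, and the model assumes an operation releases its completion handler once that handler has run. That assumption is consistent with the behavior described in this answer, but the thread itself does not confirm it.

```swift
// Hypothetical toy operation; NOT CloudKit API.
final class ToyOperation {
    // Called with `true` when more batches remain.
    var completionHandler: ((Bool) -> Void)?

    func finish(moreComing: Bool) {
        completionHandler?(moreComing)
        // Assumption: the handler is released after it has run.
        completionHandler = nil
    }
}

// Simulates `batches` chained operations and returns true
// if the final "all done" branch is ever reached.
func runChain(batches: Int, reuseFromActive: Bool) -> Bool {
    let original = ToyOperation()
    var active = original
    var pending: ToyOperation? = original
    var remaining = batches
    var reachedEnd = false

    original.completionHandler = { moreComing in
        guard moreComing else {
            reachedEnd = true
            return
        }
        let nested = ToyOperation()
        // Either reuse the handler of the operation that is running now,
        // or (incorrectly) the handler of the very first operation.
        let source = reuseFromActive ? active : original
        nested.completionHandler = source.completionHandler
        active = nested
        pending = nested
    }

    // Drive the chain: finish each pending operation in turn.
    while let operation = pending {
        pending = nil
        remaining -= 1
        operation.finish(moreComing: remaining > 0)
    }
    return reachedEnd
}
```

Under this model, a chain of three batches finishes when the handler is taken from the active operation, and stalls silently when it is taken from the original one, because by the second batch the original operation no longer holds a handler. With only two batches both variants finish, which may be why the DEBUG limit of 1 record is needed to expose the problem.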

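[Comments]:

Why autorelease? Aren't you using ARC?

Real men don't use ARC ;)

[Answer 3]:

I like the recursive solution on the main thread. Below is my solution in Objective-C. I use a class-level variable, _recordArray, which is allocated ahead of time.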

- (void) readRecords_Resurs: (CKDatabase *) database query: (CKQuery *) query cursor: (CKQueryCursor *) cursor {

    // Send either query or cursor

    CKQueryOperation *operation;
    if (query != nil) operation = [[CKQueryOperation alloc] initWithQuery: query];
    else operation = [[CKQueryOperation alloc] initWithCursor: cursor];
    operation.recordFetchedBlock = ^(CKRecord *record) {
        [_recordArray addObject:record];
    };
    operation.queryCompletionBlock = ^(CKQueryCursor *cursor, NSError *error) {
        if (cursor == nil || error != nil) {
            // Done
            dispatch_async(dispatch_get_main_queue(), ^{ [self readRecordsDone: error == nil ? nil : [error localizedDescription]]; });
        }
        else {
            // Get next batch using cursor
            dispatch_async(dispatch_get_main_queue(), ^{ [self readRecords_Resurs: database query: nil cursor: cursor]; });
        }
    };

    [database addOperation: operation]; // start
}

- (void) readRecordsDone: (NSString *) errorMsg {
}

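[Comments]:

[Answer 4]:

My solution is a category that uses two operations sharing the same blocks, and it lets you specify how many results to request per call.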

@interface CKDatabase (MH)

/* Convenience method for performing a query and receiving the results in batches using multiple network calls. Best to use at most 400 for cursorResultsLimit, otherwise the server sometimes throws an exception telling you to use a maximum of 400. Even using CKQueryOperationMaximumResults can cause this exception. */
- (void)mh_performCursoredQuery:(CKQuery *)query cursorResultsLimit:(int)cursorResultsLimit inZoneWithID:(CKRecordZoneID *)zoneID completionHandler:(void (^)(NSArray /* CKRecord */ *results, NSError *error))completionHandler;

@end

@implementation CKDatabase(MH)

- (void)mh_performCursoredQuery:(CKQuery *)query cursorResultsLimit:(int)cursorResultsLimit inZoneWithID:(CKRecordZoneID *)zoneID completionHandler:(void (^)(NSArray /* CKRecord */ *results, NSError *error))completionHandler
{
    // holds all the records received.
    NSMutableArray* records = [NSMutableArray array];

    // this block adds records to the result array
    void (^recordFetchedBlock)(CKRecord *record) = ^void(CKRecord *record) {
        [records addObject:record];
    };

    // this is used to allow the block to call itself recursively without causing a retain cycle.
    void (^queryCompletionBlock)(CKQueryCursor *cursor, NSError *error);
    __block __weak typeof(queryCompletionBlock) weakQueryCompletionBlock;

    weakQueryCompletionBlock = queryCompletionBlock = ^void(CKQueryCursor *cursor, NSError *error) {
        // if there is any error at all then end with no results. Note it's possible that we got some results,
        // and might even have got a cursor. However, if there is an error then the cursor doesn't work properly, so we just return with no results.
        if(error){
            completionHandler(nil,error);
        }
        else if(cursor){
            CKQueryOperation* cursorOperation = [[CKQueryOperation alloc] initWithCursor:cursor];
            cursorOperation.zoneID = zoneID;
            cursorOperation.resultsLimit = cursorResultsLimit;
            cursorOperation.recordFetchedBlock = recordFetchedBlock;
            cursorOperation.queryCompletionBlock = weakQueryCompletionBlock; // use the weak pointer to prevent a retain cycle
            // start the recursive operation and return.
            [self addOperation:cursorOperation];
        }
        else{
            completionHandler(records,nil);
        }
    };

    // start the initial operation.
    CKQueryOperation* queryOperation = [[CKQueryOperation alloc] initWithQuery:query];
    queryOperation.zoneID = zoneID;
    queryOperation.resultsLimit = cursorResultsLimit;
    queryOperation.recordFetchedBlock = recordFetchedBlock;
    queryOperation.queryCompletionBlock = queryCompletionBlock;
    [self addOperation:queryOperation];
}

@end

[Comments]:

Could you translate this to Swift?

Not sure; the last time I tried Swift it had no error handling.
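For readers who want the shape of this category in Swift, here is a minimal sketch of the same cursor-chaining pattern. It deliberately avoids importing CloudKit so it can run anywhere: `Cursor`, `FakeDatabase`, and `performCursoredQuery` are hypothetical stand-ins, not CloudKit API. In real code, the two cases (no cursor, cursor) would presumably map to creating a `CKQueryOperation` from a query or from a cursor.

```swift
// Hypothetical stand-in for CKQueryCursor; not CloudKit API.
struct Cursor {
    let offset: Int
}

// Hypothetical stand-in for a database that serves results in batches.
final class FakeDatabase {
    let records: [String]
    init(records: [String]) { self.records = records }

    // Delivers at most `limit` records starting at the cursor (or at 0),
    // then reports a new cursor if more records remain.
    func run(cursor: Cursor?, limit: Int,
             recordFetched: (String) -> Void,
             completion: (Cursor?, Error?) -> Void) {
        precondition(limit > 0, "limit must be positive")
        let start = cursor?.offset ?? 0
        let end = min(start + limit, records.count)
        for record in records[start..<end] {
            recordFetched(record)
        }
        completion(end < records.count ? Cursor(offset: end) : nil, nil)
    }
}

// Collects every batch, issuing one new request per returned cursor.
func performCursoredQuery(on database: FakeDatabase, limit: Int,
                          completion: @escaping ([String]?, Error?) -> Void) {
    var collected: [String] = []

    func fetch(from cursor: Cursor?) {
        database.run(cursor: cursor, limit: limit,
                     recordFetched: { collected.append($0) },
                     completion: { next, error in
            if let error = error {
                // Any error ends the chain with no results.
                completion(nil, error)
            } else if let next = next {
                // More results remain: request the next batch.
                fetch(from: next)
            } else {
                // No cursor: everything has been received.
                completion(collected, nil)
            }
        })
    }

    fetch(from: nil)
}
```

As in the category, the per-record and completion logic is written once and shared by the initial request and every follow-up request, and an error at any stage returns no results.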
