重定向提交到调度队列的块

Posted

技术标签:

【中文标题】重定向提交到调度队列的块【英文标题】:Retarget blocks submitted to dispatch queue 【发布时间】:2015-12-07 05:30:33 【问题描述】:

我有串行调度队列Q(以另一个串行队列T 作为目标)并且已经通过dispatch_async(Q, block) 提交了几个块。有没有办法将待处理的块重新定位到另一个队列A

我的简单测试表明Q会尽快将块转发到T,因此设置新目标无效:

#define print(f, ...) printf(f "\n", ##__VA_ARGS__)

dispatch_queue_t Q = dispatch_queue_create("Q", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t T = dispatch_queue_create("T", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t A = dispatch_queue_create("A", DISPATCH_QUEUE_SERIAL);

dispatch_set_target_queue(Q, T);

dispatch_async(T, ^ print("T sleeping"); sleep(2); print("T ready"); );
dispatch_async(A, ^ print("A sleeping"); sleep(5); print("A ready"); );

dispatch_async(Q, ^
    print("block 1");
    dispatch_set_target_queue(Q, A); // no effect!
);
dispatch_async(Q, ^ print("block 2"); );
dispatch_async(Q, ^ print("block 3"); );
dispatch_async(Q, ^ print("block 4"); );

输出:

A sleeping
T sleeping
(wait 2 seconds)
T ready
block 1
block 2
block 3
block 4
(wait 3 seconds)
A ready

如您所见,即使手动状态,第 2-4 块仍被固定到 T

新的目标队列设置将在对象上的块执行之间生效,但不会在任何现有块执行的中间生效(非抢占式)。

我不清楚这是否仅适用于调度源,或者“现有”意味着已经提交(甚至尚未执行)的块,但无论如何,我的串行队列不会发生这种情况。

有没有办法做到这一点?

【问题讨论】:

参考dispatch_set_target_queue,文档说“更改调度源的队列是异步操作,调度源会尽最大努力尽快进行更改。如果事件处理程序已经"诚然,这与来源有关,但您描述的行为似乎相似。 顺便说一句,除了笨拙之外,第二个代码 sn-p 的问题是调度的块将在一个队列中等待,并且只有在第一个队列中等待完成后才会重定向到另一个队列,此时它被添加到另一个队列的末尾。这既低效又迟钝。我可以想象其他模型,但这取决于这里的功能意图是什么。也许您可以退后一步,在这里描述一个您试图解决的实际问题,我们或许可以提出更好的方法。 @Rob 感谢您指出这一点!块甚至可能出现在T 和原始提交者线程之间的竞争中。我想我应该把块放在一个常规的原子队列中,并在合适的时候从那里挑选。更大的图景不是那么大:我有通过“rpc-channels”进行通信的并发进程。通道可以通过其他通道传递,从那时起,呼叫应该转到新所有者(旧所有者使他的句柄无效)。 @Rob 我想我找到了正确的解决方案(作为答案发布),再次感谢。 【参考方案1】:

好的,经过一番研究,我想出了以下解决方案。它基于自定义调度源,我认为这是即时提交块的唯一方法。

BlockSource.h:

dispatch_source_t dispatch_block_source_create(dispatch_queue_t queue);
void dispatch_block_source_add_block(dispatch_source_t source, dispatch_block_t block);

BlockSource.c:

struct context 
    CFMutableArrayRef array;
    pthread_mutex_t mutex;
    dispatch_source_t source;
;

static void
s_event(struct context *context)

    dispatch_block_t block;
    CFIndex pending;

    pthread_mutex_lock(&context->mutex); 
        block = CFArrayGetValueAtIndex(context->array, 0);
        CFArrayRemoveValueAtIndex(context->array, 0);
        pending = CFArrayGetCount(context->array);
    
    pthread_mutex_unlock(&context->mutex);

    block();
    Block_release(block);

    if (pending)
        dispatch_source_merge_data(context->source, 1);


static void
s_cancel(struct context *context)

    CFIndex count = CFArrayGetCount(context->array);
    for (CFIndex i = 0; i < count; i++) 
        dispatch_block_t block = CFArrayGetValueAtIndex(context->array, i);
        Block_release(block);
    
    CFRelease(context->array);
    pthread_mutex_destroy(&context->mutex);
    print("canceled");


dispatch_source_t
dispatch_block_source_create(dispatch_queue_t queue)

    dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0, queue);

    struct context *context = calloc(1, sizeof(*context));
    context->array = CFArrayCreateMutable(kCFAllocatorDefault, 0, NULL);
    pthread_mutex_init(&context->mutex, NULL);
    context->source = source;

    dispatch_set_context(source, context);
    dispatch_source_set_event_handler_f(source, (dispatch_function_t)s_event);
    dispatch_source_set_cancel_handler_f(source, (dispatch_function_t)s_cancel);
    dispatch_set_finalizer_f(source, (dispatch_function_t)free);

    return source;


void
dispatch_block_source_add_block(dispatch_source_t source, dispatch_block_t block)

    struct context *context = dispatch_get_context(source);

    pthread_mutex_lock(&context->mutex); 
        CFArrayAppendValue(context->array, Block_copy(block));
        dispatch_source_merge_data(context->source, 1);
    
    pthread_mutex_unlock(&context->mutex);

还有测试用例:

dispatch_queue_t T = dispatch_queue_create("T", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t A = dispatch_queue_create("A", DISPATCH_QUEUE_SERIAL);

static int queue_name_key;
dispatch_queue_set_specific(T, &queue_name_key, "T", NULL);
dispatch_queue_set_specific(A, &queue_name_key, "A", NULL);

dispatch_source_t source = dispatch_block_source_create(T);
dispatch_resume(source);

for (int i = 1; i <= 10; i++) 
    dispatch_block_source_add_block(source, ^
        print("block %d on queue %s", i, dispatch_get_specific(&queue_name_key));
        sleep(1);

        if (i == 2) 
            dispatch_set_target_queue(source, A);
        
        else if (i == 5) 
            dispatch_source_cancel(source);
            dispatch_release(source);
        
    );

输出:

block 1 on queue T
block 2 on queue T
block 3 on queue A
block 4 on queue A
block 5 on queue A
canceled

至少现在它在dispatch_set_target_queue() 调用之后遵循调度源的方案。这意味着如果在其中一个提交的块中设置了新的目标队列,则保证所有剩余的块都将进入新队列。

可能仍包含错误。

【讨论】:

以上是关于重定向提交到调度队列的块的主要内容,如果未能解决你的问题,请参考以下文章

jQuery滑块重定向图像

shell代码块重定向

如何在 php 中重定向/调度? [复制]

提交给调度组的块是串行执行还是同时执行?

Boost消息队列文件可以重定向到用户指定的位置吗

提交表单 Laravel 后重定向到不同的路由