保留多线程API中的传入请求顺序并按顺序处理

Posted

技术标签:

【中文标题】保留多线程API中的传入请求顺序并按顺序处理【英文标题】:preserve incoming request order in multithreaded api and process in order 【发布时间】:2018-04-05 15:14:37 【问题描述】:

假设有一个多线程API,从输入队列读取请求,处理请求,然后将数据写入输出队列。

但是有一个约束:所有传入请求的结果应该按照接收到的顺序写入 O/P 队列。

示例: 请求:[r1, r2, r3] 输出应该是相同的顺序:[o1, o2, o3]

问题:如何同步线程,使输出队列中的所有数据都像输入队列一样保持顺序??

我给了面试官3个答案:

第一个答案: 假设线程 [t1, t2, t3] 处理传入请求 [r1,r2,r3]

在这种情况下我们可以加入: t2 加入 t1 t3 加入 t2 这样可以维持处理顺序。

上述解决方案的问题: 1.必须为每个请求创建新线程。 2. 线程在处理并写入输出后必须退出。

显然上述解决方案是不可行的,因为如果有一个线程池并且我们有有限数量的线程怎么办。

第二个解决方案: 当线程从输入队列中获取请求时,锁定输出队列。 线程 t1 在输出队列上获取 r1 和互斥锁。 显然 t2 和 t3 将无法处理请求 r2 和 r3。

上述解决方案的问题: 1.这里不能实现并发。 2.单个线程只能处理单个请求。 3.这里不能应用线程池的概念。

第三种解决方案: 我们可以在全局数据结构中维护请求的顺序。

线程 t1 取 r1 并在数据结构中添加优先级 p1。 数据结构:[p1]

线程 t2 取 r1 并在优先级数据结构中添加 p2 数据结构:[p1, p2]

现在假设线程 t2 完成了 r2 的处理并愿意将输出 o2 写入 O/P 队列。 t2 将检查数据结构,并会发现 p1 中的第一个元素与 t2 在优先级数据结构中输入的内容不同。所以 t2 将继续有限的等待时间并再次检查。

同时 t1 现在完成并愿意将输出 o1 写入 O/P 队列。它检查数据结构,发现第一个元素是 p1,它是由 t1 自己插入的。因此 t1 将写入输出队列并将 p1 从全局优先级数据结构中删除。

请注意: [使用互斥锁和其他机制可以使全局优先级数据结构成为线程安全的。这里的问题不同:)]

我已经给面试官上面的答案,我认为我的第三个答案是正确的。

但面试官似乎对我的回答并不满意。

谁能给出这个问题的正确解决方案?

根据给出的答案,我想添加更多细节:

1) 主线程不负责将数据写入输出队列。每个线程都会将结果写入 O/P 队列 2)单个线程可能以连续方式处理多个请求。 例如: 线程 T1 处理请求 R1,然后线程 T2 处理请求 R2,然后线程 T1 将输出 o1 放入 O/P 队列,并从 I/P 队列中选择另一个请求 R3 进行处理。 所以现在线程 T1 处理 R3,线程 T2 正在处理 R2 请求。

3) 也可以有许多客户端向 I/P 队列写入请求。不是客户按特定顺序和优先级提出请求。 优先级是在先到先服务器的基础上设置的。

客户端 C1 在另一个客户端 C2 提出 R2 请求之前提出 R1 请求。 因此,针对 R1 请求的输出应首先写入 O/P 队列。

【问题讨论】:

用序列号标记每个请求,将输出与请求的序列号相关联,使用序列号作为键将输出保存在映射中,下一个序列号的任何输出都可以发出直到阻塞等待下一个序列输出。 【参考方案1】:

你的第三个答案很接近。

根据您的说法,您总是按顺序接收输入,因此很容易为添加到队列中的每个对象分配优先级。

任何线程都按顺序拾取元素,但每个元素的工作可能比下一个元素花费更长的时间,因此输出队列中的结果不是按顺序给出的。优先级用于将它们排序回与接收时相同的顺序。

读取输出的进程从第一优先级开始,并等待直到具有该特定优先级编号的输出在队列中。一旦它是,它就会被用完,下一次,我们首先检查下一个优先级是否已经可用并用完或者它不存在,我们阻塞直到新数据出现在输出队列中。

正如 Eljay 所提到的,std::map<unsigned long, your-object-ptr> 可用于输出队列,其中unsigned long 是优先级。

只有故障unsigned long 会在某个时候恢复为 0。您的算法应该确保它能够处理这种特殊情况,如果它可能发生(即您的软件运行一段时间很长一段时间,或者你很快就会在队列中收到大量消息)。

用3个元素来证明这一点,比如先做第3个,然后做第1个,最后做2个,有步骤

Input Queue
[p1, o1]
[p2, o2]
[p3, o3]

3 Threads pick up the objects and start working

Output Queue
-- first incoming result
[p3, o3]     -- main thread still wait for p1

-- second incoming result
[p1, o1]     -- eat this one!
[p3, o3]

-- check for next result, not present, wait
[p3, o3]

-- third incoming result
[p2, o2]     -- eat this one!
[p3, o3]

-- check for next result, present! return immediately
[p3, o3]     -- eat that one, finally

队列上的排序不是强制性的。如果您只期望 2 或 3 个元素,那么不对它们进行排序不会有很大的不同。如果你能得到更多,排序将成为一个很好的优化,因为它会更快地读取数据。


根据附加信息,限制是添加以正确顺序输出队列。没有在输出队列中获得正确的顺序。在这种情况下,我们执行与上述类似的操作,只是我们阻止添加到队列中,除非允许将下一个项目添加到所述队列中。

因此,如果队列尚未收到项目 o1(输出 1),则尝试添加 o2o3 会阻塞当前线程,直到添加 o1

从我上面的例子我会:

Output Queue
-- first incoming result is `o3`
-- thread that handled `o3` blocks until `o2` was added to the queue

-- second incoming result is `o1`
[o1]     -- add this one! counter is now on `o2`

-- wake up threads waiting on queue, thread with `o3` goes back to sleep

-- third incoming result is `o2`
[o2]     -- add this one! counter is now on `o3`

-- wake up threads waiting on queue, thread with `o3` is allowed now!
[o3]

任何一种方式都有效。我个人更喜欢另一个,因为它允许线程立即处理下一个数据包,因为它们的负载已保存在队列中。但是,如果您的资源有限并且需要确保无论如何都能完成工作,这种方式会更好(在负载较重的系统上,这可能是唯一好的解决方案。)

当然,“唤醒等待队列的线程”可以优化为只唤醒等待的线程,因为现在可以专门将o2o3 添加到队列中。这取决于您的实现如何运作。最简单的显然是把它们全部唤醒,让每个线程自己决定是否要保存下一个对象。

请注意,在这种情况下,我们不需要优先级,只需一个计数器即可知道接下来需要保存哪个回复。

无论哪种方式都有一个可能的缺点:如果线程决定不返回任何输出,您的算法就会阻塞(无论哪种方式)。

【讨论】:

根据您的回答,我在问题中还有一些详细信息。请检查它们。谢谢。 @user3990393,我相应地更新了我的答案。

以上是关于保留多线程API中的传入请求顺序并按顺序处理的主要内容,如果未能解决你的问题,请参考以下文章

Python 将任务排入队列并按顺序获取结果(多进程)

多线程请求时同步锁授予顺序?

使用线程池时一定要注意的五个点

多线程 ByteBuffers 比顺序慢?

多线程在我的 c# 程序中执行比顺序执行需要更多时间

HostApduService (NFC) 中的 Android 多线程