Java中的并发和可扩展数据结构来处理任务?
Posted
技术标签:
【中文标题】Java中的并发和可扩展数据结构来处理任务?【英文标题】:Concurrent and scalable data structure in Java to handle tasks? 【发布时间】:2018-06-01 13:49:15 【问题描述】:对于我目前的开发,我有许多线程 (Producers
) 创建 Tasks
和许多线程消耗这些 Tasks
(consumers
)
每个Producers
都由一个唯一的名称标识; Tasks
由:
Producers
一个名字
数据
我的问题涉及 (Producers
) 和 (consumers
) 使用的数据结构。
并发队列?
天真地,我们可以想象 Producers
使用 Tasks
填充并发队列,并且 (consumers
) 读取/使用存储在并发队列中的 Tasks
。
我认为这个解决方案可以很好地扩展,但一个单一的案例是有问题的:如果Producers
很快创建了两个具有相同名称但数据不同的Tasks
(任务 T1 和 T2 具有相同的名称但是 T1 有数据 D1,T2 有数据 D2),理论上有可能按照 T2 然后 T1 的顺序消费!
任务图+队列?
现在,我想象基于 Map + Queue 创建自己的数据结构(比如说MyQueue
)。比如一个队列,它会有一个pop()
和一个push()
方法。
pop()
方法非常简单
push()
方法将:
检查现有的Task
是否尚未插入MyQueue
(在地图中执行find()
)
如果找到:存储在要插入的Task
中的数据将与存储在找到的Task
中的数据合并
如果没有找到:Task
将被插入到 Map 中,并在队列中添加一个条目
当然,我必须保证并发访问的安全……这肯定是我的问题;我几乎可以肯定这个解决方案不会扩展。
那又怎样?
所以我现在的问题是我必须使用什么数据结构来满足我的要求
【问题讨论】:
您可能想查看Reactive Java programming。 “理论上它们有可能以“T2”然后“T1”的顺序被消耗!”不,更准确的说法是它们很可能是由不同的消费者并行(又名同时)消费,并将并行处理。在并发编程中,顺序是一个非常流畅的概念。 由于元素是唯一的,您还可以尝试使用 ConcurrentSkipListSet 等集合来强制执行唯一性。您还可以完全避免同步并编写不同的线程并在最后合并它们。这样可以保持东西干净。 因此您实际上并不希望使用多个消费者。甚至可能没有多个生产者。这个问题与可扩展性无关。你遇到了不同的设计问题。 如你所描述的,如果希望两个任务相互依赖,则需要将它们捆绑执行;否则,您将永远无法在多线程环境中可靠地实现您想要做的事情——这就是@Kayaman 对“不同的设计问题”的意思。也许看看ForkJoinPool
。另请注意,“增加消费者数量”通常会降低吞吐量,而不是增加吞吐量(除非您在不同的机器上运行它们)。
【参考方案1】:
你可以试试 Heinz Kabutz 的 Striped Executor Service 一个可能的候选人。
这个神奇的线程池会保证所有具有相同stripedClass的Runnables都将按照它们提交的顺序执行,但具有不同stripedClasses的StripedRunners仍然可以独立执行。
【讨论】:
【参考方案2】:为什么不选择concurrent并选择parallel,而不是使数据结构对并发访问安全?
MapReduce 等函数式编程模型是解决此类问题的一种非常可扩展的方法。
我知道D1
和D2
可以一起分析,也可以单独分析,唯一的限制是不能以错误的顺序分析它们。 (在这里做一些假设)但如果真正的问题只是结果的组合方式,可能会有一个简单的解决方案。
您可以一起移除约束,允许单独分析它们,然后使用 reduce 函数以合理的方式将它们重新组合在一起。
在这种情况下,第一步是map
,第二步是reduce
。
即使一次性完成计算效率更高,但扩展的很大一部分,尤其是扩展是由denormalization 完成的。
【讨论】:
【参考方案3】:如果消费者并行运行,我怀疑是否有办法让他们按顺序执行同名任务。 在您的示例中(来自 cmets):
如果 Producer “P1”添加带有数据 D1 的第一个任务“T”并快速添加第二个任务“T” 与数据 D2。在这种情况下,第一个任务可以由一个线程处理 另一个线程的第二个任务;如果线程处理 第一个任务被中断,处理第二个任务的线程可以 先完成
如果 P1 不那么快提交 D2,则没有区别。消费者 1 仍然可能太慢,因此消费者 2 将能够先完成。以下是此类场景的示例:
-
P1:提交 D1
C1:读取 D1
P2:提交 D2
C2:读取 D2
C2:进程 D2
C1:进程 D1
要解决这个问题,您将不得不引入某种完成检测,我认为这会使事情变得过于复杂。
如果您有足够的负载并且可以不按顺序处理一些具有不同名称的任务,那么您可以为每个消费者使用一个队列并将相同命名的任务放入同一个队列。
public class ParallelQueue
private final BlockingQueue<Task>[] queues;
private final int consumersCount;
public ParallelQueue(int consumersCount)
this.consumersCount = consumersCount;
queues = new BlockingQueue[consumersCount];
for (int i = 0; i < consumersCount; i++)
queues[i] = new LinkedBlockingQueue<>();
public void push(Task<?> task)
int index = task.name.hashCode() % consumersCount;
queues[index].add(task);
public Task<?> pop(int consumerId) throws InterruptedException
int index = consumerId % consumersCount;
return queues[index].take();
private final static class Task<T>
private final String name;
private final T data;
private Task(String name, T data)
this.name = name;
this.data = data;
【讨论】:
以上是关于Java中的并发和可扩展数据结构来处理任务?的主要内容,如果未能解决你的问题,请参考以下文章