使用内部同步实现作业列表

Posted

技术标签:

【中文标题】使用内部同步实现作业列表【英文标题】:Implementing a job list with internal synchronisation 【发布时间】:2011-01-05 10:37:00 【问题描述】:

我正在开发一个简单的工作线程框架,它与id Tech 5 Challenges 中描述的框架非常相似。在最基本的层面上,我有一组作业列表,我想在一堆 CPU 线程中安排这些列表(使用标准线程池进行实际调度。)但是,我想知道这个信号/等待的东西是怎么回事可以有效地实施候补名单内部。据我了解,如果尚未执行信号令牌,则等待令牌会阻止列表执行。这隐含地意味着信号之前的所有事情都必须在信号发出之前完成。所以假设我们有一个这样的列表:

J1, J2, S, J3, W, J4

那么调度可以这样进行:

#1: J1, J2, J3
<wait for J1, J2, run other lists if possible>
#2: J4

但是,这并不像看起来那么容易,因为给定一组列表,我必须在 readywaiting 之间移动其中的一些,并且还有特殊代码来收集信号之前的所有作业并在它们上标记一些东西,以便当且仅当它们全部完成时它们才能触发信号(例如,在执行它时,不再可能将作业添加到列表 中,如下所示信号访问之前插入的作业。)

是否有任何“标准”方法可以有效地实现这一点?我还想知道如何最好地安排作业列表的执行,现在,每个核心都抓取一个作业列表,并安排其中的所有作业,这提供了很好的扩展性(对于 32k 个作业 à 0.7 毫秒,我得到 101%,我猜部分原因是单线程版本有时会被调度到不同的内核上。)

【问题讨论】:

你平均有多少工作清单? 10-40 个工作列表,总共约 300-500 个工作(有些列表可以包含 50+ 个工作) 信号/同步对也可以嵌套吗? 没有。例如,我将它用于图像处理,我可以将图像分成 n 个部分,在它们上做 A,信号,做一些其他小的事情,同步,再次在 n 部分上工作,做 B,信号,同步。因此,每个工作清单都可以连续执行而无需任何同步。如果一个列表中的作业被分派到多个核心上,这将变得很棘手。 “标准”取决于哪种语言/平台,您能否也提及这些? 【参考方案1】:

如果您可以在您的环境中访问work stealing framework(例如,如果您使用 C 语言,则为 Cilk,或者使用 Java 语言的 Doug Lea 的 fork/join framework),您可以轻松获得简单而干净的解决方案(与低级别的 ad-hoc 尝试相比,如果您不能使用类似的东西,您可能必须这样做),它为您提供自动负载平衡和良好的数据局部性。

这里是一个解决方案的高级描述:每个内核启动一个线程。每个人都会被分配一个列表,直到他们筋疲力尽(有很多方法可以做到这一点 - 这是非常好的并发排队机制的任务,这就是您希望尽可能避免自己动手解决方案的原因)。每个工人一个接一个地遍历列表的行: - 维护了两个队列,一个用于signal 令牌之前的那些作业,一个用于它之后的那些作业。 - 遇到作业时,forked,并添加到相应的队列中(取决于我们是否看到signal 令牌) - 当遇到wait 标记时,我们会在信号之前加入所有作业(如果我理解正确,这就是您描述的语义)。请注意,在我使用helpJoin() 的代码中, 这意味着线程实际上会提供帮助(通过弹出分叉任务并执行它们直到连接可以继续)

“分叉”意味着将任务放入线程本地队列中,该队列要么稍后由线程自己执行,要么被另一个寻找工作要做的线程窃取。

出于说明目的,这里是使用上述 java 框架对这个场景进行的大约 80 行的工作模拟。它创建与可用内核一样多的线程和一些列表,并开始执行它们。请注意 run() 方法是多么简单——虽然它仍然具有负载平衡的好处,并且线程主要从自己的列表中执行任务,除非它们用完工作并开始窃取一些任务。当然,如果您不是使用 Java 或 C,则必须找到类似的框架,但相同的一组核心思想将类似地简化您的代码,而不管是哪种语言。

import java.util.*;
import java.util.concurrent.*;
import jsr166y.ForkJoinPool;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveTask;

public class FJTest 
    public static void main(String[] args) throws Exception 
        Iterable<List<TaskType>> lists = createLists(10);

        ForkJoinPool pool = new ForkJoinPool();

        for (final List<TaskType> list : lists) 
            pool.submit(new Runnable() 
                public void run() 
                    List<ForkJoinTask> beforeSignal = new ArrayList<ForkJoinTask>();
                    List<ForkJoinTask> afterSignal = new ArrayList<ForkJoinTask>();
                    boolean signaled = false;
                    for (TaskType task : list) 
                        switch (task) 
                            case JOB:
                                ForkJoinTask job = new Job();
                                if (signaled == false)
                                    beforeSignal.add(job);
                                else
                                    afterSignal.add(job);
                                job.fork();
                                break;
                            case SIGNAL:
                                signaled = true;
                                break;
                            case WAIT:
                                signaled = false;
                                for (ForkJoinTask t : beforeSignal) 
                                    t.helpJoin();
                                
                                beforeSignal = afterSignal;
                                afterSignal = new ArrayList<ForkJoinTask>();
                        
                    
                
            );
        

        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    

    private static Iterable<List<TaskType>> createLists(int size) 
        List<List<TaskType>> tasks = new ArrayList<List<TaskType>>();
        for (int i = 0; i < size; i++) 
            tasks.add(createSomeList());
        
        return tasks;
    

    private static List<TaskType> createSomeList() 
        return Arrays.asList(
                TaskType.JOB,
                TaskType.JOB,
                TaskType.SIGNAL,
                TaskType.JOB,
                TaskType.WAIT,
                TaskType.JOB);
    



enum TaskType 
    JOB, SIGNAL, WAIT;

class Job extends RecursiveTask<Void> 
    @Override
    protected Void compute() 
        long x = 1;
        for (long i = 1; i < 200000001; i++) 
            x = i * x;
        
        System.out.println(x); //just to use x
        return null;
    

【讨论】:

【参考方案2】:

这是一种相对简单的调度算法。有几个问题起初看起来很棘手,但实际上并非如此(信号/等待和缓存位置)。我将解释这些技术,然后给出一些我编写的代码来说明这些概念,然后给出一些关于调优的最后说明。

要使用的算法

有效地处理信号/等待一开始似乎很棘手,但实际上却非常容易。由于信号/等待对不能嵌套或重叠,因此在任何给定时间实际上只能有两个被满足,一个被等待。只需将“CurrentSignal”指针指向最近未满足的信号即可进行簿记。

确保内核不会在列表之间跳转太多,并且给定列表不会在太多内核之间共享也相对容易:每个内核不断从同一个列表中获取作业,直到它阻塞,然后切换到另一个清单。为了防止所有核心聚集在一个列表中,每个列表都保留了一个 WorkerCount,以告知有多少核心在使用它,并且列表是经过组织的,因此核心首先选择具有较少工人的列表。

通过只锁定调度程序或您正在处理的列表,可以保持简单锁定,永远不要同时锁定。

您对在列表已经开始执行后将作业添加到列表中表达了一些担忧。事实证明,支持这一点几乎是微不足道的:它只需要在将作业添加到当前完成的列表时从列表调用调度程序,以便调度程序可以调度新作业。

数据结构

以下是您需要的基本数据结构:

class Scheduler

  LinkedList<JobList>[] Ready; // Indexed by number of cores working on list
  LinkedList<JobList> Blocked;
  int ReadyCount;
  bool Exit;

  public:
    void AddList(JobList* joblist);
    void DoWork();

  internal:
    void UpdateQueues(JobList* joblist);

    void NotifyBlockedCores();
    void WaitForNotifyBlockedCores();


class JobList

  Scheduler Scheduler;
  LinkedList<JobList> CurrentQueue;

  LinkedList<Job> Jobs;            // All jobs in the job list
  LinkedList<SignalPoint> Signals; // All signal/wait pairs in the job list,
                                      plus a dummy

  Job* NextJob;                    // The next job to schedule, if any
  int NextJobIndex;                // The index of NextJob

  SignalPoint* CurrentSignal;      // First signal not fully satisfied

  int WorkerCount;                 // # of cores executing in this list

  public:
    void AddJob(Job* job);
    void AddSignal();
    void AddWait();

  internal:
    void Ready  get; 
    void GetNextReadyJob(Job& job, int& jobIndex);
    void MarkJobCompleted(Job job, int jobIndex);

class SignalPoint

  int SignalJobIndex = int.MaxValue;
  int WaitJobIndex = int.MaxValue;
  int IncompleteCount = 0;

请注意,给定作业列表的信号点最方便地与实际作业列表分开存储。

调度器实现

调度程序跟踪作业列表,将它们分配给核心,并从作业列表中执行作业。

AddList 将作业添加到调度程序。它必须根据它是否有任何工作要做(即是否已添加任何作业)而将其置于就绪或阻塞队列中,因此只需调用 UpdateQueues。

void Scheduler.AddList(JobList* joblist)

  joblist.Scheduler = this;
  UpdateQueues(joblist);

UpdateQueues 集中了队列更新逻辑。请注意选择新队列的算法,以及当工作可用时对空闲内核的通知:

void Scheduler.UpdateQueues(JobList* joblist)

  lock(this)
  
    // Remove from prior queue, if any
    if(joblist.CurrentQueue!=null)
    
      if(joblist.CurrentQueue!=Blocked) ReadyCount--;
      joblist.CurrentQueue.Remove(joblist);
    

    // Select new queue
    joblist.CurrentQueue = joblist.Ready ? Ready[joblist.WorkerCount] : Blocked;

    // Add to new queue
    joblist.CurrentQueue.Add(joblist);
    if(joblist.CurrentQueue!=Blocked)
      if(++ReadyCount==1) NotifyBlockedCores();
  

DoWork 是一个正常的调度程序工作,除了:1. 它选择具有最少工人的 JobList,2. 它从给定的作业列表中处理作业,直到它不能再工作,以及 3. 它存储 jobIndex 以及作业,以便作业列表可以轻松更新完成状态(实施细节)。

void Scheduler.DoWork()

  while(!Exit)
  
    // Get a job list to work on
    JobList *list = null;
    lock(this)
    
      for(int i=0; i<Ready.Length; i++)
        if(!Ready[i].Empty)
        
          list = Ready[i].First;
          break;
        
      if(list==null)  // No work to do
      
        WaitForNotifyBlockedCores();
        continue;
      
      list.WorkerCount++;
      UpdateQueues(list);
    

    // Execute jobs in the list as long as possible
    while(true)
    
      int jobIndex;
      Job job;
      if(!GetNextReadyJob(&job, &jobIndex)) break;

      job.Execute();

      list.MarkJobCompleted(job, jobIndex);
    

    // Release the job list
    lock(this)
    
      list.WorkerCount--;
      UpdateQueues(list);
    
  

JobList 实施

JobList 跟踪信号/等待如何与作业穿插,并跟踪哪些信号/等待对在其信号点之前已经完成了所有工作。

构造函数创建一个虚拟信号点来添加作业。每当添加一个新的“信号”时,这个信号点就会变成一个真实的信号点(并添加一个新的虚拟信号点)。

JobList.JobList()

  // Always have a dummy signal point at the end
  Signals.Add(CurrentSignal = new SignalPoint());

AddJob 将作业添加到列表中。它在 SignalPoint 中被标记为不完整。当作业实际执行时,同一个 SignalPoint 的 IncompleteCount 会递减。还必须告诉调度程序事情可能已经改变,因为新作业可以立即执行。请注意,调度程序是在“this”上的锁被释放后调用的,以避免死锁。

void JobList.AddJob(Job job)

  lock(this)
  
    Jobs.Add(job);
    Signals.Last.IncompleteCount++;
    if(NextJob == null)
      NextJob = job;
  
  if(Scheduler!=null)
    Scheduler.UpdateQueues(this);

AddSignal 和 AddWait 将信号和等待添加到作业列表。注意AddSignal实际上是新建了一个SignalPoint,而AddWait只是在之前创建的SignalPoint中填入了等待点信息。

void JobList.AddSignal()

  lock(this)
  
    Signals.Last.SignalJobIndex = Jobs.Count;  // Reify dummy signal point
    Signals.Add(new SignalPoint());            // Create new dummy signal point
  



void JobList.AddWait()

  lock(this)
  
    Signals.Last.Previous.WaitJobIndex = Jobs.Count;
  

Ready 属性确定列表是否准备好分配给它的其他核心。如果下一个作业在开始之前等待信号,则可能有两个或三个内核在列表上工作,而列表尚未“就绪”。

bool JobList.Ready

  get
  
    lock(this)
    
      return NextJob!=null &&
        (CurrentSignal==Signals.Last ||
         NextJobIndex < CurrentSignal.WaitJobIndex);
    
  

GetNextReadyJob 非常简单:如果我们准备好了,只需返回列表中的下一个作业。

void JobList.GetNextReadyJob(Job& job, int& jobIndex)

  lock(this)
  
    if(!Ready) return false;
    jobIndex = list.NextJobIndex++;
    job = list.NextJob; list.NextJob = job.Next;
    return true;

  

MarkJobCompleted 可能是最有趣的。由于信号和等待的结构,当前作业要么在 CurrentSignal 之前,要么在 CurrentSignal 和 CurrentSignal.Next 之间(如果它在最后一个实际信号之后,它将被计为在 CurrentSignal 和最后的虚拟 SignalPoint 之间)。我们需要减少未完成工作的数量。如果这个计数变为零,我们可能还需要继续下一个信号。当然,我们永远不会在最后通过虚拟 SignalPoint。

请注意,此代码没有调用 Scheduler.UpdateQueue,因为我们知道调度程序将在一秒钟内调用 GetNextReadyJob,如果它返回 false,它将调用 UpdateQueue。

void JobList.MarkJobCompleted(Job job, int jobIndex)

  lock(this)
  
    if(jobIndex >= CurrentSignal.SignalJobIndex)
      CurrentSignal.Next.IncompleteCount--;
    else
    
      CurrentSignal.IncompleteCount--;
      if(CurrentSignal.IncompleteCount==0)
        if(CurrentSignal.WaitJobIndex < int.MaxValue)
          CurrentSignal = CurrentSignal.Next;
    
  

根据列表长度、作业长度估计等进行调整

上面的代码没有关注任务列表的长度,所以如果有一百个小任务列表和一个巨大的任务列表,那么每个核心可能会获取一个单独的小任务列表,然后全部聚集在一起在巨大的一个,导致效率低下。这可以通过将 Ready[] 设置为优先于 (joblist.Jobs.Count - joblist.NextJobIndex) 的优先级队列数组来解决,但优先级仅在正常 UpdateQueue 情况下才会实际更新以提高效率。

通过创建一个考虑信号/等待组合的数量和间隔以确定优先级的启发式方法,这可以变得更加复杂。最好通过使用作业持续时间和资源使用情况的分布来调整这种启发式方法。

如果单个作业的持续时间已知,或者如果可以对它们进行良好的估计,那么启发式可以使用估计的剩余持续时间,而不仅仅是列表长度。

结语

对于您提出的问题,这是一个相当标准的解决方案。你可以使用我提供的算法,它们会起作用,包括锁定,但是你将无法编译我上面写的代码,原因有几个:

    这是 C++ 和 C# 语法的疯狂组合。我最初开始用 C# 编写,然后将一堆语法更改为 C++ 风格,因为我认为这更有可能是您在这样的项目中使用的。但是我留下了很多 C# 主义。幸运的是没有 LINQ ;-)。

    LinkedList 的详细信息有些令人费解。我假设列表可以执行 First、Last、Add 和 Remove,并且列表中的项目可以执行 Previous 和 Next。但是我没有将实际的 API 用于我所知道的任何真正的链表类。

    我没有编译或测试它。我保证那里有一两个错误。

底线:您应该将上面的代码视为伪代码,即使它看起来像真正的 McCoy。

享受吧!

【讨论】:

嗨,好帖子。您对阅读有关该主题的材料有什么建议吗?谢谢。 @Mark:对不起,我知道我在 25-30 年前读过几篇与此相关的好文章,但现在无法给你参考。如果你明白我的意思,这样的事情只会留在你的脑海里,不会消失。

以上是关于使用内部同步实现作业列表的主要内容,如果未能解决你的问题,请参考以下文章

使用内部的方法来实现HTTP请求

Task运行过程分析3——Map Task内部实现

Vector(使用数组实现,线程同步)

5分钟了解Redis的内部实现快速列表(quicklist)

201521123105 第11周Java学习总结

技术干货|如何利用 ChunJun 实现数据实时同步?