C#中构建多线程应用程序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C#中构建多线程应用程序相关的知识,希望对你有一定的参考价值。

参考技术A

   引言

  随着双核 四核等多核处理器的推广 多核处理器或超线程单核处理器的计算机已很常见 基于多核处理的编程技术也开始受到程序员们普遍关注 这其中一个重要的方面就是构建多线程应用程序(因为不使用多线程的话 开发人员就不能充分发挥多核计算机的强大性能)

  本文针对的是构建基于单核计算机的多线程应用程序 目的在于介绍多线程相关的基本概念 内涵 以及如何通过System Threading命名空间的类 委托和BackgroundWorker组件等三种手段构建多线程应用程序

  本文如果能为刚接触多线程的朋友起到抛砖引玉的作用也就心满意足了 当然 本人才疏学浅 文中难免会有不足或错误的地方 恳请各位朋友多多指点

   理解多线程

  我们通常理解的应用程序就是一个* exe文件 当运行* exe应用程序以后 系统会在内存中为该程序分配一定的空间 同时加载一些该程序所需的资源 其实这就可以称为创建了一个进程 可以通过Windows任务管理器查看这个进程的相关信息 如映像名称 用户名 内存使用 PID(唯一的进程标示)等 如图下所示

  

  而线程则只是进程中的一个基本执行单元 一个应用程序往往只有一个程序入口 如

  [STAThread]

  static void Main()   //应用程序主入口点

  

  Application EnableVisualStyles();

  Application SetCompatibleTextRenderingDefault(false);

  Application Run(new MainForm());

  

  进程会包含一个进入此入口的线程 我们称之为主线程 其中 特性 [STAThread] 指示应用程序的默认线程模型是单线程单元(相关信息可参考 us/library/system stathreadattribute(VS ) aspx) 只包含一个主线程的进程是线程安全的 相当于程序仅有一条工作线 只有完成了前面的任务才能执行排在后面的任务

  然当在程序处理一个很耗时的任务 如输出一个大的文件或远程访问数据库等 此时的窗体界面程序对用户而言基本像是没反应一样 菜单 按钮等都用不了 因为窗体上控件的响应事件也是需要主线程来执行的 而主线程正忙着干其他的事 控件响应事件就只能排队等著主线程忙完了再执行

  为了克服单线程的这个缺陷 Win API可以让主线程再创建其他的次线程 但不论是主线程还是次线程都是进程中独立的执行单元 可以同时访问共享的数据 这样就有了多线程这个概念

  相信到这 应该对多线程有个比较感性的认识了 但笔者在这要提醒一下 基于单核计算机的多线程其实只是操作系统施展的一个障眼法而已(但这不会干扰我们理解构建多线程应用程序的思路) 他并不能缩短完成所有任务的时间 有时反而还会因为使用过多的线程而降低性能 延长时间 之所以这样 是因为对于单CPU而言 在一个单位时间(也称时间片)内 只能执行一个线程 即只能干一件事 当一个线程的时间片用完时 系统会将该线程挂起 下一个时间内再执行另一个线程 如此 CPU以时间片为间隔在多个线程之间交替执行运算(其实这里还与每个线程的优先级有关 级别高的会优先处理) 由于交替时间间隔很短 所以造成了各个线程都在 同时 工作的假象 而如果线程数目过多 由于系统挂起线程时要记录线程当前的状态数据等 这样又势必会降低程序的整体性能 但对于这些 多核计算机就能从本质上(真正的同时工作)提高程序的执行效率

   线程异步与线程同步

  从线程执行任务的方式上可以分为线程同步和线程异步 而为了方便理解 后面描述中用 同步线程 指代与线程同步相关的线程 同样 用 异步线程 表示与线程异步相关的线程

  线程异步就是解决类似前面提到的执行耗时任务时界面控件不能使用的问题 如创建一个次线程去专门执行耗时的任务 而其他如界面控件响应这样的任务交给另一个线程执行(往往由主线程执行) 这样 两个线程之间通过线程调度器短时间(时间片)内的切换 就模拟出多个任务 同时 被执行的效果

  线程异步往往是通过创建多个线程执行多个任务 多个工作线同时开工 类似多辆在宽广的公路上并行的汽车同时前进 互不干扰(读者要明白 本质上并没有 同时 仅仅是操作系统玩的一个障眼法 但这个障眼法却对提高我们的程序与用户之间的交互 以及提高程序的友好性很有用 不是吗)

  在介绍线程同步之前 先介绍一个与此紧密相关的概念——并发问题

  前面提到 线程都是独立的执行单元 可以访问共享的数据 也就是说 在一个拥有多个次线程的程序中 每个线程都可以访问同一个共享的数据 再稍加思考你会发现这样可能会出问题 由于线程调度器会随机的挂起某一个线程(前面介绍的线程间的切换) 所以当线程a对共享数据D的访问(修改 删除等操作)完成之前被挂起 而此时线程b又恰好去访问数据D 那么线程b访问的则是一个不稳定的数据 这样就会产生非常难以发现bug 由于是随机发生的 产生的结果是不可预测的 这样样的bug也都很难重现和调试 这就是并发问题

  为了解决多线程共同访问一个共享资源(也称互斥访问)时产生的并发问题 线程同步就应运而生了 线程同步的机理 简单的说 就是防止多个线程同时访问某个共享的资源 做法很简单 标记访问某共享资源的那部分代码 当程序运行到有标记的地方时 CLR(具体是什么可以先不管 只要知道它能控制就行)对各线程进行调整 如果已有线程在访问一资源 CLR就会将其他访问这一资源的线程挂起 直到前一线程结束对该资源的访问 这样就保证了同一时间只有一个线程访问该资源 打个比方 就如某资源放在只有一独木桥相连的孤岛上 如果要使用该资源 大家就得排队 一个一个来 前面的回来了 下一个再去 前面的没回来 后面的就原地待命

  这里只是把基本的概念及原理做了一个简单的阐述 不至于看后面的程序时糊里糊涂的 具体如何编写代码 下面的段落将做详细介绍

   创建多线程应用程序

  这里做一个简单的说明 下面主要通过介绍通过System Threading命名空间的类 委托和BackgroundWorker组件三种不同的手段构建多线程应用程序 具体会从线程异步和线程同步两个方面来阐述

   通过System Threading命名空间的类构建

  在 NET平台下 System Threading命名空间提供了许多类型来构建多线程应用程序 可以说是专为多线程服务的 由于本文仅是想起到一个 抛砖引玉 的作用 所以对于这一块不会探讨过多 过深 主要使用System Threading Thread类

  先从System Threading Thread类本身相关的一个小例子说起 代码如下 解释见注释

  using System;

  using System Threading; //引入System Threading命名空间

  namespace MultiThread

  

  class Class

  

  static void Main(string[] args)

  

  Console WriteLine( ************** 显示当前线程的相关信息 ************* );

  //声明线程变量并赋值为当前线程

  Thread primaryThread = Thread CurrentThread;

  //赋值线程的名称

  primaryThread Name = 主线程 ;

  //显示线程的相关信息

  Console WriteLine( 线程的名字 primaryThread Name);

  Console WriteLine( 线程是否启动? primaryThread IsAlive);

  Console WriteLine( 线程的优先级 primaryThread Priority);

  Console WriteLine( 线程的状态 primaryThread ThreadState);

  Console ReadLine();

  

  

  

  输出结果如下

  ************** 显示当前线程的相关信息 *************

  线程的名字 主线程

  线程是否启动? True

  线程的优先级 Normal

  线程的状态 Running

  对于上面的代码不想做过多解释 只说一下Thread CurrentThread得到的是执行当前代码的线程

   异步调用线程

  这里先说一下前台线程与后台线程 前台线程能阻止应用程序的终止 既直到所有前台线程终止后才会彻底关闭应用程序 而对后台线程而言 当所有前台线程终止时 后台线程会被自动终止 不论后台线程是否正在执行任务 默认情况下通过Thread Start()方法创建的线程都自动为前台线程 把线程的属性IsBackground设为true时就将线程转为后台线程

  下面先看一个例子 该例子创建一个次线程执行打印数字的任务 而主线程则干其他的事 两者同时进行 互不干扰

  using System;

  using System Threading;

  using System Windows Forms;

  namespace MultiThread

  

  class Class

  

  static void Main(string[] args)

  

  Console WriteLine( ************* 两个线程同时工作 ***************** );

  //主线程 因为获得的是当前在执行Main()的线程

  Thread primaryThread = Thread CurrentThread;

  primaryThread Name = 主线程 ;

  Console WriteLine( > 在执行主函数 Main() Thread CurrentThread Name);

  //次线程 该线程指向PrintNumbers()方法

  Thread SecondThread = new Thread(new ThreadStart(PrintNumbers));

  SecondThread Name = 次线程 ;

  //次线程开始执行指向的方法

  SecondThread Start();

  //同时主线程在执行主函数中的其他任务

  MessageBox Show( 正在执行主函数中的任务 主线程在工作 );

  Console ReadLine();

  

  //打印数字的方法

  static void PrintNumbers()

  

  Console WriteLine( > 在执行打印数字函数 PrintNumber() Thread CurrentThread Name);

  Console WriteLine( 打印数字 );

  for (int i = ; i < ; i++)

  

  Console Write( i);

  //Sleep()方法使当前线程挂等待指定的时长在执行 这里主要是模仿打印任务

  Thread Sleep( );

  

  Console WriteLine();

  

  

  

  程序运行后会看到一个窗口弹出 如图所示 同时控制台窗口也在不断的显示数字

  

  输出结果为

  ************* 两个线程同时工作 *****************

   > 主线程 在执行主函数 Main()

   > 次线程 在执行打印数字函数 PrintNumber()

  打印数字

  

  这里稍微对 Thread SecondThread = new Thread(new ThreadStart(PrintNumbers)); 这一句做个解释 其实 ThreadStart 是 System Threading 命名空间下的一个委托 其声明是 public delegate void ThreadStart() 指向不带参数 返回值为空的方法 所以当使用 ThreadStart 时 对应的线程就只能调用不带参数 返回值为空的方法 那非要指向含参数的方法呢?在System Threading命名空间下还有一个ParameterizedThreadStart 委托 其声明是 public delegate void ParameterizedThreadStart(object obj) 可以指向含 object 类型参数的方法 这里不要忘了 object 可是所有类型的父类哦 有了它就可以通过创建各种自定义类型 如结构 类等传递很多参数了 这里就不再举例说明了

   并发问题

  这里再通过一个例子让大家切实体会一下前面说到的并发问题 然后再介绍线程同步

  using System;

  using System Threading;

  namespace MultiThread

  

  class Class

  

  static void Main(string[] args)

  

  Console WriteLine( ********* 并发问题演示 *************** );

  //创建一个打印对象实例

  Printer printer = new Printer();

  //声明一含 个线程对象的数组

  Thread[] threads = new Thread[ ];

  for (int i = ; i < ; i++)

  

  //将每一个线程都指向printer的PrintNumbers()方法

  threads[i] = new Thread(new ThreadStart(printer PrintNumbers));

  //给每一个线程编号

  threads[i] Name = i ToString() + 号线程 ;

  

  //开始执行所有线程

  foreach (Thread t in threads)

  t Start();

  Console ReadLine();

  

  

  //打印类

  public class Printer

  

  //打印数字的方法

  public void PrintNumbers()

  

  Console WriteLine( > 正在执行打印任务 开始打印数字 Thread CurrentThread Name);

  for (int i = ; i < ; i++)

  

  Random r = new Random();

  //为了增加冲突的几率及 使各线程各自等待随机的时长

  Thread Sleep( * r Next( ));

  //打印数字

  Console Write( i);

  

  Console WriteLine();

  

  

  

  上面的例子中 主线程产生的 个线程同时访问同一个对象实例printer的方法PrintNumbers() 由于没有锁定共享资源(注意 这里是指控制台) 所以在PrintNumbers()输出到控制台之前 调用PrintNumbers()的线程很可能被挂起 但不知道什么时候(或是否有)挂起 导致得到不可预测的结果 如下是两个不同的结果(当然 读者的运行结果可能会是其他情形)

  

  情形一

  

  情形二

   线程同步

  线程同步的访问方式也称为阻塞调用 即没有执行完任务不返回 线程被挂起 可以使用C#中的lock关键字 在此关键字范围类的代码都将是线程安全的 lock关键字需定义一个标记 线程进入锁定范围是必须获得这个标记 当锁定的是一个实例级对象的私有方法时使用方法本身所在对象的引用就可以了 将上面例子中的打印类Printer稍做改动 添加lock关键字 代码如下

  //打印类

  public class Printer

  

  public void PrintNumbers()

  

  //使用lock关键字 锁定d的代码是线程安全的

  lock (this)

  

  Console WriteLine( > 正在执行打印任务 开始打印数字 Thread CurrentThread Name);

  for (int i = ; i < ; i++)

  

  Random r = new Random();

  //为了增加冲突的几率及 使各线程各自等待随机的时长

  Thread Sleep( * r Next( ));

  //打印数字

  Console Write( i);

  

  Console WriteLine();

  

  

  

  

  同步后执行结果如下

  

  也可以使用System Threading命名空间下的Monitor类进行同步 两者内涵是一样的 但Monitor类更灵活 这里就不在做过多的探讨 代码如下

  //打印类

  public class Printer

  

  public void PrintNumbers()

  

  Monitor Enter(this);

  try

  

  Console WriteLine( > 正在执行打印任务 开始打印数字 Thread CurrentThread Name);

  for (int i = ; i < ; i++)

  

  Random r = new Random();

  //为了增加冲突的几率及 使各线程各自等待随机的时长

  Thread Sleep( * r Next( ));

  //打印数字

  Console Write( i);

  

  Console WriteLine();

  

  finally

  

  Monitor Exit(this);

  

  

  

  输出结果与上面的一样

   通过委托构建多线程应用程序

  在看下面的内容时要求对委托有一定的了解 如果不清楚的话推荐参考一下博客园张子阳的《C# 中的委托和事件》 里面对委托与事件进行由浅入深的较系统的讲解

  这里先举一个关于委托的简单例子 具体解说见注释

  using System;

  namespace MultiThread

  

  //定义一个指向包含两个int型参数 返回值为int型的函数的委托

  public delegate int AddOp(int x int y);

  class Program

  

  static void Main(string[] args)

  

  //创建一个指向Add()方法的AddOp对象p

  AddOp pAddOp = new AddOp(Add);

  //使用委托间接调用方法Add()

  Console WriteLine( + = pAddOp( ));

  Console ReadLine();

  

  //求和的函数

  static int Add(int x int y)

  

  int sum = x + y;

  return sum;

  

  

  

  运行结果为

   + =

   线程异步

  先说明一下 这里不打算讲解委托线程异步或同步的参数传递 获取返回值等 只是做个一般性的开头而已 如果后面有时间了再另外写一篇关于多线程中参数传递 获取返回值的文章

  注意观察上面的例子会发现 直接使用委托实例 pAddOp( ) 就调用了求和方法 Add() 很明显 这个方法是由主线程执行的 然而 委托类型中还有另外两个方法——BeginInvoke()和EndInvoke() 下面通过具体的例子来说明 将上面的例子做适当改动 如下

  using System;

  using System Threading;

  using System Runtime Remoting Messaging;

  namespace MultiThread

  

  //声明指向含两个int型参数 返回值为int型的函数的委托

  public delegate int AddOp(int x int y);

  class Program

  

  static void Main(string[] args)

  

  Console WriteLine( ******* 委托异步线程 两个线程 同时 工作 ********* );

  //显示主线程的唯一标示

  Console WriteLine( 调用Main()的主线程的线程ID是 Thread CurrentThread ManagedThreadId);

  //将委托实例指向Add()方法

  AddOp pAddOp = new AddOp(Add);

  //开始委托次线程调用 委托BeginInvoke()方法返回的类型是IAsyncResult

  //包含这委托指向方法结束返回的值 同时也是EndInvoke()方法参数

  IAsyncResult iftAR = pAddOp BeginInvoke( null null);

  Console WriteLine( nMain()方法中执行其他任务 n );

  int sum = pAddOp EndInvoke(iftAR);

  Console WriteLine( + = sum);

  Console ReadLine();

  

  //求和方法

  static int Add(int x int y)

  

  //指示调用该方法的线程ID ManagedThreadId是线程的唯一标示

  Console WriteLine( 调用求和方法 Add()的线程ID是 Thread CurrentThread ManagedThreadId);

  //模拟一个过程 停留 秒

  Thread Sleep( );

  int sum = x + y;

  return sum;

  

  

  

  运行结果如下

  ******* 委托异步线程 两个线程 同时 工作 *********

  调用Main()的主线程的线程ID是

  Main()方法中执行其他任务

  调用求和方法 Add()的线程ID是

   + =

   线程同步

  委托中的线程同步主要涉及到上面使用的pAddOp BeginInvoke( null null)方法中后面两个为null的参数 具体的可以参考相关资料 这里代码如下 解释见代码注释

  using System;

  using System Threading;

  using System Runtime Remoting Messaging;

  namespace MultiThread

  

  //声明指向含两个int型参数 返回值为int型的函数的委托

  public delegate int AddOp(int x int y);

  class Program

  

  static void Main(string[] args)

  

  Console WriteLine( ******* 线程同步 阻塞 调用 两个线程工作 ********* );

  Console WriteLine( Main() invokee on thread Thread CurrentThread ManagedThreadId);

  //将委托实例指向Add()方法

  AddOp pAddOp = new AddOp(Add);

  IAsyncResult iftAR = pAddOp BeginInvoke( null null);

  //判断委托线程是否执行完任务

  //没有完成的话 主线程就做其他的事

  while (!iftAR IsCompleted)

  

  Console WriteLine( Main()方法工作中 );

  Thread Sleep( );

  

  //获得返回值

  int answer = pAddOp EndInvoke(iftAR);

  Console WriteLine( + = answer);

  Console ReadLine();

  

  //求和方法

  static int Add(int x int y)

  

  //指示调用该方法的线程ID ManagedThreadId是线程的唯一标示

  Console WriteLine( 调用求和方法 Add()的线程ID是 Thread CurrentThread ManagedThreadId);

  //模拟一个过程 停留 秒

  Thread Sleep( );

  int sum = x + y;

  return sum;

  

  

  

  运行结果如下

  ******* 线程同步 阻塞 调用 两个线程工作 *********

  Main() invokee on thread

  Main()方法工作中

  调用求和方法 Add()的线程ID是

  Main()方法工作中

  Main()方法工作中

  Main()方法工作中

  Main()方法工作中

   + =

   BackgroundWorker组件

  BackgroundWorker组件位于工具箱中 用于方便的创建线程异步的程序 新建一个WindowsForms应用程序 界面如下

  

  代码如下 解释参见注释

  private void button _Click(object sender EventArgs e)

  

  try

  

  //获得输入的数字

  int numOne = int Parse(this textBox Text);

  int numTwo = int Parse(this textBox Text);

  //实例化参数类

  AddParams args = new AddParams(numOne numTwo);

  //调用RunWorkerAsync()生成后台线程 同时传入参数

  this backgroundWorker RunWorkerAsync(args);

  

  catch (Exception ex)

  

  MessageBox Show(ex Message);

  

  

  //backgroundWorker新生成的线程开始工作

  private void backgroundWorker _DoWork(object sender DoWorkEventArgs e)

  

  //获取传入的AddParams对象

  AddParams args = (AddParams)e Argument;

  //停留 秒 模拟耗时任务

  Thread Sleep( );

  //返回值

  e Result = args a + args b;

  

  //当backgroundWorker 的DoWork中的代码执行完后会触发该事件

  //同时 其执行的结果会包含在RunWorkerCompletedEventArgs参数中

  private void backgroundWorker _RunWorkerCompleted(object sender RunWorkerCompletedEventArgs e)

  

  //显示运算结果

  MessageBox Show( 运行结果为 + e Result ToString() 结果 );

  

  

  //参数类 这个类仅仅起到一个记录并传递参数的作用

  class AddParams

  

  public int a b;

  public AddParams(int numb int numb )

  

  a = numb ;

  b = numb ;

  

  

  注意 在计算结果的同时 窗体可以随意移动 也可以重新在文本框中输入信息 这就说明主线程与backgroundWorker组件生成的线程是异步的

   总结

lishixinzhi/Article/program/net/201311/11400

C#多线程(16):手把手教你撸一个工作流

前言

前面学习了很多多线程和任务的基础知识,这里要来实践一下啦。通过本篇教程,你可以写出一个简单的工作流引擎。

本篇教程内容完成是基于任务的,只需要看过笔者的三篇关于异步的文章,掌握 C# 基础,即可轻松完成。

由于本篇文章编写的工作流程序,主要使用任务,有些逻辑过程会比较难理解,多测试一下就好。代码主要还是 C# 基础,为什么说简单?

  • 不包含 async 、await
  • 几乎不含包含多线程(有个读写锁)
  • 不包含表达式树
  • 几乎不含反射(有个小地方需要反射一下,但是非常简单)
  • 没有复杂的算法

因为是基于任务(Task)的,所以可以轻松设计组合流程,组成复杂的工作流。

由于只是讲述基础,所以不会包含很多种流程控制,这里只实现一些简单的。

先说明,别用到业务上。。。这个工作流非常简单,就几个功能,这个工作流是基于笔者的多线程系列文章的知识点。写这个东西是为了讲解任务操作,让读者更加深入理解任务。

代码地址:https://github.com/whuanle/CZGL.FLow

这两天忙着搬东西,今天没认真写文章,代码不明白的地方,可以到微信群找我。微信名称:痴者工良,dotnet 的群基本我都在。

节点

在开始前,我们来设计几种流程控制的东西。

将一个 步骤/流程/节点 称为 step。

Then

一个普通的节点,包含一个任务。

多个 Then 节点,可以组成一条连续的工作流。

Parallel

并行节点,可以设置多个并行节点放到 Parallel 中,以及在里面为任一个节点创建新的分支。

Schedule

定时节点,创建后会在一定时间后执行节点中的任务。

Delay

让当前任务阻塞一段时间。

试用一下

顺序节点

打开你的 VS ,创建项目,Nuget 引用 CZGL.DoFlow ,版本 1.0.2 。

创建一个类 MyFlow1,继承 IDoFlow

    public class MyFlow1 : IDoFlow
    {
        public int Id => 1;

        public string Name => "随便起个名字";

        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            throw new NotImplementedException();
        }
    }

你可以创建多个工作流任务,每个工作流的 Id 必须唯一。Name 和 Version 随便填,因为这里笔者没有对这几个字段做逻辑。

IDoFlowBuilder 是构建工作流的一个接口。

我们来写一个工作流测试一下。

/// <summary>
/// 普通节点 Then 使用方法
/// </summary>
public class MyFlow1 : IDoFlow
{
    public int Id => 1;
    public string Name => "test";
    public int Version => 1;

    public IDoFlowBuilder Build(IDoFlowBuilder builder)
    {
        builder.StartWith(() =>
        {
            Console.WriteLine("工作流开始");
        }).Then(() =>
        {
            Console.WriteLine("下一个节点");
        }).Then(() =>
         {
             Console.WriteLine("最后一个节点");
         });
        return builder;
    }
} 

Main 方法中:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow1>();
            // FlowCore.RegisterWorkflow(new MyFlow1());
            FlowCore.Start(1);
            Console.ReadKey();
        }

.StartWith() 方法开始一个工作流;

FlowCore.RegisterWorkflow<T>() 注册一个工作流;

FlowCore.Start();执行一个工作流;

并行任务

其代码如下:

    /// <summary>
    /// 并行节点 Parallel 使用方法
    /// </summary>
    public class MyFlow2 : IDoFlow
    {
        public int Id => 2;
        public string Name => "test";
        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            builder.StartWith()
                .Parallel(steps =>
                {
                    // 每个并行任务也可以设计后面继续执行其它任务
                    steps.Do(() =>
                    {
                        Console.WriteLine("并行1");
                    }).Do(() =>
                    {
                        Console.WriteLine("并行2");
                    });
                    steps.Do(() =>
                    {
                        Console.WriteLine("并行3");
                    });

                    // 并行任务设计完成后,必须调用此方法
                    // 此方法必须放在所有并行任务 .Do() 的最后
                    steps.EndParallel();

                    // 如果 .Do() 在 EndParallel() 后,那么不会等待此任务
                    steps.Do(() => { Console.WriteLine("并行异步"); });

                    // 开启新的分支
                    steps.StartWith()
                    .Then(() =>
                    {
                        Console.WriteLine("新的分支" + Task.CurrentId);
                    }).Then(() => { Console.WriteLine("分支2.0" + Task.CurrentId); });

                }, false)
                .Then(() =>
                {
                    Console.WriteLine("11111111111111111 ");
                });

            return builder;
        }
    }

Main 方法中:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow2>();
            FlowCore.Start(2);
            Console.ReadKey();
        }

通过以上示例,可以大概了解本篇文章中我们要写的程序。

编写工作流

建立一个类库项目,名为 DoFlow

建立 ExtensionsInterfacesServices 三个目录。

接口构建器

新建 IStepBuilder 接口文件到 Interfaces 目录,其内容如下:

using System;

namespace DoFlow.Interfaces
{
    public interface IStepBuilder
    {
        /// <summary>
        /// 普通节点
        /// </summary>
        /// <param name="stepBuilder"></param>
        /// <returns></returns>
        IStepBuilder Then(Action action);

        /// <summary>
        /// 多个节点
        /// <para>默认下,需要等待所有的任务完成,这个step才算完成</para>
        /// </summary>
        /// <param name="action"></param>
        /// <param name="anyWait">任意一个任务完成即可跳转到下一个step</param>
        /// <returns></returns>
        IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false);

        /// <summary>
        /// 节点将在某个时间间隔后执行
        /// <para>异步,不会阻塞当前工作流的运行,计划任务将在一段时间后触发</para>
        /// </summary>
        /// <returns></returns>
        IStepBuilder Schedule(Action action, TimeSpan time);

        /// <summary>
        /// 阻塞一段时间
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        IStepBuilder Delay(TimeSpan time);
    }
}

新建 IStepParallel 文件到 Interfaces 目录。

using System;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 并行任务
    ///  <para>默认情况下,只有这个节点的所有并行任务都完成后,这个节点才算完成</para>
    /// </summary>
    public interface IStepParallel
    {
        /// <summary>
        /// 一个并行任务
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepParallel Do(Action action);

        /// <summary>
        /// 开始一个分支
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepBuilder StartWith(Action action = null);

        /// <summary>
        /// 必须使用此方法结束一个并行任务
        /// </summary>
        void EndParallel();
    }

    /// <summary>
    /// 并行任务
    /// <para>任意一个任务完成后,就可以跳转到下一个 step</para>
    /// </summary>
    public interface IStepParallelAny : IStepParallel
    {

    }
}

工作流构建器

新建 IDoFlowBuilder 接口文件到 Interfaces 目录。

using System;
using System.Threading.Tasks;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 构建工作流任务
    /// </summary>
    public interface IDoFlowBuilder
    {
        /// <summary>
        /// 开始一个 step
        /// </summary>
        IStepBuilder StartWith(Action action = null);
        void EndWith(Action action);

        Task ThatTask { get; }
    }
}

新建 IDoFlow 接口文件到 Interfaces 目录。

namespace DoFlow.Interfaces
{

    /// <summary>
    /// 工作流
    /// <para>无参数传递</para>
    /// </summary>
    public interface IDoFlow
    {
        /// <summary>
        /// 全局唯一标识
        /// </summary>
        int Id { get; }

        /// <summary>
        /// 标识此工作流的名称
        /// </summary>
        string Name { get; }

        /// <summary>
        /// 标识此工作流的版本
        /// </summary>
        int Version { get; }

        IDoFlowBuilder Build(IDoFlowBuilder builder);
    }
}

依赖注入

新建 DependencyInjectionService 文件到 Services 目录。

用于实现依赖注入和解耦。

using DoFlow.Extensions;
using Microsoft.Extensions.DependencyInjection;
using System;

namespace DoFlow.Services
{
    /// <summary>
    /// 依赖注入服务
    /// </summary>
    public static class DependencyInjectionService
    {
        private static IServiceCollection _servicesList;
        private static IServiceProvider _services;
        static DependencyInjectionService()
        {
            IServiceCollection services = new ServiceCollection();
            _servicesList = services;
            // 注入引擎需要的服务
            InitExtension.StartInitExtension();
            var serviceProvider = services.BuildServiceProvider();
            _services = serviceProvider;
        }

        /// <summary>
        /// 添加一个注入到容器服务
        /// </summary>
        /// <typeparam name="TService"></typeparam>
        /// <typeparam name="TImplementation"></typeparam>
        public static void AddService<TService, TImplementation>()
            where TService : class
            where TImplementation : class, TService
        {
            _servicesList.AddTransient<TService, TImplementation>();
        }

        /// <summary>
        /// 获取需要的服务
        /// </summary>
        /// <typeparam name="TIResult"></typeparam>
        /// <returns></returns>
        public static TIResult GetService<TIResult>()
        {
            TIResult Tservice = _services.GetService<TIResult>();
            return Tservice;
        }
    }
}

添加一个 InitExtension 文件到 Extensions 目录。

using DoFlow.Interfaces;
using DoFlow.Services;

namespace DoFlow.Extensions
{
    public static class InitExtension
    {
        private static bool IsInit = false;
        public static void StartInitExtension()
        {
            if (IsInit) return;
            IsInit = true;
            DependencyInjectionService.AddService<IStepBuilder, StepBuilder>();
            DependencyInjectionService.AddService<IDoFlowBuilder, DoFlowBuilder>();
            DependencyInjectionService.AddService<IStepParallel, StepParallelWhenAll>();
            DependencyInjectionService.AddService<IStepParallelAny, StepParallelWhenAny>();
        }
    }
}

实现工作流解析

以下文件均在 Services 目录建立。

新建 StepBuilder 文件,用于解析节点,构建任务。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{

    /// <summary>
    /// 节点工作引擎
    /// </summary>
    public class StepBuilder : IStepBuilder
    {
        private Task _task;

        /// <summary>
        /// 延迟执行
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Delay(TimeSpan time)
        {
            Task.Delay(time).Wait();
            return this;
        }

        /// <summary>
        /// 并行 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Parallel(Action<IStepParallel> action, bool anyAwait = false)
        {
            IStepParallel parallel = anyAwait ? DependencyInjectionService.GetService<IStepParallelAny>() : DependencyInjectionService.GetService<IStepParallel>();
            Task task = new Task(() =>
            {
                action.Invoke(parallel);
            });

            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
            });
            _task = task;
            return this;
        }

        /// <summary>
        /// 计划任务
        /// </summary>
        /// <param name="action"></param>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Schedule(Action action, TimeSpan time)
        {
            Task.Factory.StartNew(() =>
            {
                Task.Delay(time).Wait();
                action.Invoke();
            });
            return this;
        }

        /// <summary>
        /// 普通 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Then(Action action)
        {
            Task task = new Task(action);
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
                task.Wait();
            });
            _task = task;
            return this;
        }

        public void SetTask(Task task)
        {
            _task = task;
        }
    }
}

新建 StepParallel 文件,里面有两个类,用于实现同步任务。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    /// <summary>
    /// 第一层所有任务结束后才能跳转下一个 step
    /// </summary>
    public class StepParallelWhenAll : IStepParallel
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAll()
        {
            _task = new Task(() => { },TaskCreationOptions.AttachedToParent);
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAll(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }

    /// <summary>
    /// 完成任意一个任务即可跳转到下一个 step
    /// </summary>
    public class StepParallelWhenAny : IStepParallelAny
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAny()
        {
            _task = Task.Run(() => { });
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAny(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }
}

新建 DoFlowBuilder 文件,用于构建工作流。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    public class DoFlowBuilder : IDoFlowBuilder
    {
        private Task _task;
        public Task ThatTask => _task;

        public void EndWith(Action action)
        {
            _task.Start();
        }

        public IStepBuilder StartWith(Action action = null)
        {
            if (action is null)
                _task = new Task(() => { });
            else _task = new Task(action);

            IStepBuilder _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            ((StepBuilder)_stepBuilder).SetTask(_task);
            return _stepBuilder;
        }
    }
}

新建 FlowEngine 文件,用于执行工作流。

using DoFlow.Interfaces;

namespace DoFlow.Services
{
    /// <summary>
    /// 工作流引擎
    /// </summary>
    public class FlowEngine
    {
        private readonly IDoFlow _flow;
        public FlowEngine(IDoFlow flow)
        {
            _flow = flow;
        }

        /// <summary>
        /// 开始一个工作流
        /// </summary>
        public void Start()
        {
            IDoFlowBuilder builder = DependencyInjectionService.GetService<IDoFlowBuilder>();
            _flow.Build(builder).ThatTask.Start();
        }
    }
}

新建 FlowCore 文件,用于存储和索引工作流。使用读写锁解决并发字典问题。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading;

namespace DoFlow.Services
{
    public static class FlowCore
    {
        private static Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>();

        // 读写锁
        private static ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim();

        /// <summary>
        /// 注册工作流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow(IDoFlow flow)
        {
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 注册工作流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow<TDoFlow>()
        {

            Type type = typeof(TDoFlow);
            IDoFlow flow = (IDoFlow)Activator.CreateInstance(type);
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 要启动的工作流
        /// </summary>
        /// <param name="id"></param>
        public static bool Start(int id)
        {
            FlowEngine engine;
            // 读写锁
            try
            {
                readerWriterLockSlim.EnterUpgradeableReadLock();

                if (!flowEngines.ContainsKey(id))
                    return default;
                try
                {
                    readerWriterLockSlim.EnterWriteLock();
                    engine = flowEngines[id];
                }
                catch { return default; }
                finally
                {
                    readerWriterLockSlim.ExitWriteLock();
                }
            }
            catch { return default; }
            finally
            {
                readerWriterLockSlim.ExitUpgradeableReadLock();
            }

            engine.Start();
            return true;
        }
    }
}

就这样程序写完了。

忙去了。

以上是关于C#中构建多线程应用程序的主要内容,如果未能解决你的问题,请参考以下文章

在 C# 中使用多线程屏蔽/过滤图像(Windows 窗体应用程序)

C#多线程学习 多线程的相关概念

c#多线程应用程序中的界面冻结

一个C# (队列多任务+多线程处理)对象的winform demo

C# 多线程

C#多线程(16):手把手教你撸一个工作流