线程队列

Posted Skyven

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程队列相关的知识,希望对你有一定的参考价值。

在web应用中,单个进程或者机器的响应速度有限,类似大量数据导入导出的操作的数量如果不加限制,会导致服务器cpu被吃满,导致其他一些很简单的请求无法及时响应的问题。针对这个限制提出了如下要求。
1. 先到达的请求先执行: 先入先出原则
2. 只能同时执行若干请求:避免cpu被吃满
3. 异步执行:如果长时间执行会长期占用iis的工作线程

基于上述的要求我设计了一个队列。这个队列我们需要稍微提一个组件,ParallelExtensionsExtras

这是微软提供的一个线程的扩展,具体的自行搜索下相关资料,这里开始的时候我并没有用这个组件,而是自己对task的封装,但是实际上task还是利用的线程池,线程池默认的线程数10个,并不能满足某些场景对多个现成的要求,于是在漫长的搜索过程中才发现了这个组建。

这里我主要用到两个对TaskScheduler的扩展

QueuedTaskScheduler:对task进行排队执行,执行时在Thread环境中,并且可以控制线程的数量,类似自定义的线程池。

ThreadPerTaskScheduler: 顾名思义就是在线程中执行每个task。

针对两种区别在于,QueuedTaskScheduler 的线程是可复用的,在线程数量固定的情况下推荐使用。

ThreadPerTaskScheduler 只创建线程而不进行销毁,每次执行一个task都是使用一个new一个thread执行。适用于在某些线程数量动态变化的情况。

下面是实现代码:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Schedulers;
using Coralcode.Framework.Data;
using Coralcode.Framework.Extensions;
using ThreadState = System.Threading.ThreadState;

namespace Coralcode.Framework.Task
{
    /// <summary>
    /// 线程队列
    /// 注意如果使用ScheduleType.Defaut 存在线程池数量限制
    /// </summary>
    public class TaskQueue
    {
        public static readonly TaskQueue Instance = new TaskQueue(1, ScheduleType.Thread);

        /// <summary>
        /// 获取队列
        /// </summary>
        /// <param name="maxCount">最大并发数</param>
        /// <param name="type">执行计划</param>
        /// <returns></returns>
        public static TaskQueue GetQueue(int maxCount,ScheduleType type=ScheduleType.Thread)
        {
            return new TaskQueue(maxCount, type);
        }

        /// <summary>
        /// 获取队列
        /// </summary>
        /// <param name="maxCount">最大并发数</param>
        /// <param name="name">队列名称</param>
        /// <param name="type">执行计划</param>
        /// <returns></returns>
        public static TaskQueue GetQueue(int maxCount, string name, ScheduleType type = ScheduleType.Thread)
        {
            return new TaskQueue(maxCount, name, type);
        }

        /// <summary>
        /// 获取队列
        /// </summary>
        /// <param name="maxCount">最大并发数</param>
        /// <param name="name">队列名称</param>
        /// <param name="token">取消令牌</param>
        /// <param name="type">执行计划</param>
        public static TaskQueue GetQueue(int maxCount, string name, CancellationToken token, ScheduleType type = ScheduleType.Thread)
        {
            return new TaskQueue(maxCount, name, token, type);
        }

        /// <summary>
        /// 获取队列
        /// </summary>
        /// <param name="maxCount">最大并发数</param>
        /// <param name="token">取消令牌</param>
        /// <param name="type">执行计划</param>
        public static TaskQueue GetQueue(int maxCount, CancellationToken token, ScheduleType type = ScheduleType.Thread)
        {
            return new TaskQueue(maxCount, IdentityGenerator.NewGuidString(), token, type);
        }

        private ConcurrentQueue<System.Threading.Tasks.Task> _tasks;
        private readonly int _limitedTaskCount;
        private int _runningTaskCount;
        private Thread _mainExcuteThread;
        private CancellationToken _token;
        private TaskScheduler _scheduler;

        private TaskQueue(int maxCount, ScheduleType type) :
            this(maxCount, IdentityGenerator.NewGuidString(), type)
        {
        }

        private TaskQueue(int maxCount, string name, ScheduleType type) :
            this(maxCount, name, CancellationToken.None, type)

        {
        }

        private TaskQueue(int maxCount, string name, CancellationToken token, ScheduleType type)
        {
            _limitedTaskCount = maxCount;
            _tasks = new ConcurrentQueue<System.Threading.Tasks.Task>();
            Name = name;
            _token = token;
            switch (type)
            {
                case ScheduleType.Default:
                    _scheduler = new QueuedTaskScheduler(maxCount);
                    break;
                case ScheduleType.Thread:
                    _scheduler = new ThreadPerTaskScheduler();
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(type), type, null);
            }

        }

        /// <summary>
        /// 执行不带返回结果的方法
        /// </summary>
        /// <param name="func"></param>
        /// <returns></returns>
        public System.Threading.Tasks.Task Execute(Action func)
        {
            var task = new System.Threading.Tasks.Task(func, _token);
            _tasks.Enqueue(task);
            if (_mainExcuteThread == null || _mainExcuteThread.ThreadState.HasFlag(ThreadState.Stopped))
            {
                _mainExcuteThread?.DisableComObjectEagerCleanup();
                _mainExcuteThread = new Thread(NotifyThreadPendingWork);
                _mainExcuteThread.Start();
            }
            return task;
        }

        /// <summary>
        /// 执行带返回结果的方法
        /// </summary>
        /// <typeparam name="TResult"></typeparam>
        /// <param name="func"></param>
        /// <returns></returns>

        [MethodImpl(MethodImplOptions.Synchronized)]
        public Task<TResult> Execute<TResult>(Func<TResult> func)
        {
            var task = new Task<TResult>(func, _token);
            _tasks.Enqueue(task);
            if (_mainExcuteThread == null || _mainExcuteThread.ThreadState.HasFlag(ThreadState.Stopped))
            {
                _mainExcuteThread?.DisableComObjectEagerCleanup();
                _mainExcuteThread = new Thread(NotifyThreadPendingWork);
                _mainExcuteThread.Priority = ThreadPriority.Highest;
                _mainExcuteThread.Start();
            }
            return task;
        }

        private void NotifyThreadPendingWork()
        {
            try
            {
                while (true)
                {
                    if (_token.IsCancellationRequested)
                    {
                        _tasks = new ConcurrentQueue<System.Threading.Tasks.Task>();
                        break;
                    }

                    System.Threading.Tasks.Task task;
                    if (!_tasks.TryDequeue(out task))
                        break;

                    task.Start(_scheduler);
                    Interlocked.Increment(ref _runningTaskCount);
                    task.ContinueWith(item =>
                    {
                        Interlocked.Decrement(ref _runningTaskCount);
                    });
                    //Debug.WriteLine("队列{0},允许执行 {1} 条,等待线程为 {2} ,执行中 {3} 条,时间为 {4} ", _name, _limitedTaskCount, _tasks.Count, _runningTaskCount, DateTime.Now);
                    while (_runningTaskCount >= _limitedTaskCount)
                    {
                        Thread.Sleep(500);
                    }
                }
            }
            finally
            {
                _runningTaskCount = 0;
            }
        }

        /// <summary>
        /// 线程队列的名字
        /// </summary>
        public string Name { get; }

        /// <summary>
        /// 根据返回的task的id获取到当前task排队的位置
        /// </summary>
        /// <param name="taskId"></param>
        /// <returns>返回-1表示正在执行,或者task没有加进去,返回大于等于0则表示其顺序</returns>
        public int GetCurrentTaskIndex(int taskId)
        {
            lock (_tasks)
            {
                return _tasks.IndexOf(item => item.Id == taskId);
            }
        }

        /// <summary>
        /// 等待执行的线程数量
        /// </summary>
        public int WaitingTaskCount => _tasks.Count;
        /// <summary>
        /// 正在执行的线程数量
        /// </summary>
        public int RunningTaskCount => _runningTaskCount;
        /// <summary>
        /// 并发数
        /// </summary>
        public int LimitedTaskCount => _limitedTaskCount;
    }

    /// <summary>
    /// 
    /// </summary>
    public enum ScheduleType {
        /// <summary>
        /// 线程池,默认方式
        /// </summary>
        Default,
        /// <summary>
        /// 自定义线程,ThreadPerTaskScheduler
        /// </summary>
        Thread
    }
    
}

  

重点部分说明
1. ConcurrentQueue 本身在处理多线程环境所以采用线程安全的队列。
2. ContinueWith 在任务执行完毕之后需要对执行的数量-1。
3. NotifyThreadPendingWork 这里就是启动另外一个主线程来对任务进行分发。
4. 主线程分发来满足异步的需求。
5. CancellationToken 提供取消执行的功能。
6. 可以采用默认也可以自己实例化,而默认是一个线程,即考虑常用情况,也提供扩展的功能。

测试代码


写出来这个队列的部分可能只用了1小时,但是写测试代码和调试用了差不多半天时间才搞定。

线程的测试在单元测试中一直是难以控制的,在这个case中多个线程并发的情况下,实时获取排队数量,执行中数量也是个很难测试的部分。

 

下面是测试代码:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Coralcode.Framework.Task;
using Coralcode.Framework.Utils;
using iTextSharp.text;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace FrameworkTest.Task
{
    [TestClass]
    public class TaskQueueTest
    {
        [TestMethod]
        public void TwoTimesExcuteWithFreeBetweenTwoTimsTest()
        {
            var wantExecuteTaskCount = 10;
            var maxCanExcuteTaskCount = 1;
            var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount);
            var addList = new List<int>();
            var executedList = new List<int>();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                addList.Add(i);
            }
            /*
             * 测试原理:
             * 添加一批任务,等待执行完成以后在执行一次
             * 第二次也能执行完毕,无报错的话认为,
             * 队列在执行完毕后自动暂停,等有新任务进来的时候可以重新启动
             */
            addList.ForEach(index =>
            {
                var list = executedList;
                queue.Execute(() =>
                {
                    var item = index / maxCanExcuteTaskCount;
                    list.Add(item);
                    return true;
                });
            });
            while (addList.Count != executedList.Count)
            {
                Thread.Sleep(1000);

            }
            Assert.AreEqual(queue.WaitingTaskCount, 0);
            executedList.Clear();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                addList.Add(i);
            }
            addList.ForEach(index =>
            {
                var list = executedList;
                queue.Execute(() =>
                {
                    var item = index / maxCanExcuteTaskCount;
                    list.Add(item);
                    return true;
                });
            });
            while (addList.Count != executedList.Count)
            {
                Thread.Sleep(1000);
            }
            Assert.AreEqual(queue.WaitingTaskCount, 0);
        }




        [TestMethod]
        public void ExecuteAsAddSequenceTest()
        {
            var wantExecuteTaskCount = 10;
            var maxCanExcuteTaskCount = 1;
            var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount);
            var addList = new List<int>();
            var executedList = new List<int>();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                addList.Add(i);
            }


            /*
             * 测试原理:
             * 按需添加,然后记录执行,结果也必须是顺序的
             * 如果一次执行多条,那么多条被认为同一个批次
             * 同一个批次的顺序应该是一致的
             */
            addList.ForEach(index =>
            {
                var list = executedList;
                queue.Execute(() =>
                {
                    var item = index / maxCanExcuteTaskCount;
                    list.Add(item);
                    return true;
                });
            });

            while (addList.Count != executedList.Count)
            {
                Thread.Sleep(1000);

            }
            addList = addList.Select(item => item / maxCanExcuteTaskCount).Distinct().ToList();
            executedList = executedList.Distinct().ToList();
            for (int i = 0; i < addList.Count; i++)
            {
                Assert.AreEqual(addList[i], executedList[i]);
            }
        }



        [TestMethod]
        public void WaitCountTest()
        {
            var wantExecuteTaskCount = 10;
            var maxCanExecuteTaskCount = 2;
            var queue = TaskQueue.GetQueue(maxCanExecuteTaskCount);
            var executedOrExecutingCount = 0;

            /*
             * 测试原理:
             * 记录执行了的数量
             * 队列中等待的数量 =最开始放入的数量-队列运行执行的最大线程数-已经执行的数量
             * 当所有的线程都在被执行时,队列中等待数量为0,但是还未执行完的话,直接用0来判断
             */
            var taskList = new List<System.Threading.Tasks.Task>();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                var task = queue.Execute(() =>
                {
                    Thread.Sleep(2000);
                    Debug.WriteLine("已执行的数量" + executedOrExecutingCount);
                    Interlocked.Increment(ref executedOrExecutingCount);
                });
                taskList.Add(task);
            }
            // ReSharper disable once LoopVariableIsNeverChangedInsideLoop
            while (executedOrExecutingCount < wantExecuteTaskCount)
            {
                Thread.Sleep(700);
                var queueWaitCount = queue.WaitingTaskCount;
                var wantWaitCount = wantExecuteTaskCount - maxCanExecuteTaskCount - executedOrExecutingCount;
                Debug.WriteLine("************");
                Debug.WriteLine("队列等待执行的数量" + queueWaitCount);
                Debug.WriteLine("队列期望等待执行的数量" + wantWaitCount);
                Debug.WriteLine("************");
                if (wantWaitCount < 0)
                    Assert.AreEqual(queueWaitCount, 0);
                else
                    Assert.AreEqual(queueWaitCount, wantWaitCount);
            }
        }

        [TestMethod]
        public void WaitTest()
        {

            //System.Threading.Tasks.Task.Factory.StartNew(() =>
            //{
            //    try
            //    {
            //        var str = HttpUtil.Get<string>("http://172.16.2.3:10004",
            //        "api/Acl/PermissionAuthorityApi/GetPermissionCountByApp?app=1");
            //    }
            //    catch (Exception e)
            //    {
            //        Console.WriteLine(e);
            //        throw;
            //    }

            //});
            var wantExecuteTaskCount = 10;
            var maxCanExcuteTaskCount = 1;
            var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount);
            var addList = new List<int>();
            var executedList = new List<int>();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                addList.Add(i);
            }

            var tasks = new List<System.Threading.Tasks.Task>();
            /*
             * 测试原理:
             * 执行异步请求看最后是否成功执行
             */
            addList.ForEach(index =>
            {
                var list = executedList;
                var task = queue.Execute(() =>
                {
                    try
                    {
                        var str = HttpUtil.Get<string>("http://172.16.2.3:10004",
                            "api/Acl/PermissionAuthorityApi/GetPermissionCountByApp?app=1");
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e);
                    }
                    var item = index / maxCanExcuteTaskCount;
                    list.Add(item);
                });
                tasks.Add(task);
            });
            System.Threading.Tasks.Task.WaitAll(tasks.ToArray());

            //System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
            addList = addList.Select(item => item / maxCanExcuteTaskCount).Distinct().ToList();
            executedList = executedList.Distinct().ToList();
            for (int i = 0; i < addList.Count; i++)
            {
                Assert.AreEqual(addList[i], executedList[i]);
            }
        }

        [TestMethod]
        public void ExceptionTestTest()
        {
            var wantExecuteTaskCount = 10;
            var maxCanExecuteTaskCount = 2;
            var queue = TaskQueue.GetQueue(maxCanExecuteTaskCount);
            var executedOrExecutingCount = 0;

            /*
             * 发现部分情况下队列里面的异常检测不到
             * 这里做了测试
             */

            queue.Execute(() =>
            {
                try
                {
                    throw new Exception("测试异常");
                }
                catch (Exception ex)
                {
                    Assert.IsNotNull(ex);
                }
            });

        }


        [TestMethod]
        public void GetCurrentTaskIndexTeset()
        {
            var wantExecuteTaskCount = 10;
            var maxCount = 10;
            var waitSeconds = 1000;
            var queue = TaskQueue.GetQueue(maxCount);

            List< System.Threading.Tasks.Task> executeingTasks = new List<System.Threading.Tasks.Task>();
            /*
             * 测试方法:
             * 假设并发为10,那么首先塞进去10个线程执行,
             * 那么后面的线程都在等待,
             * 在放进去n个,则后面的正在排队,返回的位置和放进去的位置一致
             * 而之前放进去的10个都在执行,返回-1
             */
            for (int i = 0; i < maxCount; i++)
            {
               var task= queue.Execute(() =>
                {
                    try
                    {
                        Thread.Sleep(waitSeconds * 1000);
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e);
                    }
                });
                executeingTasks.Add(task);
            }
            Dictionary<int,System.Threading.Tasks.Task> tasks = new Dictionary<int, System.Threading.Tasks.Task>();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                var task = queue.Execute(() =>
                {
                    try
                    {
                        Thread.Sleep(waitSeconds*1000);
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e);
                    }
                });
                tasks.Add(i,task);
            }
            foreach (var task in executeingTasks)
            {
                Assert.AreEqual(queue.GetCurrentTaskIndex(task.Id), -1);
            }
            foreach (var task in tasks)
            {
                Assert.AreEqual(queue.GetCurrentTaskIndex(task.Value.Id),task.Key);
            }

        }

    }
}

  


具体的代码中都有每个测试用例的设计思考。

 


注意事项

注意事项就是我在这里踩过的坑

Session丢失


在web应用中session的存储实际以线程为基础,类似ThreadLocal实现的线程隔离和静态使用。
这里再执行之前需要预先取出来要使用的对象然后再在action/func中使用

DbContext


1. 在ef中如果获取一个dbcontext在另外一个线程中保存就会报错。
2. 如果一个从ef取出来的对象从一个县城传递到另外一个县城,修改提交就会报错。
3. 如果主线程和队列中线程同时操作例如一个读一个写,或者同时写,此时也会报错


Ioc的问题

 

如果ioc采用prethread或者preresolve这两种方式来管理生命周期,理论上在http请求结束的时候都要对线程中使用的对象进行释放,

因为队列为异步那么当请求结束时候线程却还在执行,此时就会出现空指针的问题,或者ef中对象已经释放。


最佳实践

        /// <summary>
        /// 调用示例
        /// 
        /// </summary>
        /// <typeparam name="TService"></typeparam>
        public  static void AsyncExecuteServiceAction<TService>(this TService service,string actionName,params  object[] parameters)
            where TService:CoralService
        {
            TaskQueue.Instance.Execute(() =>
            {
                using (var newservice = UnityService.Resolve<TService>())
                {
                    newservice.InitContext(service.AppContext,service.UserContext,service.SessionContext,service.PageContext);
                    newservice.ExecuteMethod<object>(actionName, parameters);
                }
            });
        }

  

1. service.AppContext,service.UserContext,service.SessionContext,service.PageContext 这是四种不同级别的缓存,理解为session
2. using 的目的就是为了释放对象
3. 反射参数的方式只是为了方便调用

 


总结
1. 代码不难,直接拷贝就可以使用,单元测试也是一样
2. TPL我觉得是在.net技术中很好用的一部分,需要熟悉
3. 线程中有很多对象,锁等问题,这个要随着经验不断的挖坑填坑来增长经验


展望

1. 前面说过两种线程计划,我这里放入prethread是在某个工具中有压力测试的部分,需要以最快的速度来创建线程,所以没有采用复用的方式,具体后面有一个接口测试工具的系列文章来介绍。
2. 这里说的线程队列,和前面动态类型序列化是后面一个作业调度系统的基础,这部分还在设计和实现中,预计月底可以在文章中和大家见面。
3. 其实上一篇和这一篇都是在为作业系统做铺垫。
4. 下一篇将介绍一个定时执行任务的设计。
5. 虽然我一直觉得设计是最重要的,实现其次,但是要落地还是要依赖于实现,所以这些基本的组件和帮助类还是需要的。

 

以上是关于线程队列的主要内容,如果未能解决你的问题,请参考以下文章

# Java 常用代码片段

Android UI 线程消息队列调度顺序

C#多线程处理多个队列数据的方法

图解为什么要使用线程池?

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

来自dispatch_async全局崩溃的C函数调用,但在主队列上工作