BeetleX.FastHttpApi之控制器调度设计
Posted dotNET跨平台
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了BeetleX.FastHttpApi之控制器调度设计相关的知识,希望对你有一定的参考价值。
为了可以更灵活地在Webapi应用服务中分配线程资源,BeetleX.FastHttpApi在线程调度上直接细化到Action级别;组件不仅可以精准控制每个Action的最大RPS限制,还能精细到控制使用多少线程资源来处理这些API的请求。接下来详细讲解组件针对这一块的实现结构和代码。
需求
为什么要做到这么精细的控制呢?如果有足够资源那是不用考虑这方面的问题;但实际应用中资源不足是经常需要面对的问题。在整个服务中往往有些API非常占用资源,这个时候就希望通过简单配置来控制API使用的线程数达到一个理想的资源分配结果。在控制上最直接的办法是控制对应的RPS数量,但有时候希望以线程资源的方式来分配。
使用
组件可以在控制器的Action上根据需求标记对应的限制属性
//每秒最大处理数100,超过就拒绝
[RequestMaxRPS(100)]
public object MasRps(IHttpContext context)
{
return DateTime.Now;
}
//不限制,由框架通过线程池调度
public object None(string name, IHttpContext context)
{
return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";
}
//所有请求用一个线程有序处理
[ThreadQueue(ThreadQueueType.Single)]
public object SingleQueue(string name, IHttpContext context)
{
return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";
}
//所有请求分配两个线程有序处理
[ThreadQueue(ThreadQueueType.Multiple, 2)]
public object MultipleQueue(string name, IHttpContext context)
{
return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";
}
//根据Name的值一致线程处理,同一值会分配到一个线程中有序处理
[ThreadQueue("name")]
public object UniqueQueue(string name, IHttpContext context)
{
return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";
}
设计实现
接下来看一下BeetleX.FastHttpApi组件代码是如何进行工作的。由于需要线程控制,那自然就需要一个队列;组件提供一个NextQueue的队列来完成这方面的工作,每个NextQueue会分配一个线程来处理。
public class NextQueue : IDisposable
{
public NextQueue()
{
mQueue = new System.Collections.Concurrent.ConcurrentQueue<IEventWork>();
ID = System.Threading.Interlocked.Increment(ref mID);
}
public long ID { get; set; }
private static long mID;
private readonly object _workSync = new object();
private bool _doingWork;
private int mCount;
private System.Collections.Concurrent.ConcurrentQueue<IEventWork> mQueue;
public int Count => mCount;
//添加任务到队列中
public void Enqueue(IEventWork item)
{
mQueue.Enqueue(item);
System.Threading.Interlocked.Increment(ref mCount);
lock (_workSync)
{
//当前队列是否工作中
if (!_doingWork)
{
//获取一个线程进行工作
System.Threading.ThreadPool.QueueUserWorkItem(OnStart);
_doingWork = true;
}
}
}
private void OnError(Exception e, IEventWork work)
{
try
{
Error?.Invoke(e, work);
}
catch
{
}
}
public static Action<Exception, IEventWork> Error { get; set; }
private async void OnStart(object state)
{
while (true)
{
//获取队列任务并执行
while (mQueue.TryDequeue(out IEventWork item))
{
System.Threading.Interlocked.Decrement(ref mCount);
using (item)
{
try
{
//等待任务执行
await item.Execute();
}
catch (Exception e_)
{
OnError(e_, item);
}
}
}
lock (_workSync)
{
//队列为空跑出线程
if (mQueue.IsEmpty)
{
try
{
Unused?.Invoke();
}
catch { }
_doingWork = false;
return;
}
}
}
}
public Action Unused { get; set; }
public void Dispose()
{
while (mQueue.TryDequeue(out IEventWork work))
{
try
{
work.Dispose();
}
catch
{
}
}
}
}
NextQueue是一个支持异步任务的处理队列,它确保添加进来的任务都是有序执行,即使任务内部处理的任务是异步。
ActionContext
该对象是用于执行控制器方法,包括webapi控制器和Websocket控制器。在这里只讲述控制怎样调度执行的,更详细了解可以查看
https://github.com/beetlex-io/FastHttpApi/blob/master/src/ActionContext.cs
主要讲解一下Execute方法是怎样调用控制器方法的
internal async Task Execute(IActionResultHandler resultHandler)
{
//验证RPS
if (Handler.ValidateRPS())
{
Handler.IncrementRequest();
//是否存在队列控制配置
if (Handler.ThreadQueue == null || Handler.ThreadQueue.Type == ThreadQueueType.None)
{
if (Handler.Async)//异步方法
{
await OnAsyncExecute(resultHandler);
}
else
{
//同步方法
OnExecute(resultHandler);
}
}
else
{
//配置了队列控制r妊
ActionTask actionTask = new ActionTask(this, resultHandler,new TaskCompletionSource<object>());
//获取异步队列
var queue = Handler.ThreadQueue.GetQueue(this.HttpContext);
//阶列是否有效,为了安全队列都有最大等待数限制,超过就拒绝处理
if (Handler.ThreadQueue.Enabled(queue))
{
this.HttpContext.Queue = queue;
//把当前任务插入队列
queue.Enqueue(actionTask);
//等待队执行结果通知
await actionTask.CompletionSource.Task;
}
else
{
Handler.IncrementError();
resultHandler.Error(new Exception($"{Handler.SourceUrl} process error,out of queue limit!"), EventArgs.LogType.Warring, 500);
}
}
}
else
{
Handler.IncrementError();
resultHandler.Error(new Exception($"{Handler.SourceUrl} process error,out of max rps!"), EventArgs.LogType.Warring, 509);
}
}
GetQueue
应该方法根据当前请示信息和配置来获取对应的异步队列
public NextQueue GetQueue(IHttpContext context)
{
//单队执行,永远返回针对当前控制器方法的第一个队列
if (Type == ThreadQueueType.Single)
return QueueGroup.Queues[0];
//轮循当前分配最大队列数
else if (Type == ThreadQueueType.Multiple)
return QueueGroup.Next();
//针对请求数据做一致性队列分配
else if (Type == ThreadQueueType.DataUnique)
{
string value = null;
if (UniqueName != null)
{
if (string.Compare(UniqueName, "$path", true) == 0)
{
value = context.Request.GetSourcePath();
}
else if(UniqueName.IndexOf("__")==0)
{
return mUniqueQueueGroup.Has(UniqueName.GetHashCode());
}
else
{
value = context.Request.Header[UniqueName];
if (value == null)
context.Data.TryGetString(UniqueName, out value);
}
}
if (value == null)
value = context.Request.GetSourceUrl();
return mUniqueQueueGroup.Has(value.GetHashCode());
}
//如果都没匹配到就获取轮循的下一个
return QueueGroup.Next();
}
ActionTask
方法异步任务对象,队列会有序地执行相关对象,这对象的实现非常简单。
struct ActionTask : IEventWork
{
public ActionTask(ActionContext context, IActionResultHandler resultHandler, TaskCompletionSource<object> completionSource)
{
Context = context;
ResultHandler = resultHandler;
CompletionSource = completionSource;
}
public TaskCompletionSource<object> CompletionSource { get; set; }
public ActionContext Context { get; set; }
public IActionResultHandler ResultHandler { get; set; }
public void Dispose()
{
}
public async Task Execute()
{
try
{
if (Context.Handler.Async)
{
//异步方法
await Context.OnAsyncExecute(ResultHandler);
}
else
{
//同步方法
Context.OnExecute(ResultHandler);
}
}
finally
{
//回调执行完成,让队列继续下一个任务。
CompletionSource?.TrySetResult(new object());
}
}
}
总结
到这里整个线程调度的核心就介绍完成了,如果不了解一些基础知识会感觉完成这些功能很复杂,其实都是一些基础功能的应用; 完成这些功能主要涉及几个基础知识分别是:队列,线程池和用于处理异步回调的TaskCompletionSource对象。
BeetleX
开源跨平台通讯框架(支持TLS)
提供高性能服务和大数据处理解决方案
https://beetlex.io
以上是关于BeetleX.FastHttpApi之控制器调度设计的主要内容,如果未能解决你的问题,请参考以下文章