设计一个对象池(Anno.XObjectPool)

Posted 杜燕明

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了设计一个对象池(Anno.XObjectPool)相关的知识,希望对你有一定的参考价值。

设计一个.net对象池  

     对象池对于创建开销比较大的对象来说很有意义,为了优化程序的运行速度、避免频繁创建销毁开销比较大的对象,我们可以通过对象池来复用创建开销大的对象。对象池的思路比较简单,事先创建好一批对象,放到一个集合中,以后每当程序需要新的对象时候,都从对象池里获取,每当程序用完该对象后,都把该对象归还给对象池。这样会避免重复的对象创建,提高程序性能。 

应用场景 

  在Anno微服务框架中的使用,由于客户端调用微服的时候需要建立Socket连接,频繁的创建和销毁连接会有很大的开销。所以我们设想我们如果可以重复利用这些对象那么性能上也会是很大的提升。这时候我们就需要一个对象池来存放这些可以重复利用的对象。不仅如此我们还需要可以控制对象池的最大存活对象数量、最小闲置对象数量、最大闲置对象数量、如何创建对象、销毁对象、定期清理闲置对象。(网上没找到好用的于是开始造我们的轮子)

Install-Package Anno.XObjectPool -Version 1.0.3.4

Xpool核心代码

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Anno.XObjectPool
{
    public class XPool<T> : IDisposable
    {
        private bool disposed;
        private XPoolConfiguration xcfg;
        /// <summary>
        /// 初始化对象池
        /// </summary>
        /// <param name="createObject">创建XObject对象</param>
        /// <param name="activeXObject"> 获取XObject对象之前验证True 有效</param>
        public XPool(Func<T> createObject, Func<T, bool> activeXObject = null) : this(new XPoolConfiguration()
        {
            MaxActive = 1000,
            MaxIdle = 400,
            MinIdle = 10
        }, createObject, activeXObject)
        {

        }
        /// <summary>
        /// 初始化对象池
        /// </summary>
        /// <param name="maxActive">最大活动数量</param>
        /// <param name="minIdle">最小空闲数量</param>
        /// <param name="maxIdle">最大空闲数量</param>
        /// <param name="createObject">创建XObject对象</param>
        /// <param name="activeXObject"> 获取XObject对象之前验证True 有效</param>
        public XPool(int maxActive, int minIdle, int maxIdle, Func<T> createObject, Func<T, bool> activeXObject = null)
        {
            xcfg = new XPoolConfiguration()
            {
                MaxActive = maxActive,
                MaxIdle = maxIdle,
                MinIdle = minIdle
            };
            pools = new ConcurrentStack<XObject<T>>();
            ResetEvent = new AutoResetEvent(false);
            if (createObject != null)
            {
                CreateXObject = createObject;
            }
            else
            {
                throw new ArgumentNullException("createObject 不能为空");
            }
            if (activeXObject != null)
            {
                ActiveXObject = activeXObject;
            }
            Parallel.For(0, minIdle, x =>
            {
                pools.Push(new XObject<T>()
                {
                    Value = CreateXObject.Invoke(),
                    LastDateTime = DateTime.Now,
                    Pool = this
                });
            });
            StartTaskClearLongIdleXObject();
        }
        /// <summary>
        /// 初始化对象池
        /// </summary>
        /// <param name="xcfg">对象池配置</param>
        /// <param name="createObject">创建XObject对象</param>
        /// <param name="activeXObject"> 获取XObject对象之前验证True 有效</param>
        public XPool(XPoolConfiguration xcfg, Func<T> createObject, Func<T, bool> activeXObject = null) : this(xcfg.MaxActive, xcfg.MinIdle, xcfg.MaxIdle, createObject, activeXObject)
        {

        }
        private ConcurrentStack<XObject<T>> pools;
        private int _activedTransportCount = 0;
        private AutoResetEvent ResetEvent { get; set; }
        /// <summary>
        /// 活动链接数量
        /// </summary>
        public int ActivedTransportCount => _activedTransportCount;

        /// <summary>
        /// 原子性增加 活动链接数量
        /// </summary>
        private void InterlockedIncrement()
        {
            Interlocked.Increment(ref _activedTransportCount);
        }
        /// <summary>
        /// 原子性减少 活动链接数量
        /// </summary>
        private void InterlockedDecrement()
        {
            Interlocked.Decrement(ref _activedTransportCount);
        }

        public XObject<T> Borrow(TimeSpan? timeout = null)
        {
            if (!pools.TryPop(out XObject<T> xobj))
            {
                if (pools.Count < xcfg.MinIdle && _activedTransportCount < xcfg.MaxActive)
                {
                    pools.Push(new XObject<T>()
                    {
                        Value = CreateXObject.Invoke(),
                        LastDateTime = DateTime.Now,
                        Pool = this
                    });
                }
                if (!pools.Any() && _activedTransportCount >= xcfg.MaxActive)
                {
                    int millisecondsTimeout = 20000;
                    if (timeout.HasValue && timeout.Value.TotalMilliseconds > 0)
                    {
                        millisecondsTimeout = (int)timeout.Value.TotalMilliseconds;
                    }
                    bool result = ResetEvent.WaitOne(millisecondsTimeout);
                    if (!result)
                    {
                        throw new TimeoutException($"Timeout对象池等待超时!");
                    }
                }
                if (!pools.TryPop(out xobj))
                {
                    xobj = new XObject<T>()
                    {
                        Value = CreateXObject.Invoke(),
                        LastDateTime = DateTime.Now,
                        Pool = this
                    };
                }
            }
            InterlockedIncrement();
            //借出之前判断对象是否有效
            if (!ActiveXObject(xobj.Value))
            {
                throw new InvalidOperationException("对象无效,请在有效性检测函数activeXObject中设置有效性");
            }
            return xobj;
        }
        public void Return(XObject<T> xObject, bool isDispose = false)
        {
            if (xObject == null)
            {
                throw new ArgumentNullException("xObject 不能为空!");
            }
            /*
             * 主动释放的释放
             * 超出最大闲置数量的释放
             * 无效的释放
             */
            if (isDispose || _activedTransportCount > xcfg.MaxIdle || !ActiveXObject(xObject.Value))
            {
                DisposeXObject(xObject);
                xObject.Pool = null;
                InterlockedDecrement();
                return;
            }
            xObject.LastDateTime = DateTime.Now;
            pools.Push(xObject);
            InterlockedDecrement();
            ResetEvent.Set();
        }


        private void Dispose(bool disposing)
        {
            if (!disposed)
            {
                if (disposing)
                {
                    try
                    {
                        while (pools.TryPop(out XObject<T> xobj))
                        {
                            //Pool 释放的时候XObject不再归还到Pool
                            DisposeXObject(xobj);
                            xobj.Pool = null;
                        }
                    }
                    catch (Exception)
                    {

                    }
                }
                disposed = true;
            }
        }
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// 创建XObject对象
        /// </summary>
        public Func<T> CreateXObject { get; set; } = () => { return default(T); };
        /// <summary>
        /// 获取XObject对象之前验证True 有效
        /// </summary>
        public Func<T, bool> ActiveXObject { get; set; } = x => { return true; };
        /// <summary>
        /// 释放XObject时候触发
        /// </summary>
        public Action<XObject<T>> DisposeXObject { get; set; } = x => { };
        /// <summary>
        /// 移除长度为count的元素
        /// </summary>
        /// <param name="count">除元素的长度count</param>
        private void DisposeLongIdleXObject(int count)
        {
            int startIndex = pools.Count - count;
            XObject<T>[] popXObjects = new XObject<T>[count];
            pools.TryPopRange(popXObjects, 0, count);
            for (int i = 0; i < popXObjects.Length; i++)
            {
                Return(popXObjects[i], true);
            }
        }
        /// <summary>
        /// 每隔10秒检测一次清理30秒未使用的对象数量的对象
        /// (如果存在对象30未使用,说明对象池有对象长时间闲置未使用)则从头部弹出一定数量的对象释放掉
        /// </summary>
        private void StartTaskClearLongIdleXObject()
        {
            Task.Factory.StartNew(async () =>
            {
                while (!disposed)
                {
                    await Task.Delay(10000);
                    try
                    {
                        var removeCount = 0;
                        var now = DateTime.Now.AddSeconds(-30);
                        var _pools = pools.ToList();
                        for (int i = _pools.Count - 1; i >= xcfg.MinIdle; i--)
                        {
                            if (_pools[i].LastDateTime < now)
                            {
                                removeCount++;
                            }
                        }
                        if (removeCount > 0 && removeCount <= (pools.Count - xcfg.MinIdle))
                        {
                            DisposeLongIdleXObject(removeCount);
                        }
                    }
                    finally { }
                }
            }, TaskCreationOptions.LongRunning);
        }
    }
}
View Code

初始化一个对象池

  最大活动对象数量 50个,最小闲置对象数量2个,最大闲置数量20个。

var UserPool = new XPool<User>(50, 2, 20, () =>
             {
                 int age = Interlocked.Increment(ref _activedTransportCount);
                 return new User()
                 {
                     Age = age,
                     Name = $"Name{age}"

                 };
             });

并行调用

  200个并行调用

  Parallel.For(0, 200, x =>
  {
                using (var user = UserPool.Borrow())
                {
                    Console.WriteLine($"Age:{user.Value.Age},Name:{user.Value.Name}");//,Msg:{user.Value.Msg}
                }
   });

 结果:

  从上图我们看到在200个并行过程中,只有4个对象被使用。因此可以看出我们没有频繁的创建对象。

 

欢迎加入QQ群:478399354 ,到这里我们互为师长项目学习。

Anno开源地址:

AnnoGitHub源码:https://github.com/duyanming/Anno.Core  

AnnoGitee源码:https://gitee.com/dotnetchina/anno.core 

Viper示例项目:https://github.com/duyanming/Viper  

体验地址:http://140.143.207.244/Home/Login

文档地址:https://duyanming.github.io/

关于Anno的更多内容,随后更新。敬请关注。开源不易,感谢Star。

以上是关于设计一个对象池(Anno.XObjectPool)的主要内容,如果未能解决你的问题,请参考以下文章

设计模式之对象池模式

有趣的设计模式之对象池模型

对象池设计模式的 Python 实现

设计高并发内存池申请流程

设计高并发内存池申请流程

ObjectPool 对象池设计模式