c#之Redis队列在邮件提醒中的应用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c#之Redis队列在邮件提醒中的应用相关的知识,希望对你有一定的参考价值。

场景

有这样一个场景,一个邮件提醒的windows服务,获取所有开启邮件提醒的用户,循环获取这些用户的邮件,发送一条服务号消息。但问题来了,用户比较少的情况下,轮询一遍时间还能忍受,如果用户多了,那用户名称排序靠后的人,收到邮件提醒的消息,延迟时间就非常长了。

准备

c#之Redis实践list,hashtable

c#之Redis队列

方案

1、生产者线程一获取所有开启邮件提醒的用户。

2、根据配置来决定使用多少个队列,以及每个队列的容量。

3、线程一,获取未满的队列,将当前用户入队。如果所有的队列已满,则挂起2s,然后重新获取未满的队列,用户入队。

4、根据配置开启消费者线程,每个线程独立处理逻辑。如果获取的用户为空或者当前队列为空,挂起2s。否则通过EWS服务拉取该用户的邮件,并提醒。

5、如果在获取用户邮件的过程中出错,则将该用户重新入当前队列,等待下次拉取。

测试

技术分享

队列

技术分享

测试代码

    /// <summary>
    /// 消息队列管理
    /// </summary>
    public class MyRedisQueueBus : IDisposable
    {
        /// <summary>
        /// 线程个数
        /// </summary>
        private int _threadCount;
        /// <summary>
        /// 每个线程中itcode的容量
        /// </summary>
        private int _threadCapacity;
        /// <summary>
        /// 线程
        /// </summary>
        private Thread[] _threads;
        /// <summary>
        /// 生产者线程
        /// </summary>
        private Thread _producerThread;
        /// <summary>
        /// 挂起时间
        /// </summary>
        private const int WAITSECONDE = 2000;
        /// <summary>
        /// 队列名称前缀
        /// </summary>
        private string _queuePrefix;
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="threadCount">线程个数</param>
        /// <param name="threadCapacity">每个线程处理的队列容量</param>
        ///  <param name="queuePrefix">每个线程处理的队列容量</param>
        public MyRedisQueueBus(int threadCount, int threadCapacity, string queuePrefix)
        {
            this._threadCapacity = threadCapacity;
            this._threadCount = threadCount;
            this._queuePrefix = queuePrefix + "_{0}";
        }
        /// <summary>
        /// 开启生产者
        /// </summary>
        public void StartProducer()
        {
            _producerThread = new Thread(() =>
            {
                IRedisClientFactory factory = RedisClientFactory.Instance;
                EmailAlertsData emailAlertsData = new EmailAlertsData();
                //白名单
                string[] userIdsWhiteArray = TaskGloableParameter.WhiteList.Split(new char[] { ,,  },
StringSplitOptions.RemoveEmptyEntries);
while (true) { //获取所有开启邮件提醒的用户 List<EmailAlerts> lstEmails = emailAlertsData.GetAllStartAlerts(userIdsWhiteArray); //入队 using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort)) { client.Password = WebConfig.RedisPwd; client.Db = WebConfig.RedisServerDb; foreach (var item in lstEmails) { int queueIndex = -1; string queueName = string.Format(this._queuePrefix, queueIndex); for (int i = 0; i < _threadCount; i++) { queueName = string.Format(this._queuePrefix, i); //如果当前队列没有填满,则直接跳出,使用该队列进行入队 if (client.GetListCount(queueName) < _threadCapacity) { queueIndex = i; break; } } //如果所有队列都已经满了,则挂起2s等待消费者消耗一部分数据,然后重新开始 if (queueIndex == -1) { Thread.SpinWait(WAITSECONDE); //重新获取队列 for (int i = 0; i < _threadCount; i++) { queueName = string.Format(this._queuePrefix, i); //如果当前队列没有填满,则直接跳出,使用该队列进行入队 if (client.GetListCount(queueName) < _threadCapacity) { queueIndex = i; break; } } } else { //入队 client.EnqueueItemOnList(queueName, JsonConvert.SerializeObject(new MyQueueItem { UserId = item.itcode, SyncState = item.Email_SyncState })); } } } } }); _producerThread.Start(); } /// <summary> /// 开启消费者 /// </summary> public void StartCustomer() { _threads = new Thread[_threadCount]; for (int i = 0; i < _threads.Length; i++) { _threads[i] = new Thread(CustomerRun); _threads[i].Start(i); } } private void CustomerRun(object obj) { int threadIndex = Convert.ToInt32(obj); string queueName = string.Format(this._queuePrefix, threadIndex); IRedisClientFactory factory = RedisClientFactory.Instance; while (true) { using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort)) { client.Password = WebConfig.RedisPwd; client.Db = WebConfig.RedisServerDb; if (client.GetListCount(queueName) > 0) { string resultJson = client.DequeueItemFromList(queueName); //如果获取的结果为空,则挂起2s if (string.IsNullOrEmpty(resultJson)) { Thread.SpinWait(WAITSECONDE); } else { try { //耗时业务处理 MyQueueItem item = JsonConvert.DeserializeObject<MyQueueItem>(resultJson); Console.WriteLine("Threadid:{0},User:{1}", Thread.CurrentThread.ManagedThreadId.ToString(), item.UserId); } catch (Exception ex) { //如果出错,重新入队 client.EnqueueItemOnList(queueName, resultJson); } } } else { //当前队列为空,挂起2s Thread.SpinWait(WAITSECONDE); } } } } public void Dispose() { //释放资源时,销毁线程 if (this._threads != null) { for (int i = 0; i < this._threads.Length; i++) { this._threads[i].Abort(); } } GC.Collect(); } }

Main方法调用

        static void Main(string[] args)
        {         
            MyRedisQueueBus bus = new MyRedisQueueBus(10, 10, "mail_reminder_queue");
            bus.StartProducer();
            Thread.SpinWait(2000);
            bus.StartCustomer();
            Console.Read();
        }

总结

通过配置的方式,确定开启的队列数和线程数,如果用户增加可以增加线程数,或者添加机器的方式解决。这样,可以解决排名靠后的用户,通过随机分发队列,有机会提前获取邮件提醒,可以缩短邮件提醒的延迟时间。当然,这种方案并不太完美,目前也只能想到这里了。这里把这个思路写出来,也是希望获取一个更好的解决方案。

以上是关于c#之Redis队列在邮件提醒中的应用的主要内容,如果未能解决你的问题,请参考以下文章

C#和ASP.NET通过Gmail账户发送邮件的代码

redis消息队列简单应用

EWS 邮件提醒

到点提醒功能如何实现简单,不如试试Redis

C#使用RabbitMQ

C#使用RabbitMQ