Redis分布式队列和缓存更新

Posted Kaden

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis分布式队列和缓存更新相关的知识,希望对你有一定的参考价值。

  原文链接:https://www.cnblogs.com/hua66/p/9600085.html

  在使用Redis中,我们可能会遇到以下场景:

  例如:

  某用户向服务器中发送一个请求,服务器将用户请求加入Redis任务队列,任务完成则移出队列。

  以上场景有几点疑问:

  1. Redis队列中数据如果不仅仅来自于我们的应用程序,那么我们怎么把这个数据加入Redis?
  2. 当Redis队列中用户的请求达程序所能处理的峰值。那么我们该如何处理这些用户请求?

  解决方案:

  1.  对外提供接口,将请求数据添加至DB。启动一个定时服务,在规定时间扫描DB中的请求数据并添加至Redis队列。
  2. 使用分布式异步队列。

  以上解决方案都可以使用插件来实现。

  1. 使用Quartz.Net
  2. 使用StackExchange.Redis

  一、

  关于Quartz.Net可以通过上面链接获取官方API。它与SQL Server中的代理作业有着同样功能。

  代码示例:

  

 1  /// <summary>
 2         /// 添加Job并以周期的形式运行
 3         /// </summary>
 4         /// <typeparam name="T"></typeparam>
 5         /// <param name="jobName">job名称</param>
 6         /// <param name="jobGroupName">job组名称</param>
 7         /// <param name="replace">job是否可修改</param>
 8         /// <param name="triggerName">job触发器的名称</param>
 9         /// <param name="minutes">job执行的时间间隔,以分为单位</param>
10         /// <returns></returns>
11         public DateTimeOffset AddJob<T>(string jobName, string jobGroupName, bool replace, string triggerName, int minutes) where T : IJob
12         {
13             IJobDetail jobDetail = JobBuilder.Create<T>().WithIdentity(jobName, jobGroupName).Build();
14             _sched.AddJob(jobDetail, replace);
15             ITrigger trigger = TriggerBuilder.Create()
16               .WithIdentity(triggerName, jobGroupName)
17               .StartNow()
18               .WithSimpleSchedule(x => x
19                   .WithIntervalInMinutes(minutes)//表示分钟的时间间隔
20                   .RepeatForever())
21               .Build();
22             return _sched.ScheduleJob(jobDetail, trigger).Result;
23         }
View Code

  以上的代码中是基于Quartz封装一个添加了Job的方法。这个方法依赖于 “IJobDetail” 和 “ITrigger” 这两个对象。

  “IJobDetail” 表示Job的身份信息,“ITrigger” 则包含了Job执行信息,它表示Job该如何执行。

  以下为调用示例:

  

1 QuartzHelper quartzHelper = QuartzHelper.CreateInstance();
2                 quartzHelper.AddJob<TestJob>("testJob", "testJob_Group",false, "testJob_Trigger",1*10);
View Code

  上述实例中的 “TestJob” 实现Quartz.Net中的 "IJob" 接口。这个接口只有一个方法 “Execute”,并由Quartz.Net框架自行调用该方法。

  你可以在此方法中执行你的代码。并在添加该Job制定你的执行策略 “ITrigger” 对象。然后框架会根据你制定的策略进行调用。调用参数请参见上述封装。

 

  下面是向Redis队列插入数据的示例Job:

  

 1 public class TestJob : IJob
 2     {
 3         Task IJob.Execute(IJobExecutionContext context)
 4         {
 5             //JobDataMap dataMap = context.JobDetail.JobDataMap;
 6             Task task = Task.Run(
 7                   () =>
 8                   {
 9                       Console.WriteLine(string.Format("{0}开始执行!当前系统时间{1}", this.GetType().Name, DateTime.Now));
10                       try
11                       {
12                           string redisKey = this.GetType().Name;
13                           RedisHelper redisHelper = new RedisHelper();
14                           if (redisHelper.KeyExists(redisKey))
15                           {
16                               redisHelper.KeyDelete(redisKey);
17                           };
18 
19                           for (int i = 1; i <= 10; i++)
20                           {
21                               User user = new User()
22                               {
23                                   ID = i,
24                                   Name = "user" + DateTime.Now.Ticks +"_"+ i                                  
25                               };
26                               redisHelper.ListLeftPush<User>(redisKey, user);//模拟DB用户数据
27                           }
28                       }
29                       catch (Exception ex)
30                       {
31                           Console.WriteLine(string.Format("{0}任务出现异常,异常信息:{1}!当前系统时间{2}", this.GetType().Name, ex.Message, DateTime.Now));
32                       }
33                   }
34                   );
35             return task;
36         }
37     }
View Code

 

  上面的 “TestJob” 模拟了从DB加载用户请求数据至Redis队列。至此我们已经解决了上面的第一个问题。

 

  二、

  在.Net中Redis的插件不多。比较流行有 "ServiceStack.Redis" 和 "StackExchange.Redis" 。

  "ServiceStack.Redis" 为官方推出的插件。非开源插件,且普通版最高只支持 6000/S 读写。高级版是要收费的。为了后续扩展,这里我们采用 "StackExchange.Redis" 。

  关于StackExchange.Redis可以通过上面链接获取官方API,目前是开源的。

  在第一个问题中,已经通过定时Job的方式向Redis队列中填充数据。下面我们通过 "StackExchange.Redis" 获取Redis队列中的请求并处理这些请求。

 

  1.加载数据至Redis:

  

 1 using APP_Test.Job;
 2 using Common.Quartz;
 3 using Common.Redis.StackExchange;
 4 using Quartz;
 5 using System;
 6 using System.Collections.Generic;
 7 using System.Linq;
 8 using System.Text;
 9 using System.Threading.Tasks;
10 
11 namespace APP_Test
12 {
13     class Program
14     {
15         static void Main(string[] args)
16         {
17             {
18                 RedisHelper redisHelperA = new RedisHelper();
19                 RedisHelper redisHelperB = new RedisHelper();
20                 string stra = redisHelperA.StringGet("mykey");
21                 string strb = redisHelperB.StringGet("mykey");
22                 if (stra== strb)
23                 {
24                     Console.WriteLine(string.Format("***********{0}=={1}***********", stra, strb));
25                 }
26                 else
27                 {
28                     Console.WriteLine(string.Format("***********{0}!={1}***********", stra, strb));
29                 }
30             }
31 
32             {
33                 
34                 QuartzHelper quartzHelper = QuartzHelper.CreateInstance();
35                 quartzHelper.AddJob<TestJob>("testJob", "testJob_Group",false, "testJob_Trigger",1*10);//这里设置了以秒为单位
36             }
37 
38             Console.ReadKey();
39         }
40         
41     }
42 }
View Code

 

  

 

  可以看到上面代码执行的时间节点与我们所添加job中的 ”ITrigger “ 的触发策略完全一致。至此,我们第一步已得到验证。

 

  2.启动处理Redis队列中请求的程序。

  

 1 using APP_Test.Models;
 2 using Common.Redis.StackExchange;
 3 using System;
 4 using System.Collections.Generic;
 5 using System.Linq;
 6 using System.Text;
 7 using System.Threading.Tasks;
 8 
 9 namespace APP_RedisClientTest
10 {
11     class Program
12     {
13         static void Main(string[] args)
14         {
15             RedisHelper redisHelper = new RedisHelper();
16             string redisKey = "TestJob";
17             while (true)
18             {
19                 Action action = new Action(() =>
20                 {
21                     User user = redisHelper.ListLeftPop<User>(redisKey);//获取请求数据并移出队列
22                     if (user!=null)
23                     {
24                         Console.WriteLine(string.Format("*******{0}*******", user.Name));
25                     }                    
26                 }
27                     );
28                 action.EndInvoke(action.BeginInvoke(null, null));
29             }
30             Console.ReadKey();
31         }
32     }
33 }
View Code

 

  

  上面我启动3个客户端实例,他们一起处理Redis队列中的请求。每当Job向Redis队列中添加请求对象后就会立即被我们处理请求的程序获取并消费,每当一个请求被消费就会被移出Redis队列。并且遵循先入先出的准则。按照上述,如果出现主程序请求量过高情形,我们只需要启动多个处理请求的辅助程序即可缓解主程序的压力。

  至此上面的两个问题已得到验证。

 

  如下附个人基于 "Quartz.Net" 和 "StackExchange.Redis" 封装的帮助类

  

  1 using System;
  2 using System.Collections.Generic;
  3 
  4 namespace Common.Quartz
  5 {
  6     using global::Quartz;
  7     using global::Quartz.Impl;
  8     using global::Quartz.Impl.Matchers;
  9 
 10     /// <summary>
 11     /// V:3.0.6.0
 12     /// </summary>
 13     public class QuartzHelper
 14     {
 15         private readonly static object _obj = new object();//单例锁
 16 
 17         //private  ISchedulerFactory _sf = null;
 18 
 19         private static IScheduler _sched = null;
 20         /// <summary>
 21         /// 提供IScheduler对象,访问异步方法
 22         /// </summary>
 23         public IScheduler Scheduler { get { return _sched; } }
 24 
 25         private static QuartzHelper _quartzHelper = null;//单例对象
 26 
 27         private QuartzHelper()
 28         {
 29             //_sf = new StdSchedulerFactory();
 30             //_sched = _sf.GetScheduler().Result;
 31             _sched = StdSchedulerFactory.GetDefaultScheduler().Result;
 32             _sched.Start();
 33         }
 34 
 35         /// <summary>
 36         /// 获取单例对象
 37         /// </summary>
 38         /// <returns></returns>
 39         public static QuartzHelper CreateInstance()
 40         {
 41             if (_quartzHelper == null) //双if +lock
 42             {
 43                 lock (_obj)
 44                 {
 45                     if (_quartzHelper == null)
 46                     {
 47                         _quartzHelper = new QuartzHelper();
 48                     }
 49                 }
 50             }
 51             return _quartzHelper;
 52         }
 53         public bool CheckExists(TriggerKey triggerKey)
 54         {
 55             return _sched.CheckExists(triggerKey).Result;
 56         }
 57         public bool CheckExists(JobKey jobKey)
 58         {
 59             return _sched.CheckExists(jobKey).Result;
 60         }
 61         public IReadOnlyCollection<IJobExecutionContext> GetCurrentlyExecutingJobs()
 62         {
 63             return _sched.GetCurrentlyExecutingJobs().Result;
 64         }
 65 
 66         /// <summary>
 67         /// 添加Job并以周期的形式运行
 68         /// </summary>
 69         /// <typeparam name="T"></typeparam>
 70         /// <param name="jobName">job名称</param>
 71         /// <param name="jobGroupName">job组名称</param>
 72         /// <param name="replace">job是否可修改</param>
 73         /// <param name="triggerName">job触发器的名称</param>
 74         /// <param name="minutes">job执行的时间间隔,以分为单位</param>
 75         /// <returns></returns>
 76         public DateTimeOffset AddJob<T>(string jobName, string jobGroupName, bool replace, string triggerName, int minutes) where T : IJob
 77         {
 78             IJobDetail jobDetail = JobBuilder.Create<T>().WithIdentity(jobName, jobGroupName).Build();
 79             _sched.AddJob(jobDetail, replace);
 80             ITrigger trigger = TriggerBuilder.Create()
 81               .WithIdentity(triggerName, jobGroupName)
 82               .StartNow()
 83               .WithSimpleSchedule(x => x
 84                   .WithIntervalInSeconds(minutes)//seconds表示秒的时间间隔
 85                                                  //.WithIntervalInMinutes(minutes)//表示分钟的时间间隔
 86                   .RepeatForever())
 87               .Build();
 88             return _sched.ScheduleJob(jobDetail, trigger).Result;
 89         }
 90         public bool DeleteJobs(IReadOnlyCollection<JobKey> jobKeys)
 91         {
 92             return _sched.DeleteJobs(jobKeys).Result;
 93         }
 94         public IJobDetail GetJobDetail(JobKey jobKey)
 95         {
 96             return _sched.GetJobDetail(jobKey).Result;
 97         }
 98         public IReadOnlyCollection<string> GetJobGroupNames()
 99         {
100             return _sched.GetJobGroupNames().Result;
101         }
102         public IReadOnlyCollection<JobKey> GetJobKeys(GroupMatcher<JobKey> matcher)
103         {
104             return _sched.GetJobKeys(matcher).Result;
105         }
106         public bool Interrupt(JobKey jobKey)
107         {
108             return _sched.Interrupt(jobKey).Result;
109         }
110         public bool IsJobGroupPaused(string groupName)
111         {
112             return _sched.IsJobGroupPaused(groupName).Result;
113         }
114         public ITrigger GetTrigger(TriggerKey triggerKey)
115         {
116             return _sched.GetTrigger(triggerKey).Result;
117         }
118         public IReadOnlyCollection<string> GetTriggerGroupNames()
119         {
120             return _sched.GetTriggerGroupNames().Result;
121         }
122         public IReadOnlyCollection<TriggerKey> GetTriggerKeys(GroupMatcher<TriggerKey> matcher)
123         {
124             return _sched.GetTriggerKeys(matcher).Result;
125         }
126         public IReadOnlyCollection<ITrigger> GetTriggersOfJob(JobKey jobKey)
127         {
128             return _sched.GetTriggersOfJob(jobKey).Result;
129         }
130         public TriggerState GetTriggerState(TriggerKey triggerKey)
131         {
132             return _sched.GetTriggerState(triggerKey).Result;
133         }
134         public IReadOnlyCollection<string> GetPausedTriggerGroups()
135         {
136             return _sched.GetPausedTriggerGroups().Result;
137         }
138         public bool Interrupt(string fireInstanceId)
139         {
140             return _sched.Interrupt(fireInstanceId).Result;
141         }
142         public bool IsTriggerGroupPaused(string groupName)
143         {
144             return _sched.IsTriggerGroupPaused(groupName).Result;
145         }
146         public void PauseAll()
147         {
148             _sched.PauseAll();
149         }
150         public void PauseJobs(GroupMatcher<JobKey> matcher)
151         {
152             _sched.PauseJobs(matcher);
153         }
154         public void PauseTriggers(GroupMatcher<TriggerKey> matcher)
155         {
156             _sched.PauseTriggers(matcher);
157         }
158         public void ResumeAll()
159         {
160             _sched.ResumeAll();
161         }
162         public void ResumeJobs(GroupMatcher<JobKey> matcher)
163         {
164             _sched.ResumeJobs(matcher);
165         }
166         public void ResumeTriggers(GroupMatcher<TriggerKey> matcher)
167         {
168             _sched.ResumeTriggers(matcher);
169         }
170         public void ScheduleJobs(IReadOnlyDictionary<IJobDetail, IReadOnlyCollection<ITrigger>> triggersAndJobs, bool replace)
171         {
172             _sched.ScheduleJobs(triggersAndJobs, replace);
173         }
174         public DateTimeOffset? RescheduleJob(TriggerKey triggerKey, ITrigger newTrigger)
175         {
176             return _sched.RescheduleJob(triggerKey, newTrigger).Result;
177         }
178         public void Shutdown(bool waitForJobsToComplete)
179         {
180             _sched.Shutdown(waitForJobsToComplete);
181         }
182         public void Clear()
183         {
184             _sched.Clear();
185         }
186 
187 
188     }
189 }
View Code