Redis分布式队列和缓存更新
Posted Kaden
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis分布式队列和缓存更新相关的知识,希望对你有一定的参考价值。
原文链接:https://www.cnblogs.com/hua66/p/9600085.html
在使用Redis中,我们可能会遇到以下场景:
例如:
某用户向服务器中发送一个请求,服务器将用户请求加入Redis任务队列,任务完成则移出队列。
以上场景有几点疑问:
- Redis队列中数据如果不仅仅来自于我们的应用程序,那么我们怎么把这个数据加入Redis?
- 当Redis队列中用户的请求达程序所能处理的峰值。那么我们该如何处理这些用户请求?
解决方案:
- 对外提供接口,将请求数据添加至DB。启动一个定时服务,在规定时间扫描DB中的请求数据并添加至Redis队列。
- 使用分布式异步队列。
以上解决方案都可以使用插件来实现。
一、
关于Quartz.Net可以通过上面链接获取官方API。它与SQL Server中的代理作业有着同样功能。
代码示例:
1 /// <summary> 2 /// 添加Job并以周期的形式运行 3 /// </summary> 4 /// <typeparam name="T"></typeparam> 5 /// <param name="jobName">job名称</param> 6 /// <param name="jobGroupName">job组名称</param> 7 /// <param name="replace">job是否可修改</param> 8 /// <param name="triggerName">job触发器的名称</param> 9 /// <param name="minutes">job执行的时间间隔,以分为单位</param> 10 /// <returns></returns> 11 public DateTimeOffset AddJob<T>(string jobName, string jobGroupName, bool replace, string triggerName, int minutes) where T : IJob 12 { 13 IJobDetail jobDetail = JobBuilder.Create<T>().WithIdentity(jobName, jobGroupName).Build(); 14 _sched.AddJob(jobDetail, replace); 15 ITrigger trigger = TriggerBuilder.Create() 16 .WithIdentity(triggerName, jobGroupName) 17 .StartNow() 18 .WithSimpleSchedule(x => x 19 .WithIntervalInMinutes(minutes)//表示分钟的时间间隔 20 .RepeatForever()) 21 .Build(); 22 return _sched.ScheduleJob(jobDetail, trigger).Result; 23 }
以上的代码中是基于Quartz封装一个添加了Job的方法。这个方法依赖于 “IJobDetail” 和 “ITrigger” 这两个对象。
“IJobDetail” 表示Job的身份信息,“ITrigger” 则包含了Job执行信息,它表示Job该如何执行。
以下为调用示例:
1 QuartzHelper quartzHelper = QuartzHelper.CreateInstance(); 2 quartzHelper.AddJob<TestJob>("testJob", "testJob_Group",false, "testJob_Trigger",1*10);
上述实例中的 “TestJob” 实现Quartz.Net中的 "IJob" 接口。这个接口只有一个方法 “Execute”,并由Quartz.Net框架自行调用该方法。
你可以在此方法中执行你的代码。并在添加该Job制定你的执行策略 “ITrigger” 对象。然后框架会根据你制定的策略进行调用。调用参数请参见上述封装。
下面是向Redis队列插入数据的示例Job:
1 public class TestJob : IJob 2 { 3 Task IJob.Execute(IJobExecutionContext context) 4 { 5 //JobDataMap dataMap = context.JobDetail.JobDataMap; 6 Task task = Task.Run( 7 () => 8 { 9 Console.WriteLine(string.Format("{0}开始执行!当前系统时间{1}", this.GetType().Name, DateTime.Now)); 10 try 11 { 12 string redisKey = this.GetType().Name; 13 RedisHelper redisHelper = new RedisHelper(); 14 if (redisHelper.KeyExists(redisKey)) 15 { 16 redisHelper.KeyDelete(redisKey); 17 }; 18 19 for (int i = 1; i <= 10; i++) 20 { 21 User user = new User() 22 { 23 ID = i, 24 Name = "user" + DateTime.Now.Ticks +"_"+ i 25 }; 26 redisHelper.ListLeftPush<User>(redisKey, user);//模拟DB用户数据 27 } 28 } 29 catch (Exception ex) 30 { 31 Console.WriteLine(string.Format("{0}任务出现异常,异常信息:{1}!当前系统时间{2}", this.GetType().Name, ex.Message, DateTime.Now)); 32 } 33 } 34 ); 35 return task; 36 } 37 }
上面的 “TestJob” 模拟了从DB加载用户请求数据至Redis队列。至此我们已经解决了上面的第一个问题。
二、
在.Net中Redis的插件不多。比较流行有 "ServiceStack.Redis" 和 "StackExchange.Redis" 。
"ServiceStack.Redis" 为官方推出的插件。非开源插件,且普通版最高只支持 6000/S 读写。高级版是要收费的。为了后续扩展,这里我们采用 "StackExchange.Redis" 。
关于StackExchange.Redis可以通过上面链接获取官方API,目前是开源的。
在第一个问题中,已经通过定时Job的方式向Redis队列中填充数据。下面我们通过 "StackExchange.Redis" 获取Redis队列中的请求并处理这些请求。
1.加载数据至Redis:
1 using APP_Test.Job; 2 using Common.Quartz; 3 using Common.Redis.StackExchange; 4 using Quartz; 5 using System; 6 using System.Collections.Generic; 7 using System.Linq; 8 using System.Text; 9 using System.Threading.Tasks; 10 11 namespace APP_Test 12 { 13 class Program 14 { 15 static void Main(string[] args) 16 { 17 { 18 RedisHelper redisHelperA = new RedisHelper(); 19 RedisHelper redisHelperB = new RedisHelper(); 20 string stra = redisHelperA.StringGet("mykey"); 21 string strb = redisHelperB.StringGet("mykey"); 22 if (stra== strb) 23 { 24 Console.WriteLine(string.Format("***********{0}=={1}***********", stra, strb)); 25 } 26 else 27 { 28 Console.WriteLine(string.Format("***********{0}!={1}***********", stra, strb)); 29 } 30 } 31 32 { 33 34 QuartzHelper quartzHelper = QuartzHelper.CreateInstance(); 35 quartzHelper.AddJob<TestJob>("testJob", "testJob_Group",false, "testJob_Trigger",1*10);//这里设置了以秒为单位 36 } 37 38 Console.ReadKey(); 39 } 40 41 } 42 }
可以看到上面代码执行的时间节点与我们所添加job中的 ”ITrigger “ 的触发策略完全一致。至此,我们第一步已得到验证。
2.启动处理Redis队列中请求的程序。
1 using APP_Test.Models; 2 using Common.Redis.StackExchange; 3 using System; 4 using System.Collections.Generic; 5 using System.Linq; 6 using System.Text; 7 using System.Threading.Tasks; 8 9 namespace APP_RedisClientTest 10 { 11 class Program 12 { 13 static void Main(string[] args) 14 { 15 RedisHelper redisHelper = new RedisHelper(); 16 string redisKey = "TestJob"; 17 while (true) 18 { 19 Action action = new Action(() => 20 { 21 User user = redisHelper.ListLeftPop<User>(redisKey);//获取请求数据并移出队列 22 if (user!=null) 23 { 24 Console.WriteLine(string.Format("*******{0}*******", user.Name)); 25 } 26 } 27 ); 28 action.EndInvoke(action.BeginInvoke(null, null)); 29 } 30 Console.ReadKey(); 31 } 32 } 33 }
上面我启动3个客户端实例,他们一起处理Redis队列中的请求。每当Job向Redis队列中添加请求对象后就会立即被我们处理请求的程序获取并消费,每当一个请求被消费就会被移出Redis队列。并且遵循先入先出的准则。按照上述,如果出现主程序请求量过高情形,我们只需要启动多个处理请求的辅助程序即可缓解主程序的压力。
至此上面的两个问题已得到验证。
如下附个人基于 "Quartz.Net" 和 "StackExchange.Redis" 封装的帮助类
1 using System; 2 using System.Collections.Generic; 3 4 namespace Common.Quartz 5 { 6 using global::Quartz; 7 using global::Quartz.Impl; 8 using global::Quartz.Impl.Matchers; 9 10 /// <summary> 11 /// V:3.0.6.0 12 /// </summary> 13 public class QuartzHelper 14 { 15 private readonly static object _obj = new object();//单例锁 16 17 //private ISchedulerFactory _sf = null; 18 19 private static IScheduler _sched = null; 20 /// <summary> 21 /// 提供IScheduler对象,访问异步方法 22 /// </summary> 23 public IScheduler Scheduler { get { return _sched; } } 24 25 private static QuartzHelper _quartzHelper = null;//单例对象 26 27 private QuartzHelper() 28 { 29 //_sf = new StdSchedulerFactory(); 30 //_sched = _sf.GetScheduler().Result; 31 _sched = StdSchedulerFactory.GetDefaultScheduler().Result; 32 _sched.Start(); 33 } 34 35 /// <summary> 36 /// 获取单例对象 37 /// </summary> 38 /// <returns></returns> 39 public static QuartzHelper CreateInstance() 40 { 41 if (_quartzHelper == null) //双if +lock 42 { 43 lock (_obj) 44 { 45 if (_quartzHelper == null) 46 { 47 _quartzHelper = new QuartzHelper(); 48 } 49 } 50 } 51 return _quartzHelper; 52 } 53 public bool CheckExists(TriggerKey triggerKey) 54 { 55 return _sched.CheckExists(triggerKey).Result; 56 } 57 public bool CheckExists(JobKey jobKey) 58 { 59 return _sched.CheckExists(jobKey).Result; 60 } 61 public IReadOnlyCollection<IJobExecutionContext> GetCurrentlyExecutingJobs() 62 { 63 return _sched.GetCurrentlyExecutingJobs().Result; 64 } 65 66 /// <summary> 67 /// 添加Job并以周期的形式运行 68 /// </summary> 69 /// <typeparam name="T"></typeparam> 70 /// <param name="jobName">job名称</param> 71 /// <param name="jobGroupName">job组名称</param> 72 /// <param name="replace">job是否可修改</param> 73 /// <param name="triggerName">job触发器的名称</param> 74 /// <param name="minutes">job执行的时间间隔,以分为单位</param> 75 /// <returns></returns> 76 public DateTimeOffset AddJob<T>(string jobName, string jobGroupName, bool replace, string triggerName, int minutes) where T : IJob 77 { 78 IJobDetail jobDetail = JobBuilder.Create<T>().WithIdentity(jobName, jobGroupName).Build(); 79 _sched.AddJob(jobDetail, replace); 80 ITrigger trigger = TriggerBuilder.Create() 81 .WithIdentity(triggerName, jobGroupName) 82 .StartNow() 83 .WithSimpleSchedule(x => x 84 .WithIntervalInSeconds(minutes)//seconds表示秒的时间间隔 85 //.WithIntervalInMinutes(minutes)//表示分钟的时间间隔 86 .RepeatForever()) 87 .Build(); 88 return _sched.ScheduleJob(jobDetail, trigger).Result; 89 } 90 public bool DeleteJobs(IReadOnlyCollection<JobKey> jobKeys) 91 { 92 return _sched.DeleteJobs(jobKeys).Result; 93 } 94 public IJobDetail GetJobDetail(JobKey jobKey) 95 { 96 return _sched.GetJobDetail(jobKey).Result; 97 } 98 public IReadOnlyCollection<string> GetJobGroupNames() 99 { 100 return _sched.GetJobGroupNames().Result; 101 } 102 public IReadOnlyCollection<JobKey> GetJobKeys(GroupMatcher<JobKey> matcher) 103 { 104 return _sched.GetJobKeys(matcher).Result; 105 } 106 public bool Interrupt(JobKey jobKey) 107 { 108 return _sched.Interrupt(jobKey).Result; 109 } 110 public bool IsJobGroupPaused(string groupName) 111 { 112 return _sched.IsJobGroupPaused(groupName).Result; 113 } 114 public ITrigger GetTrigger(TriggerKey triggerKey) 115 { 116 return _sched.GetTrigger(triggerKey).Result; 117 } 118 public IReadOnlyCollection<string> GetTriggerGroupNames() 119 { 120 return _sched.GetTriggerGroupNames().Result; 121 } 122 public IReadOnlyCollection<TriggerKey> GetTriggerKeys(GroupMatcher<TriggerKey> matcher) 123 { 124 return _sched.GetTriggerKeys(matcher).Result; 125 } 126 public IReadOnlyCollection<ITrigger> GetTriggersOfJob(JobKey jobKey) 127 { 128 return _sched.GetTriggersOfJob(jobKey).Result; 129 } 130 public TriggerState GetTriggerState(TriggerKey triggerKey) 131 { 132 return _sched.GetTriggerState(triggerKey).Result; 133 } 134 public IReadOnlyCollection<string> GetPausedTriggerGroups() 135 { 136 return _sched.GetPausedTriggerGroups().Result; 137 } 138 public bool Interrupt(string fireInstanceId) 139 { 140 return _sched.Interrupt(fireInstanceId).Result; 141 } 142 public bool IsTriggerGroupPaused(string groupName) 143 { 144 return _sched.IsTriggerGroupPaused(groupName).Result; 145 } 146 public void PauseAll() 147 { 148 _sched.PauseAll(); 149 } 150 public void PauseJobs(GroupMatcher<JobKey> matcher) 151 { 152 _sched.PauseJobs(matcher); 153 } 154 public void PauseTriggers(GroupMatcher<TriggerKey> matcher) 155 { 156 _sched.PauseTriggers(matcher); 157 } 158 public void ResumeAll() 159 { 160 _sched.ResumeAll(); 161 } 162 public void ResumeJobs(GroupMatcher<JobKey> matcher) 163 { 164 _sched.ResumeJobs(matcher); 165 } 166 public void ResumeTriggers(GroupMatcher<TriggerKey> matcher) 167 { 168 _sched.ResumeTriggers(matcher); 169 } 170 public void ScheduleJobs(IReadOnlyDictionary<IJobDetail, IReadOnlyCollection<ITrigger>> triggersAndJobs, bool replace) 171 { 172 _sched.ScheduleJobs(triggersAndJobs, replace); 173 } 174 public DateTimeOffset? RescheduleJob(TriggerKey triggerKey, ITrigger newTrigger) 175 { 176 return _sched.RescheduleJob(triggerKey, newTrigger).Result; 177 } 178 public void Shutdown(bool waitForJobsToComplete) 179 { 180 _sched.Shutdown(waitForJobsToComplete); 181 } 182 public void Clear() 183 { 184 _sched.Clear(); 185 } 186 187 188 } 189 }
1 using面试官心理分析+面试题剖析:消息队列+Redis 缓存+分布式系统等Redis个人笔记:Redis应用场景,Redis常见命令,Reids缓存击穿穿透,Redis分布式锁实现方案,秒杀设计思路,Redis消息队列,Reids持久化,Redis主从哨兵分片集群