TPL异步并行编程之任务超时
Posted bing
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TPL异步并行编程之任务超时相关的知识,希望对你有一定的参考价值。
此处参考自阿涛的博文:http://www.cnblogs.com/HelloMyWorld/p/5526914.html
一 自己定义
基本的思路:
net中异步操作由于是交给线程来实现,因此不可能真正想js那样将一个单线程上的任务移除:如
var id=setTimeout(fun,200);
if(id>0){
clearTimeout(id);//将一个任务从单线程的任务栈中移除,自然就做到了真正的移除任务
}
但是在net中一个任务交给线程执行后,具体什么时候执行完成我们并不确定,就算是我们把线程终止掉,如果任务执行完了,且执行完后与之关联的处理函数关系任然建立,那么其处理函数一样会执行
那么对于net这样的现状,我们只好斩断与之关联的处理函数的关系,来达到取消一个任务的或者认为他超时
同样的原理,比如我们有一个搜索框,或者地图缩放来做一些事情,其实我们在输入框中输入一段文字是非常快的,假如我们以文本发生变化就去发起网络搜索,那么其实会发起n个字符的搜索请求;
但是实际上我们想要只是最后一次发起的请求而已,那么问题就来了,这些回来的数据谁先谁后都是随机的,也就自然的出现了结果乱七八糟,那么怎么办呢?
1 同步搜索
我们可以锁定ui界面让用户不再能够发起请求:当然是一个传统并非很友好的解决
如:点击搜索按钮,调用搜索程序发起搜索,锁住ui,等待结果返回,再解锁ui将操作交给用户;
2 延时处理
比如连续的触发任务执行,那么我们就让他在小于一段时间内的触发不发起真正的请求,这种办法也只是减少无用请求而已
如:我一直点搜索按钮一直点,是的一直点,但是当我到达比如200ms的才会发起请求,之前点的都没用;
3 取消无用的处理
比如我连续发起了10次请求但是,只要最后一次的,那么我把之前的就取消掉,请求其实已经发出,只是对于结果我们丢弃掉了
如:我一直点一直点一直点,发起了10次请求,但是我每次发起请求前就把上一次的丢弃掉,注意这里并不是真正让这个网络请求取消了,本质上是没有办法取消掉的,
我只是让回来的结果丢弃掉而已不做任何处理了,那么就算是我点了n次其实我也只取了我最后一次的结果,所以这样看起来合情合理,但是对于网络流量要求的app来说就很不科学了
4 将2和3结合起来
首先我们做一个延时处理比如200ms,当达到延时处理后再发起请求,但是有个特殊地方比如我在下一个200ms内又发起请求,此时结果并没有回来,那么我们将之前的任务取消掉就好了,这样也就相对友好的
处理了这些矛盾,这里兼顾了2和3的优缺点
以下是我改进的代码,让任务可以手动取消
封装的超时Task类
1 public class TimeoutTask 2 { 3 #region 字段 4 private Action _action; 5 private CancellationToken _token; 6 private event AsyncCompletedEventHandler _asyncCompletedEvent; 7 private TaskCompletionSource<AsyncCompletedEventArgs> _tcs; 8 #endregion 9 10 #region 静态方法 11 public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, CancellationToken token) 12 { 13 return await TimeoutTask.StartNewTask(action, token, Timeout.Infinite); 14 } 15 16 public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, int timeout) 17 { 18 return await TimeoutTask.StartNewTask(action, CancellationToken.None, timeout); 19 } 20 21 public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, CancellationToken token, 22 int timeout = Timeout.Infinite) 23 { 24 var task = new TimeoutTask(action, token, timeout); 25 26 return await task.Run(); 27 } 28 #endregion 29 30 #region 构造 31 32 public TimeoutTask(Action action, int timeout) : this(action, CancellationToken.None, timeout) 33 { 34 35 } 36 37 public TimeoutTask(Action action, CancellationToken token) : this(action, token, Timeout.Infinite) 38 { 39 40 } 41 42 public TimeoutTask(Action action, CancellationToken token, int timeout = Timeout.Infinite) 43 { 44 _action = action; 45 46 _tcs = new TaskCompletionSource<AsyncCompletedEventArgs>(); 47 48 if (timeout != Timeout.Infinite) 49 { 50 var cts = CancellationTokenSource.CreateLinkedTokenSource(token); 51 cts.CancelAfter(timeout); 52 _token = cts.Token; 53 } 54 else 55 { 56 _token = token; 57 } 58 } 59 #endregion 60 61 #region 公用方法 62 63 /// <summary> 64 /// 运行 65 /// </summary> 66 /// <returns></returns> 67 public async Task<AsyncCompletedEventArgs> Run() 68 { 69 _asyncCompletedEvent += AsyncCompletedEventHandler; 70 71 try 72 { 73 using (_token.Register(() => _tcs.TrySetCanceled())) 74 { 75 ExecuteAction(); 76 return await _tcs.Task.ConfigureAwait(false); 77 } 78 79 } 80 finally 81 { 82 _asyncCompletedEvent -= AsyncCompletedEventHandler; 83 } 84 85 } 86 87 public void Cancel() 88 { 89 if (!_token.CanBeCanceled) 90 { 91 _tcs.TrySetCanceled(); 92 } 93 } 94 #endregion 95 96 #region 私有方法 97 98 /// <summary> 99 /// 执行Action 100 /// </summary> 101 private void ExecuteAction() 102 { 103 Task.Factory.StartNew(() => 104 { 105 _action.Invoke(); 106 107 OnAsyncCompleteEvent(null); 108 }); 109 } 110 111 /// <summary> 112 /// 异步完成事件处理 113 /// </summary> 114 /// <param name="sender"></param> 115 /// <param name="e"></param> 116 private void AsyncCompletedEventHandler(object sender, AsyncCompletedEventArgs e) 117 { 118 if (e.Cancelled) 119 { 120 _tcs.TrySetCanceled(); 121 } 122 else if (e.Error != null) 123 { 124 _tcs.TrySetException(e.Error); 125 } 126 else 127 { 128 _tcs.TrySetResult(e); 129 } 130 } 131 132 /// <summary> 133 /// 触发异步完成事件 134 /// </summary> 135 /// <param name="userState"></param> 136 private void OnAsyncCompleteEvent(object userState) 137 { 138 if (_asyncCompletedEvent != null) 139 { 140 _asyncCompletedEvent(this, new AsyncCompletedEventArgs(error: null, cancelled: false, userState: userState)); 141 } 142 } 143 #endregion 144 } 145 146 /// <summary> 147 /// 有返回值,可超时,可取消的Task 148 /// </summary> 149 /// <typeparam name="T"></typeparam> 150 public class TimeoutTask<T> 151 { 152 #region 字段 153 private Func<T> _func; 154 private CancellationToken _token; 155 private event AsyncCompletedEventHandler _asyncCompletedEvent; 156 private TaskCompletionSource<AsyncCompletedEventArgs> _tcs; 157 #endregion 158 159 #region 静态方法 160 public static async Task<T> StartNewTask(Func<T> func, CancellationToken token, 161 int timeout = Timeout.Infinite) 162 { 163 var task = new TimeoutTask<T>(func, token, timeout); 164 165 return await task.Run(); 166 } 167 168 public static async Task<T> StartNewTask(Func<T> func, int timeout) 169 { 170 return await TimeoutTask<T>.StartNewTask(func, CancellationToken.None, timeout); 171 } 172 173 public static async Task<T> StartNewTask(Func<T> func, CancellationToken token) 174 { 175 return await TimeoutTask<T>.StartNewTask(func, token, Timeout.Infinite); 176 } 177 178 179 180 #endregion 181 182 #region 公用方法 183 /// <summary> 184 /// 运行Task 185 /// </summary> 186 /// <returns></returns> 187 public async Task<T> Run() 188 { 189 _asyncCompletedEvent += AsyncCompletedEventHandler; 190 191 try 192 { 193 using (_token.Register(() => _tcs.TrySetCanceled())) 194 { 195 ExecuteFunc(); 196 var args = await _tcs.Task.ConfigureAwait(false); 197 return (T)args.UserState; 198 } 199 200 } 201 finally 202 { 203 _asyncCompletedEvent -= AsyncCompletedEventHandler; 204 } 205 206 } 207 208 209 public bool CanBeCanceled 210 { 211 get { return _token.CanBeCanceled; } 212 } 213 214 public void Cancel() 215 { 216 if (!_token.CanBeCanceled) 217 { 218 _tcs.SetCanceled(); 219 } 220 } 221 #endregion 222 223 #region 构造 224 public TimeoutTask(Func<T> func, CancellationToken token) : this(func, token, Timeout.Infinite) 225 { 226 227 } 228 229 public TimeoutTask(Func<T> func, int timeout = Timeout.Infinite) : this(func, CancellationToken.None, timeout) 230 { 231 232 } 233 234 public TimeoutTask(Func<T> func, CancellationToken token, int timeout = Timeout.Infinite) 235 { 236 _func = func; 237 238 _tcs = new TaskCompletionSource<AsyncCompletedEventArgs>(); 239 240 if (timeout != Timeout.Infinite) 241 { 242 var cts = CancellationTokenSource.CreateLinkedTokenSource(token); 243 244 cts.CancelAfter(timeout); 245 _token = cts.Token; 246 } 247 else 248 { 249 _token = token; 250 } 251 } 252 #endregion 253 254 #region 私有方法 255 256 257 /// <summary> 258 /// 执行 259 /// </summary> 260 private void ExecuteFunc() 261 { 262 ThreadPool.QueueUserWorkItem(s => 263 { 264 var result = _func.Invoke(); 265 266 OnAsyncCompleteEvent(result); 267 }); 268 } 269 270 /// <summary> 271 /// 异步完成事件处理 272 /// </summary> 273 /// <param name="sender"></param> 274 /// <param name="e"></param> 275 private void AsyncCompletedEventHandler(object sender, AsyncCompletedEventArgs e) 276 { 277 if (e.Cancelled) 278 { 279 _tcs.TrySetCanceled(); 280 } 281 else if (e.Error != null) 282 { 283 _tcs.TrySetException(e.Error); 284 } 285 else 286 { 287 _tcs.TrySetResult(e); 288 } 289 } 290 291 /// <summary> 292 /// 触发异步完成事件 293 /// </summary> 294 /// <param name="userState"></param> 295 private void OnAsyncCompleteEvent(object userState) 296 { 297 if (_asyncCompletedEvent != null) 298 { 299 _asyncCompletedEvent(this, new AsyncCompletedEventArgs(error: null, cancelled: false, userState: userState)); 300 } 301 } 302 #endregion 303 }
demo
1 class Program 2 { 3 private static TimeoutTask<string> result; 4 5 static void Main(string[] args) 6 { 7 8 9 ThreadMethod(); 10 11 12 Console.WriteLine("启动完成"); 13 Console.ReadLine(); 14 } 15 16 private async static void ThreadMethod() 17 { 18 // await TimeoutTask.StartNewTask(LongTimeWork, 6000); 19 // await TimeoutTask<string>.StartNewTask(LongTimeWork2, 2000); 20 try 21 { 22 for (int i = 0; i < 5; i++) 23 { 24 25 //我手动取消掉上一次的 26 if (result!=null) 27 { 28 try 29 { 30 //取消掉 31 result.Cancel(); 32 } 33 catch (Exception er) 34 { 35 } 36 } 37 38 result = new TimeoutTask<string>(LongTimeWork2); 39 40 41 42 try 43 { 44 result.Run(); 45 } 46 catch (Exception ee) 47 { 48 49 } 50 } 51 52 53 Console.WriteLine(result); 54 } 55 catch (Exception ex) 56 { 57 58 } 59 } 60 61 private static void LongTimeWork() 62 { 63 Thread.Sleep(5000); 64 } 65 66 private static string LongTimeWork2() 67 { 68 Thread.Sleep(5000); 69 return "XXD"; 70 } 71 72 73 }
二 Task天生超时
什么是尝试超时,比如说连接数据库就有TryConnect尝试,在一些访问资源的时候经常用到,且Task本身也天生支持超时处理
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var task=Task.Factory.StartNew(() =>
{
Thread.Sleep(3 * 1000);
}, token);
var timeout = task.Wait(4*1000,token);
if (timeout)
{
}
Wait会等待给定的时间,如果在给定的时间内已经完成那么,将返回true,意思是在指定的时间内完成了一个task,反之就认为超时了,这个也不乏一种超时处理
以上是关于TPL异步并行编程之任务超时的主要内容,如果未能解决你的问题,请参考以下文章