TPL异步并行编程之任务超时

Posted bing

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TPL异步并行编程之任务超时相关的知识,希望对你有一定的参考价值。

此处参考自阿涛的博文:http://www.cnblogs.com/HelloMyWorld/p/5526914.html

 

一 自己定义

基本的思路:

net中异步操作由于是交给线程来实现,因此不可能真正想js那样将一个单线程上的任务移除:如

var id=setTimeout(fun,200);

 if(id>0){

  clearTimeout(id);//将一个任务从单线程的任务栈中移除,自然就做到了真正的移除任务

}

但是在net中一个任务交给线程执行后,具体什么时候执行完成我们并不确定,就算是我们把线程终止掉,如果任务执行完了,且执行完后与之关联的处理函数关系任然建立,那么其处理函数一样会执行

 

那么对于net这样的现状,我们只好斩断与之关联的处理函数的关系,来达到取消一个任务的或者认为他超时

 

同样的原理,比如我们有一个搜索框,或者地图缩放来做一些事情,其实我们在输入框中输入一段文字是非常快的,假如我们以文本发生变化就去发起网络搜索,那么其实会发起n个字符的搜索请求;

 

但是实际上我们想要只是最后一次发起的请求而已,那么问题就来了,这些回来的数据谁先谁后都是随机的,也就自然的出现了结果乱七八糟,那么怎么办呢?

 

1 同步搜索

我们可以锁定ui界面让用户不再能够发起请求:当然是一个传统并非很友好的解决

如:点击搜索按钮,调用搜索程序发起搜索,锁住ui,等待结果返回,再解锁ui将操作交给用户;

2 延时处理

比如连续的触发任务执行,那么我们就让他在小于一段时间内的触发不发起真正的请求,这种办法也只是减少无用请求而已

如:我一直点搜索按钮一直点,是的一直点,但是当我到达比如200ms的才会发起请求,之前点的都没用;

3 取消无用的处理

比如我连续发起了10次请求但是,只要最后一次的,那么我把之前的就取消掉,请求其实已经发出,只是对于结果我们丢弃掉了

如:我一直点一直点一直点,发起了10次请求,但是我每次发起请求前就把上一次的丢弃掉,注意这里并不是真正让这个网络请求取消了,本质上是没有办法取消掉的,

我只是让回来的结果丢弃掉而已不做任何处理了,那么就算是我点了n次其实我也只取了我最后一次的结果,所以这样看起来合情合理,但是对于网络流量要求的app来说就很不科学了

4 将2和3结合起来

首先我们做一个延时处理比如200ms,当达到延时处理后再发起请求,但是有个特殊地方比如我在下一个200ms内又发起请求,此时结果并没有回来,那么我们将之前的任务取消掉就好了,这样也就相对友好的

处理了这些矛盾,这里兼顾了2和3的优缺点

以下是我改进的代码,让任务可以手动取消

 

封装的超时Task类

  1 public class TimeoutTask
  2 {
  3     #region 字段
  4     private Action _action;
  5     private CancellationToken _token;
  6     private event AsyncCompletedEventHandler _asyncCompletedEvent;
  7     private TaskCompletionSource<AsyncCompletedEventArgs> _tcs;
  8     #endregion
  9 
 10     #region 静态方法
 11     public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, CancellationToken token)
 12     {
 13         return await TimeoutTask.StartNewTask(action, token, Timeout.Infinite);
 14     }
 15 
 16     public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, int timeout)
 17     {
 18         return await TimeoutTask.StartNewTask(action, CancellationToken.None, timeout);
 19     }
 20 
 21     public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, CancellationToken token,
 22         int timeout = Timeout.Infinite)
 23     {
 24         var task = new TimeoutTask(action, token, timeout);
 25 
 26         return await task.Run();
 27     }
 28     #endregion
 29 
 30     #region 构造
 31 
 32     public TimeoutTask(Action action, int timeout) : this(action, CancellationToken.None, timeout)
 33     {
 34 
 35     }
 36 
 37     public TimeoutTask(Action action, CancellationToken token) : this(action, token, Timeout.Infinite)
 38     {
 39 
 40     }
 41 
 42     public TimeoutTask(Action action, CancellationToken token, int timeout = Timeout.Infinite)
 43     {
 44         _action = action;
 45 
 46         _tcs = new TaskCompletionSource<AsyncCompletedEventArgs>();
 47 
 48         if (timeout != Timeout.Infinite)
 49         {
 50             var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
 51             cts.CancelAfter(timeout);
 52             _token = cts.Token;
 53         }
 54         else
 55         {
 56             _token = token;
 57         }
 58     }
 59     #endregion
 60 
 61     #region 公用方法
 62 
 63     /// <summary>
 64     /// 运行
 65     /// </summary>
 66     /// <returns></returns>
 67     public async Task<AsyncCompletedEventArgs> Run()
 68     {
 69         _asyncCompletedEvent += AsyncCompletedEventHandler;
 70 
 71         try
 72         {
 73             using (_token.Register(() => _tcs.TrySetCanceled()))
 74             {
 75                 ExecuteAction();
 76                 return await _tcs.Task.ConfigureAwait(false);
 77             }
 78 
 79         }
 80         finally
 81         {
 82             _asyncCompletedEvent -= AsyncCompletedEventHandler;
 83         }
 84 
 85     }
 86 
 87     public void Cancel()
 88     {
 89         if (!_token.CanBeCanceled)
 90         {
 91             _tcs.TrySetCanceled();
 92         }
 93     }
 94     #endregion
 95 
 96     #region 私有方法
 97 
 98     /// <summary>
 99     /// 执行Action
100     /// </summary>
101     private void ExecuteAction()
102     {
103         Task.Factory.StartNew(() =>
104         {
105             _action.Invoke();
106 
107             OnAsyncCompleteEvent(null);
108         });
109     }
110 
111     /// <summary>
112     /// 异步完成事件处理
113     /// </summary>
114     /// <param name="sender"></param>
115     /// <param name="e"></param>
116     private void AsyncCompletedEventHandler(object sender, AsyncCompletedEventArgs e)
117     {
118         if (e.Cancelled)
119         {
120             _tcs.TrySetCanceled();
121         }
122         else if (e.Error != null)
123         {
124             _tcs.TrySetException(e.Error);
125         }
126         else
127         {
128             _tcs.TrySetResult(e);
129         }
130     }
131 
132     /// <summary>
133     /// 触发异步完成事件
134     /// </summary>
135     /// <param name="userState"></param>
136     private void OnAsyncCompleteEvent(object userState)
137     {
138         if (_asyncCompletedEvent != null)
139         {
140             _asyncCompletedEvent(this, new AsyncCompletedEventArgs(error: null, cancelled: false, userState: userState));
141         }
142     }
143     #endregion
144 }
145 
146 /// <summary>
147 /// 有返回值,可超时,可取消的Task
148 /// </summary>
149 /// <typeparam name="T"></typeparam>
150 public class TimeoutTask<T>
151 {
152     #region 字段
153     private Func<T> _func;
154     private CancellationToken _token;
155     private event AsyncCompletedEventHandler _asyncCompletedEvent;
156     private TaskCompletionSource<AsyncCompletedEventArgs> _tcs;
157     #endregion
158 
159     #region 静态方法
160     public static async Task<T> StartNewTask(Func<T> func, CancellationToken token,
161         int timeout = Timeout.Infinite)
162     {
163         var task = new TimeoutTask<T>(func, token, timeout);
164 
165         return await task.Run();
166     }
167 
168     public static async Task<T> StartNewTask(Func<T> func, int timeout)
169     {
170         return await TimeoutTask<T>.StartNewTask(func, CancellationToken.None, timeout);
171     }
172 
173     public static async Task<T> StartNewTask(Func<T> func, CancellationToken token)
174     {
175         return await TimeoutTask<T>.StartNewTask(func, token, Timeout.Infinite);
176     }
177 
178 
179 
180     #endregion
181 
182     #region 公用方法
183     /// <summary>
184     /// 运行Task
185     /// </summary>
186     /// <returns></returns>
187     public async Task<T> Run()
188     {
189         _asyncCompletedEvent += AsyncCompletedEventHandler;
190 
191         try
192         {
193             using (_token.Register(() => _tcs.TrySetCanceled()))
194             {
195                 ExecuteFunc();
196                 var args = await _tcs.Task.ConfigureAwait(false);
197                 return (T)args.UserState;
198             }
199 
200         }
201         finally
202         {
203             _asyncCompletedEvent -= AsyncCompletedEventHandler;
204         }
205 
206     }
207 
208 
209     public bool CanBeCanceled
210     {
211         get { return _token.CanBeCanceled; }
212     }
213 
214     public void Cancel()
215     {
216         if (!_token.CanBeCanceled)
217         {
218             _tcs.SetCanceled(); 
219         }
220     }
221     #endregion
222 
223     #region 构造
224     public TimeoutTask(Func<T> func, CancellationToken token) : this(func, token, Timeout.Infinite)
225     {
226 
227     }
228 
229     public TimeoutTask(Func<T> func, int timeout = Timeout.Infinite) : this(func, CancellationToken.None, timeout)
230     {
231 
232     }
233 
234     public TimeoutTask(Func<T> func, CancellationToken token, int timeout = Timeout.Infinite)
235     {
236         _func = func;
237 
238         _tcs = new TaskCompletionSource<AsyncCompletedEventArgs>();
239 
240         if (timeout != Timeout.Infinite)
241         {
242             var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
243 
244             cts.CancelAfter(timeout);
245             _token = cts.Token;
246         }
247         else
248         {
249             _token = token;
250         }
251     }
252     #endregion
253 
254     #region 私有方法
255    
256 
257     /// <summary>
258     /// 执行
259     /// </summary>
260     private void ExecuteFunc()
261     {
262         ThreadPool.QueueUserWorkItem(s =>
263         {
264             var result = _func.Invoke();
265 
266             OnAsyncCompleteEvent(result);
267         });
268     }
269 
270     /// <summary>
271     /// 异步完成事件处理
272     /// </summary>
273     /// <param name="sender"></param>
274     /// <param name="e"></param>
275     private void AsyncCompletedEventHandler(object sender, AsyncCompletedEventArgs e)
276     {
277         if (e.Cancelled)
278         {
279             _tcs.TrySetCanceled();
280         }
281         else if (e.Error != null)
282         {
283             _tcs.TrySetException(e.Error);
284         }
285         else
286         {
287             _tcs.TrySetResult(e);
288         }
289     }
290 
291     /// <summary>
292     /// 触发异步完成事件
293     /// </summary>
294     /// <param name="userState"></param>
295     private void OnAsyncCompleteEvent(object userState)
296     {
297         if (_asyncCompletedEvent != null)
298         {
299             _asyncCompletedEvent(this, new AsyncCompletedEventArgs(error: null, cancelled: false, userState: userState));
300         }
301     }
302     #endregion
303 }
View Code

 

demo

 1 class Program
 2     {
 3         private static TimeoutTask<string> result;
 4 
 5         static void Main(string[] args)
 6         {
 7             
 8 
 9             ThreadMethod();
10 
11 
12             Console.WriteLine("启动完成");
13             Console.ReadLine();
14         }
15 
16         private async static void ThreadMethod()
17         {
18 //            await TimeoutTask.StartNewTask(LongTimeWork, 6000);
19 //            await TimeoutTask<string>.StartNewTask(LongTimeWork2, 2000);
20             try
21             {
22                 for (int i = 0; i < 5; i++)
23                 {
24 
25                     //我手动取消掉上一次的
26                     if (result!=null)
27                     {
28                         try
29                         {
30                             //取消掉
31                             result.Cancel();
32                         }
33                         catch (Exception er)
34                         {
35                         }
36                     }
37 
38                     result = new TimeoutTask<string>(LongTimeWork2);
39 
40 
41 
42                     try
43                     {
44                         result.Run();
45                     }
46                     catch (Exception ee)
47                     {
48                         
49                     }
50                 }
51 
52 
53                 Console.WriteLine(result);
54             }
55             catch (Exception ex)
56             {
57                 
58             }
59         }
60 
61         private static void LongTimeWork()
62         {
63             Thread.Sleep(5000);
64         }
65 
66         private static string LongTimeWork2()
67         {
68             Thread.Sleep(5000);
69             return "XXD";
70         }
71 
72 
73     }
View Code

 

 

 

二 Task天生超时

什么是尝试超时,比如说连接数据库就有TryConnect尝试,在一些访问资源的时候经常用到,且Task本身也天生支持超时处理

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var task=Task.Factory.StartNew(() =>
{
Thread.Sleep(3 * 1000);

}, token);

var timeout = task.Wait(4*1000,token);

if (timeout)
{

}

 

Wait会等待给定的时间,如果在给定的时间内已经完成那么,将返回true,意思是在指定的时间内完成了一个task,反之就认为超时了,这个也不乏一种超时处理

 

以上是关于TPL异步并行编程之任务超时的主要内容,如果未能解决你的问题,请参考以下文章

三分钟总览微软任务并行库TPL

线程可以做啥,而基于任务的异步模式(TAP)和任务并行(TPL)与任务(或任务<T>)不能做啥?

TPL 数据流是并行的或有序的,但不是两者

浅谈.Net异步编程的前世今生----异步函数篇(完结)

浅谈.Net异步编程的前世今生----TPL篇

理解并行编程