TPL Dataflow ,完成一个 Block ,重新创建一个 BLock
Posted
技术标签:
【中文标题】TPL Dataflow ,完成一个 Block ,重新创建一个 BLock【英文标题】:TPL Dataflow , finish a Block , re-create a BLock 【发布时间】:2021-09-28 00:00:08 【问题描述】:我正在使用 TPL 数据流显示视频,同时首先通过 TCP 将数据传递到板。我正在使用 CancellationTokenSource 来取消 Block 活动。但问题是,当我重新运行“CreateVideoProcessingNetwork”函数时,我没有响应。 .Post() 命令返回 false。我应该如何重新创建或“重新运行”TPL 数据流?这是代码:
private void TPL_Click(object sender, EventArgs e)
CreateVideoProcessingNetwork();
public async void CreateVideoProcessingNetwork()
string video_path = @"C:\.....\video_640x360_360p.mp4";
_canceller = new CancellationTokenSource();
/****************** METHOD 1 - with yield *************/
/* Video Loading TPL Block */
//var video_loader = new TransformManyBlock<string, Bitmap>(load_video, new ExecutionDataflowBlockOptions BoundedCapacity = 10 );
var send_recv_block = new TransformBlock<Bitmap, Bitmap>(async recv_bitmap =>
Console.WriteLine("Inside send_recv block");
var mem_stream = new MemoryStream();
recv_bitmap.Save(mem_stream, System.Drawing.Imaging.ImageFormat.Jpeg);
var recv_image_array = mem_stream.ToArray();
NetworkStream stream = client.GetStream();
byte[] transmit_buffer = new byte[4];
transmit_buffer[0] = (byte)(recv_image_array.Length & (0xFF));
transmit_buffer[1] = (byte)((recv_image_array.Length >> 8) & (0xFF));
transmit_buffer[2] = (byte)((recv_image_array.Length >> 16) & (0xFF));
transmit_buffer[3] = (byte)((recv_image_array.Length >> 24) & (0xFF));
// Sending first the 32bit length
await stream.WriteAsync(transmit_buffer, 0, 4);
// Sending data
await stream.WriteAsync(recv_image_array, 0, recv_image_array.Length);
// Receiving data
var recv_buffer = await Receive(stream);
Bitmap tx_image_array;
using (var ms = new MemoryStream(recv_image_array))
tx_image_array = new Bitmap(ms);
return tx_image_array;
,
new ExecutionDataflowBlockOptions
//BoundedCapacity = 10,
CancellationToken = cancellationSource.Token
);
/****************** METHOD 2 - with send async ***********/
var video_loader = new ActionBlock<string>(async path =>
Console.WriteLine("video_loader");
capture = new VideoCapture(path);
Mat matrix = new Mat();
capture.Read(matrix);
var mem_stream = new MemoryStream();
while (matrix.Rows != 0 && matrix.Width != 0)
Console.WriteLine("Inside Loop");
capture.Read(matrix);
if (matrix.Rows == 0 && matrix.Width == 0) break;
Bitmap bitmap = new Bitmap(matrix.Width, matrix.Rows);
bitmap = matrix.ToBitmap();
await send_recv_block.SendAsync(bitmap);
await Task.Delay(20);
if (_canceller.Token.IsCancellationRequested) break;
, new ExecutionDataflowBlockOptions
//BoundedCapacity = 10 ,
CancellationToken = cancellationSource.Token
);
/* Video Loading TPL Block */
var display_video = new ActionBlock<Bitmap>(async received_image =>
Console.WriteLine("Inside Display Video");
PicturePlot2.Image = received_image;
await Task.Delay(25);
,
new ExecutionDataflowBlockOptions()
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
//BoundedCapacity = 10,
CancellationToken = cancellationSource.Token
);
var linkOptions = new DataflowLinkOptions PropagateCompletion = true ;
/****************** METHOD 2 - with send async *************/
Console.WriteLine("to Link");
var send_recv_disposable = send_recv_block.LinkTo(display_video, linkOptions);
Console.WriteLine("Video path" + video_path);
//var apotelesma_post = video_loader.Post(video_path);
var apotelesma_post = await video_loader.SendAsync(video_path);
Console.WriteLine("Apotelesma Post "+ apotelesma_post);
video_loader.Complete();
try
await display_video.Completion;
catch (TaskCanceledException ex)
Console.WriteLine(ex.CancellationToken.IsCancellationRequested);
video_loader.Complete();
send_recv_block.Complete();
display_video.Complete();
MessageBox.Show("Video Ended");
private void Stop_Reset_Click(object sender, EventArgs e)
cancellationSource.Cancel();
_canceller.Cancel();
提前致谢
【问题讨论】:
您使用了多少个CancellationTokenSource
s?这些块侦听未指定的cancellationSource
,但该方法初始化_canceller
。为什么不只使用 _canceller
?这段代码的编写方式,第二次调用该方法的块将使用已经取消的cancellationSource
【参考方案1】:
你没有显示你声明变量cancellationSource
的位置,你提供给ExecutionDataFlowBlockOptions
。
当您向ExecutionDataFlowBlockOptions
提供取消令牌时,您是在告诉块在取消令牌时进入已完成状态,任务状态为已取消。文档告诉我们这是最终结果:
由于 CancellationToken 属性永久取消数据流块的执行,所以在用户取消操作后又想向管道添加更多工作项后,必须重新创建整个管道。[1]
因为您的停止按钮将此标记设置为取消,所以当您重新创建块时,它们开始处于取消状态。
_canceller = new CancellationTokenSource();
上方需要添加cancellationSource = new CancellationTokenSource();
。
【讨论】:
以上是关于TPL Dataflow ,完成一个 Block ,重新创建一个 BLock的主要内容,如果未能解决你的问题,请参考以下文章
使用 TPL-Dataflow 进行聚合和连接(内、外、左……)?