TPL Dataflow ,完成一个 Block ,重新创建一个 BLock

Posted

技术标签:

【中文标题】TPL Dataflow ,完成一个 Block ,重新创建一个 BLock【英文标题】:TPL Dataflow , finish a Block , re-create a BLock 【发布时间】:2021-09-28 00:00:08 【问题描述】:

我正在使用 TPL 数据流显示视频,同时首先通过 TCP 将数据传递到板。我正在使用 CancellationTokenSource 来取消 Block 活动。但问题是,当我重新运行“CreateVideoProcessingNetwork”函数时,我没有响应。 .Post() 命令返回 false。我应该如何重新创建或“重新运行”TPL 数据流?这是代码:

private void TPL_Click(object sender, EventArgs e)
        
            CreateVideoProcessingNetwork();
        

        

        public async void CreateVideoProcessingNetwork()
        
            string video_path = @"C:\.....\video_640x360_360p.mp4";

            
            _canceller = new CancellationTokenSource();
            /****************** METHOD 1 - with yield *************/

            /* Video Loading TPL Block */
            //var video_loader = new TransformManyBlock<string, Bitmap>(load_video, new ExecutionDataflowBlockOptions  BoundedCapacity = 10 );


            var send_recv_block = new TransformBlock<Bitmap, Bitmap>(async recv_bitmap =>
            
                Console.WriteLine("Inside send_recv block");
                var mem_stream = new MemoryStream();
                recv_bitmap.Save(mem_stream, System.Drawing.Imaging.ImageFormat.Jpeg);
                var recv_image_array = mem_stream.ToArray();


                NetworkStream stream = client.GetStream();
                byte[] transmit_buffer = new byte[4];
                transmit_buffer[0] = (byte)(recv_image_array.Length & (0xFF));
                transmit_buffer[1] = (byte)((recv_image_array.Length >> 8) & (0xFF));
                transmit_buffer[2] = (byte)((recv_image_array.Length >> 16) & (0xFF));
                transmit_buffer[3] = (byte)((recv_image_array.Length >> 24) & (0xFF));
                // Sending first the 32bit length
                await stream.WriteAsync(transmit_buffer, 0, 4);
                // Sending data
                await stream.WriteAsync(recv_image_array, 0, recv_image_array.Length);
                // Receiving data
                var recv_buffer = await Receive(stream);

                Bitmap tx_image_array;
                using (var ms = new MemoryStream(recv_image_array))
                
                    tx_image_array = new Bitmap(ms);
                

                return tx_image_array;
                

            ,
            new ExecutionDataflowBlockOptions
            
                //BoundedCapacity = 10,
                CancellationToken = cancellationSource.Token
            );



            /****************** METHOD 2 - with send async  ***********/
            var video_loader = new ActionBlock<string>(async path =>
            

 

                Console.WriteLine("video_loader");
                capture = new VideoCapture(path);
                Mat matrix = new Mat();
                capture.Read(matrix);
                var mem_stream = new MemoryStream();

                while (matrix.Rows != 0 && matrix.Width != 0)
                

                    Console.WriteLine("Inside Loop");
                    capture.Read(matrix);
                    if (matrix.Rows == 0 && matrix.Width == 0) break;
                    Bitmap bitmap = new Bitmap(matrix.Width, matrix.Rows);
                    bitmap = matrix.ToBitmap();

                    await send_recv_block.SendAsync(bitmap);
                    await Task.Delay(20);
                    if (_canceller.Token.IsCancellationRequested) break;

                

            , new ExecutionDataflowBlockOptions 
            
                //BoundedCapacity = 10 ,
                CancellationToken = cancellationSource.Token
            );



            /* Video Loading TPL Block */
            var display_video = new ActionBlock<Bitmap>(async received_image =>
            
                Console.WriteLine("Inside Display Video");
                PicturePlot2.Image = received_image;
                await Task.Delay(25);
            ,
            new ExecutionDataflowBlockOptions()
            
                TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
                //BoundedCapacity = 10,
                CancellationToken = cancellationSource.Token
            );


            var linkOptions = new DataflowLinkOptions  PropagateCompletion = true ;


            /****************** METHOD 2 - with send async *************/
            Console.WriteLine("to Link");
            var send_recv_disposable = send_recv_block.LinkTo(display_video, linkOptions);
            Console.WriteLine("Video path" + video_path);
            //var apotelesma_post = video_loader.Post(video_path);
            var apotelesma_post = await video_loader.SendAsync(video_path);
            Console.WriteLine("Apotelesma Post "+ apotelesma_post);
            video_loader.Complete();
            try
            
                await display_video.Completion;
            
            catch (TaskCanceledException ex)
            
                Console.WriteLine(ex.CancellationToken.IsCancellationRequested);
                video_loader.Complete();
                send_recv_block.Complete();
                display_video.Complete();
                MessageBox.Show("Video Ended");
                
            

            
        
            
        private  void Stop_Reset_Click(object sender, EventArgs e)
        
            cancellationSource.Cancel();
            _canceller.Cancel();



        

提前致谢

【问题讨论】:

您使用了多少个CancellationTokenSources?这些块侦听未指定的cancellationSource,但该方法初始化_canceller。为什么不只使用 _canceller ?这段代码的编写方式,第二次调用该方法的块将使用已经取消的cancellationSource 【参考方案1】:

你没有显示你声明变量cancellationSource的位置,你提供给ExecutionDataFlowBlockOptions

当您向ExecutionDataFlowBlockOptions 提供取消令牌时,您是在告诉块在取消令牌时进入已完成状态,任务状态为已取消。文档告诉我们这是最终结果:

由于 CancellationToken 属性永久取消数据流块的执行,所以在用户取消操作后又想向管道添加更多工作项后,必须重新创建整个管道。[1]

因为您的停止按钮将此标记设置为取消,所以当您重新创建块时,它们开始处于取消状态。

_canceller = new CancellationTokenSource();上方需要添加cancellationSource = new CancellationTokenSource();

【讨论】:

以上是关于TPL Dataflow ,完成一个 Block ,重新创建一个 BLock的主要内容,如果未能解决你的问题,请参考以下文章

使用 TPL-Dataflow 进行聚合和连接(内、外、左……)?

TPL-Dataflow 是不是适用于高并发应用程序?

TPL Dataflow BufferBlock 线程安全吗?

如何在 TPL/Dataflow 中发出笛卡尔积?

TPL Dataflow 如何与“全局”数据同步

TPL Dataflow LinkTo TransformBlock 非常慢