Linux fifo 缓冲采坑

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux fifo 缓冲采坑相关的知识,希望对你有一定的参考价值。

参考技术A 本文记录了我在使用 fifo(命名管道)的过程中由于缓冲踩的坑。

事情的经过是这样的:我有一个 python 脚本,它会实时向标准输出打印数据,现在我想把这些打印到的标准输出数据导入 FIFO 中,并且在另一个 Qt 程序中从 FIFO 读取数据,于是我开始了下述操作。

首先用 mkfifo tmp.fifo 创建了一个 FIFO 文件,然后以下述方式运行我的 python 脚本

另一边创建了一个 Qt 程序打开这个 FIFO 读取数据

两个程序都运行后,出现了一种比较诡异的现象:python 脚本向 FIFO 写入了大量数据后,Qt 程序才会从 readLine() 函数返回,正常情况下应该是 FIFO 写入一行数据,Qt 程序就能够从 FIFO 读出一行数据,但实际情况不是这样,当 FIFO 被写入了大量数据后,才允许从 FIFO 另一端读取数据。

我琢磨了一下午才想到是缓冲区的问题。

C 标准库 IO 函数分为三种缓冲类型:

那么 FIFO 是属于哪一种缓冲区呢?我在 APUE 里找到了下面这几句话:

因此,FIFO 是全缓冲设备,这也解释了为什么上面介绍的 python 脚本将数据打印到标准输出时数据是一行一行的显示,而打印到 FIFO 时,数据只能是一大块一大块的读取。

找到了问题的缘由,我更改了 python 脚本的 print() 函数:

每写入一行数据,自动刷洗数据,这样就成功解决了问题。

线程安全 FIFO 队列/缓冲区

【中文标题】线程安全 FIFO 队列/缓冲区【英文标题】:Threadsafe FIFO Queue/Buffer 【发布时间】:2012-09-04 17:10:37 【问题描述】:

我需要实现一种任务缓冲区。基本要求是:

在单个后台线程中处理任务 从多个线程接收任务 处理所有接收到的任务,即确保在收到停止信号后缓冲区中的缓冲任务已耗尽 必须保持每个线程接收的任务顺序

我正在考虑使用如下所示的队列来实现它。将不胜感激有关实施的反馈。还有其他更好的想法来实现这样的事情吗?

public class TestBuffer

    private readonly object queueLock = new object();
    private Queue<Task> queue = new Queue<Task>();
    private bool running = false;

    public TestBuffer()
    
    

    public void start()
    
        Thread t = new Thread(new ThreadStart(run));
        t.Start();
    

    private void run()
    
        running = true;

        bool run = true;
        while(run)
        
            Task task = null;
            // Lock queue before doing anything
            lock (queueLock)
            
                // If the queue is currently empty and it is still running
                // we need to wait until we're told something changed
                if (queue.Count == 0 && running)
                
                    Monitor.Wait(queueLock);
                

                // Check there is something in the queue
                // Note - there might not be anything in the queue if we were waiting for something to change and the queue was stopped
                if (queue.Count > 0)
                
                    task = queue.Dequeue();
                
            

            // If something was dequeued, handle it
            if (task != null)
            
                handle(task);
            

            // Lock the queue again and check whether we need to run again
            // Note - Make sure we drain the queue even if we are told to stop before it is emtpy
            lock (queueLock)
            
                run = queue.Count > 0 || running;
            
        
    

    public void enqueue(Task toEnqueue)
    
        lock (queueLock)
        
            queue.Enqueue(toEnqueue);
            Monitor.PulseAll(queueLock);
        
    

    public void stop()
    
        lock (queueLock)
        
            running = false;
            Monitor.PulseAll(queueLock);
        
    

    public void handle(Task dequeued)
    
        dequeued.execute();
    

【问题讨论】:

Queues And Wait Handles in C# 的可能重复项 你想在 stop() 被调用后允许入队吗? 【参考方案1】:

您实际上可以使用开箱即用的BlockingCollection 来处理这个问题。

它被设计成有 1 个或多个生产者,以及 1 个或多个消费者。在您的情况下,您将有多个生产者和一个消费者。

当您收到停止信号时,使用该信号处理程序

信号生产者线程停止 在 BlockingCollection 实例上调用 CompleteAdding

消费者线程将继续运行,直到所有排队的项目都被移除并处理完毕,然后它会遇到 BlockingCollection 完成的条件。当线程遇到该条件时,它就会退出。

【讨论】:

感谢您的建议。不幸的是,我仅限于 .Net 3.5,因此排除了 BlockingCollection。抱歉,我应该在要求中提到这一点;)【参考方案2】:

你应该想想ConcurrentQueue,实际上是先进先出。如果不合适,请尝试Thread-Safe Collections中的一些亲戚。使用这些可以避免一些风险。

【讨论】:

【参考方案3】:

我建议你看看TPL DataFlow。 BufferBlock 是您正在寻找的,但它提供的更多。

【讨论】:

感谢您的建议。不幸的是,我仅限于 .Net 3.5,因此排除了这些。抱歉,我应该在要求中提到这一点;)【参考方案4】:

您可以为此在 .NET 3.5 上使用 Rx。它可能永远不会出现在 RC 中,但我相信它是稳定的*并且被许多生产系统使用。如果您不需要 Subject,您可能会发现 .NET 3.5 的原语(如并发集合),您可以使用 .NET Framework 4.0 之前未附带的原语。

Alternative to Rx (Reactive Extensions) for .net 3.5

* - Nit Picker 的角落:除了可能超出范围的高级时间窗口,但缓冲区(按计数和时间)、排序和调度程序都是稳定的。

【讨论】:

【参考方案5】:

看看我的线程安全 FIFO 队列的轻量级实现,它是一个使用线程池的非阻塞同步工具 - 在大多数情况下比创建自己的线程更好,并且比使用阻塞同步工具作为锁和互斥锁更好。 https://github.com/Gentlee/SerialQueue

用法:

var queue = new SerialQueue();
var result = await queue.Enqueue(() => /* code to synchronize */);

【讨论】:

根据guidelines,Enqueue方法应该命名为EnqueueAsync @TheodorZoulias 为什么 Task.Run 和 TaskFactory.StartNew 没有异步呢?与 Task.ContinueWith 相同。应该是 RunAsync、StartNewAsync、ContinueWithAsync。 @TheodorZoulias 而且,实际的入队是同步发生的,所以在方法完成后动作已经入队。 如果Enqueue 同步运行,那么返回Task 是没有意义的。这个Task 应该代表什么?异步完成什么? 加入任务队列是一回事,任务完成是另一回事。入队是同步的,但完成不是。如果你愿意,你可以等待完成。与 TaskFactory.StartNew 相同 - 任务创建和启动是同步的,但完成不是。它还返回任务。

以上是关于Linux fifo 缓冲采坑的主要内容,如果未能解决你的问题,请参考以下文章

linux 有名管道(FIFO)

环形缓冲器(FIFO)转

golang fifo 缓冲通道

线程安全 FIFO 队列/缓冲区

基于Verilog的带FIFO输出缓冲的串口接收接口封装

从 ipc fifo 文件描述符读取缓冲区时未初始化 QDataStream