与记录器线程的多线程应用程序交互

Posted

技术标签:

【中文标题】与记录器线程的多线程应用程序交互【英文标题】:Multi-threaded application interaction with logger thread 【发布时间】:2010-12-09 11:33:10 【问题描述】:

我再次提出关于多线程的问题和我的并发编程课的练习。

我有一个多线程服务器 - 使用 .NET 异步编程模型 实现 - 使用 GET下载)和PUT上传) 文件服务。这部分已经完成并经过测试。

问题的陈述是说这个服务器必须有logging活动,对服务器响应时间的影响最小,并且应该由一个低优先级的线程支持 em> - 记录器线程 - 为此效果创建。所有记录消息都应由产生它们的线程传递给这个记录器线程,使用可能不的通信机制strong> 锁定调用它的线程(除了确保互斥的必要锁定)并假设某些日志消息可能会被忽略。

这是我目前的解决方案,请帮助验证这是否可以解决所述问题:

using System;
using System.IO;
using System.Threading;

// Multi-threaded Logger
public class Logger 
    // textwriter to use as logging output
    protected readonly TextWriter _output;
    // logger thread
    protected Thread _loggerThread;
    // logger thread wait timeout
    protected int _timeOut = 500; //500ms
    // amount of log requests attended
    protected volatile int reqNr = 0;
    // logging queue
    protected readonly object[] _queue;
    protected struct LogObj 
        public DateTime _start;
        public string _msg;
        public LogObj(string msg) 
            _start = DateTime.Now;
            _msg = msg;
        
        public LogObj(DateTime start, string msg) 
            _start = start;
            _msg = msg;
        
        public override string ToString() 
            return String.Format("0: 1", _start, _msg);
        
    

    public Logger(int dimension,TextWriter output) 
        /// initialize queue with parameterized dimension
        this._queue = new object[dimension];
        // initialize logging output
        this._output = output;
        // initialize logger thread
        Start();
    
    public Logger() 
        // initialize queue with 10 positions
        this._queue = new object[10];
        // initialize logging output to use console output
        this._output = Console.Out;
        // initialize logger thread
        Start();
    

    public void Log(string msg) 
        lock (this) 
            for (int i = 0; i < _queue.Length; i++) 
                // seek for the first available position on queue
                if (_queue[i] == null) 
                    // insert pending log into queue position
                    _queue[i] = new LogObj(DateTime.Now, msg);
                    // notify logger thread for a pending log on the queue
                    Monitor.Pulse(this);
                    break;
                
                // if there aren't any available positions on logging queue, this
                // log is not considered and the thread returns
            
        
    

    public void GetLog() 
        lock (this) 
            while(true) 
                for (int i = 0; i < _queue.Length; i++) 
                    // seek all occupied positions on queue (those who have logs)
                    if (_queue[i] != null) 
                        // log
                        LogObj obj = (LogObj)_queue[i];
                        // makes this position available
                        _queue[i] = null;
                        // print log into output stream
                        _output.WriteLine(String.Format("[Thread #0 | 1ms] 2",
                                                        Thread.CurrentThread.ManagedThreadId,
                                                        DateTime.Now.Subtract(obj._start).TotalMilliseconds,
                                                        obj.ToString()));
                    
                
                // after printing all pending log's (or if there aren't any pending log's),
                // the thread waits until another log arrives
                //Monitor.Wait(this, _timeOut);
                Monitor.Wait(this);
            
        
    

    // Starts logger thread activity
    public void Start() 
        // Create the thread object, passing in the Logger.Start method
        // via a ThreadStart delegate. This does not start the thread.
        _loggerThread = new Thread(this.GetLog);
        _loggerThread.Priority = ThreadPriority.Lowest;
        _loggerThread.Start();
    

    // Stops logger thread activity
    public void Stop() 
        _loggerThread.Abort();
        _loggerThread = null;
    

    // Increments number of attended log requests
    public void IncReq()  reqNr++; 


基本上,这里是这段代码的要点:

    启动一个低优先级的线程,循环日志队列并将待处理的日志打印到输出。在此之后,线程被挂起,直到新的log到达; 当日志到达时,记录器线程被唤醒并开始工作。

这个解决方案线程安全吗?我一直在阅读Producers-Consumers问题和解决算法,但是在这个问题中虽然我有多个生产者,但我只有一个阅读器。

【问题讨论】:

【参考方案1】:

它似乎应该工作。在单一消费者的情况下,生产者-消费者不应该发生太大变化。小挑剔:

获取锁可能是一项昂贵的操作(正如@Vitaliy Lipchinsky 所说)。我建议将您的记录器与天真的“直写”记录器和使用联锁操作的记录器进行基准测试。另一种选择是将现有队列与GetLog 中的空队列交换并立即离开临界区。这样,任何生产者都不会被消费者中的长时间操作阻塞。

创建 LogObj 引用类型(类)。让它成为 struct 毫无意义,因为无论如何你都在装箱。否则将_queue 字段设为LogObj[] 类型(无论如何这更好)。

将您的线程设置为背景,这样它就不会在Stop 不会被调用时阻止关闭您的程序。

刷新你的TextWriter。否则,即使是那些设法适合队列的记录,您也有丢失的风险(恕我直言,10 项有点小)

实现 IDisposable 和/或终结器。您的记录器拥有线程和文本编写器,这些应该被释放(并刷新 - 见上文)。

【讨论】:

感谢您的考虑。肯定会相应地更改我的代码。【参考方案2】:

虽然它看起来是线程安全的,但我认为它并不是特别理想。我会建议按照这些思路的解决方案

注意:只需阅读其他回复即可。下面是一个基于您自己的相当优化的、乐观的锁定解决方案。主要区别在于锁定内部类,最小化“关键部分”,并提供优雅的线程终止。如果您想完全避免锁定,那么您可以按照@Vitaliy Lipchinsky 的建议尝试一些不稳定的“非锁定”链表。

using System.Collections.Generic;
using System.Linq;
using System.Threading;

...

public class Logger

    // BEST PRACTICE: private synchronization object. 
    // lock on _syncRoot - you should have one for each critical
    // section - to avoid locking on public 'this' instance
    private readonly object _syncRoot = new object ();

    // synchronization device for stopping our log thread.
    // initialized to unsignaled state - when set to signaled
    // we stop!
    private readonly AutoResetEvent _isStopping = 
        new AutoResetEvent (false);

    // use a Queue<>, cleaner and less error prone than
    // manipulating an array. btw, check your indexing
    // on your array queue, while starvation will not
    // occur in your full pass, ordering is not preserved
    private readonly Queue<LogObj> _queue = new Queue<LogObj>();

    ...

    public void Log (string message)
    
        // you want to lock ONLY when absolutely necessary
        // which in this case is accessing the ONE resource
        // of _queue.
        lock (_syncRoot)
        
            _queue.Enqueue (new LogObj (DateTime.Now, message));
        
    

    public void GetLog ()
    
        // while not stopping
        // 
        // NOTE: _loggerThread is polling. to increase poll
        // interval, increase wait period. for a more event
        // driven approach, consider using another
        // AutoResetEvent at end of loop, and signal it
        // from Log() method above
        for (; !_isStopping.WaitOne(1); )
        
            List<LogObj> logs = null;
            // again lock ONLY when you need to. because our log
            // operations may be time-intensive, we do not want
            // to block pessimistically. what we really want is 
            // to dequeue all available messages and release the
            // shared resource.
            lock (_syncRoot)
            
                // copy messages for local scope processing!
                // 
                // NOTE: .Net3.5 extension method. if not available
                // logs = new List<LogObj> (_queue);
                logs = _queue.ToList ();
                // clear the queue for new messages
                _queue.Clear ();
                // release!
            
            foreach (LogObj log in logs)
            
                // do your thang
                ...
            
        
    

...
public void Stop ()

    // graceful thread termination. give threads a chance!
    _isStopping.Set ();
    _loggerThread.Join (100);
    if (_loggerThread.IsAlive)
    
        _loggerThread.Abort ();
    
    _loggerThread = null;

【讨论】:

您好。感谢您记得我使用优雅的线程终止,而不是强制的“中止”方法。我一定会采纳的! 乍一看,当我尝试按照您的建议调整我的解决方案时,我没有注意到我在您的代码中发现的一些问题。事实上,您可能不会对相应“锁定”范围之外的同步对象进行“脉冲”或“等待”。考虑到这一点,在“GetLog”方法上创建记录器队列的本地副本没有任何好处;在您的“日志”方法中,也无法减少队列上的锁定获取。另外,您引用的某些方法不存在。队列没有“ToList()”方法(只有“ToArray()”)和“Monitor.Pulse()”必须接收参数(同步对象)... 咳咳,重要的一课,永远不要“煽动”回应。 [正确地]使用监视器与使用锁同义。由于我们使用锁来保护我们的队列,Monitor 完全是多余的。 [从上述解决方案和\或您以前的解决方案中] 将其删除。这不是必需的。创建队列的本地副本有很多好处。切勿在关键部分执行任何处理。你想访问你的共享资源并释放它。处理[即写入控制台、文件、RPC 等] 可以稍后执行。 ToList 是一种扩展方法,在 .Net3.5 中可用,使用 System.Linq er,通过“翼”响应,显然是指我自己。应该早点查看监视器,会发现冗余。 :S【参考方案3】:

实际上,您在这里引入了锁定。将日志条目推送到队列时有锁定(Log 方法):如果 10 个线程同时将 10 个项目推送到队列并唤醒 Logger 线程,则第 11 个线程将等待,直到 logger 线程记录所有项目...

如果您想要一些真正可扩展的东西 - 实现无锁队列(示例如下)。使用无锁队列同步机制将非常直接(您甚至可以使用单个等待句柄进行通知)。

如果您无法在 Web 中找到无锁队列实现,请参考以下方法: 使用链表实现。链表中的每个节点都包含一个 value 和对下一个节点的 volatile 引用。因此,对于入队和出队操作,您可以使用 Interlocked.CompareExchange 方法。我希望,这个想法很清楚。如果没有,请告诉我,我会提供更多详细信息。

【讨论】:

您好。非常感谢您的回复,但我真的很想避免使用互锁机制的解决方案。练习声明中说我可以使用锁定,尽管仅用于互斥目的。难道我不能考虑我的解决方案的“锁定”机制来满足这个目的吗,因为我只是想确保互斥访问日志队列? @XpiritO:如果没有 Interlocked 指令(或其 GCC 等效指令),您将无法进行任何多处理器安全无锁工作。如果您使用的是互斥锁,则不需要互锁。但 Vitaliy 的建议是建立一个无锁队列。 好的,如果您没有关键的性能要求 - 您的代码乍一看看起来是线程安全的。【参考方案4】:

我只是在这里做一个思想实验,因为我现在没有时间实际尝试代码,但我认为如果你有创意的话,你完全可以不加锁地做到这一点。

让您的日志记录类包含一个在每次调用时分配队列和信号量的方法(以及另一个在线程完成时释放队列和信号量的方法)。想要进行日志记录的线程将在启动时调用此方法。当他们想要记录时,他们将消息推送到自己的队列中并设置信号量。记录器线程有一个贯穿队列并检查相关信号量的大循环。如果与队列关联的信号量大于零,则队列被弹出并且信号量减少。

因为在设置信号量之前您不会尝试将内容从队列中弹出,并且在将内容推送到队列之前您不会设置信号量,我认为这将是安全的。根据队列类的 MSDN 文档,如果您正在枚举队列并且另一个线程修改了集合,则会引发异常。抓住那个例外,你应该很好。

【讨论】:

以上是关于与记录器线程的多线程应用程序交互的主要内容,如果未能解决你的问题,请参考以下文章

编程思想之多线程与多进程——Java中的多线程

CoppeliaSim(Vrep)与VS使用多线程交互

CoppeliaSim(Vrep)与VS使用多线程交互

进行概念详解 多线程上篇

Java多线程,线程安全与不安全的理解,程序的多线程并发编程的基础概念,进程与线程的区别是什么

Java多线程,线程安全与不安全的理解,程序的多线程并发编程的基础概念,进程与线程的区别是什么