csharp .NET 4 BlockingCollection的生产者/消费者实现
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了csharp .NET 4 BlockingCollection的生产者/消费者实现相关的知识,希望对你有一定的参考价值。
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ProducerConsumer
{
class QueueWorker : IDisposable
{
private readonly CancellationTokenSource _cancelTokenSource = new CancellationTokenSource();
private readonly Task _messageHandlerTask;
public QueueWorker(Action<object> messageHandler)
{
Id = Guid.NewGuid().ToString();
_messageHandlerTask = Task.Factory.StartNew(messageHandler, _cancelTokenSource.Token, TaskCreationOptions.LongRunning);
}
public string Id
{
get;
private set;
}
public void Stop()
{
_cancelTokenSource.Cancel();
_messageHandlerTask.Wait();
}
public void Dispose()
{
_cancelTokenSource.Dispose();
_messageHandlerTask.Dispose();
}
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
namespace ProducerConsumer
{
/// <summary>
/// Queue Service
/// </summary>
public class QueueService<TMessage> : IDisposable where TMessage : class
{
private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
private readonly List<QueueWorker> _workers = new List<QueueWorker>();
private readonly Action<TMessage> _messageHandler;
/// <summary>
/// Initializes a new instance of the <see cref="QueueService<TMessage>"/> class.
/// </summary>
/// <param name="messageHandler">The message handler.</param>
public QueueService(Action<TMessage> messageHandler)
{
_messageHandler = messageHandler;
}
/// <summary>
/// Queues the specified message.
/// </summary>
/// <param name="message">The message.</param>
public void Queue(TMessage message)
{
_messages.Add(message);
}
/// <summary>
/// How many messages are left in the queue to be sent
/// </summary>
public int QueueLength
{
get { return _messages.Count; }
}
/// <summary>
/// Gets a value indicating whether this instance is running.
/// </summary>
/// <value>
/// <c>true</c> if this instance is running; otherwise, <c>false</c>.
/// </value>
public bool IsRunning { get; private set; }
/// <summary>
/// How many worksers are currently running
/// </summary>
public int NumberOfWorkers
{
get { return _workers.Count; }
}
/// <summary>
/// Starts the specified number of workers ready to send queued messages
/// </summary>
/// <param name="numberOfWorkers">Number of Workers</param>
public void Start(int numberOfWorkers = 1)
{
IsRunning = true;
SetNumberOfWorkers(numberOfWorkers);
}
/// <summary>
/// Stops all workers and the service, without waiting for queued messages to be sent.
/// </summary>
public void Stop()
{
IsRunning = false;
SetNumberOfWorkers(0);
}
/// <summary>
/// Increase or decrease the number of workers to process queued messages.
/// </summary>
/// <param name="value">New Value for the number of workers to use</param>
public void SetNumberOfWorkers(int value)
{
lock (_workers)
{
while (_workers.Count > value)
{
_workers[0].Stop();
_workers.RemoveAt(0);
}
while (_workers.Count < value)
{
_workers.Add(new QueueWorker(MessageHander));
}
}
}
private void MessageHander(object state)
{
var cancelToken = (CancellationToken)state;
while (!cancelToken.IsCancellationRequested)
{
var message = default(TMessage);
if (!cancelToken.IsCancellationRequested && (message = _messages.Take()) != default(TMessage))
{
_messageHandler(message);
}
}
}
~QueueService()
{
Dispose(false);
}
protected virtual void Dispose(bool disposing)
{
_messages.Dispose();
_workers.ForEach(w => w.Dispose());
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
}
以上是关于csharp .NET 4 BlockingCollection的生产者/消费者实现的主要内容,如果未能解决你的问题,请参考以下文章
升级到 .NET 4.0 后 C# 动态 JObject 不起作用
csharp C#:在.NET中合并,追加,扩展两个数组(csharp,mono)
csharp .NET Standard如何与.NET平台相关
csharp .NET Standard如何与.NET平台相关