C# - Blocking Queue Implementation
Ah, Microsoft has actually invented a very smart trick here, because if you take a look at the signature of BlockingCollection, it is something like this:
[DebuggerDisplay("Count = {Count}, Type = {m_collection}")]
[DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))]
[ComVisible(false)]
public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
{
public BlockingCollection();
public BlockingCollection(int boundedCapacity);
public BlockingCollection(IProducerConsumerCollection<T> collection);
public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity);
// ...
}
As you can see, some overloads of the constructor take an IProducerConsumerCollection<T>:
public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IEnumerable<T>, ICollection, IEnumerable
{
//...
}
Ah, so ConcurrentQueue<T> is an implementation of IProducerConsumerCollection<T>:
public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{
void CopyTo(T[] array, int index);
T[] ToArray();
bool TryAdd(T item);
bool TryTake(out T item);
}
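Since BlockingCollection<T> defers its TryAdd/TryTake work to whatever inner IProducerConsumerCollection<T> it wraps, the take order is whatever the inner collection gives you. A quick sketch (a hypothetical demo, not from the original article) that contrasts a queue-backed and a stack-backed wrapper:

```csharp
using System;
using System.Collections.Concurrent;

class OrderingDemo
{
    static void Main()
    {
        // Backed by a ConcurrentQueue<T>: Take() returns items FIFO.
        var fifo = new BlockingCollection<int>(new ConcurrentQueue<int>());
        fifo.Add(1); fifo.Add(2); fifo.Add(3);
        Console.WriteLine(fifo.Take()); // 1

        // Backed by a ConcurrentStack<T>: Take() returns items LIFO.
        var lifo = new BlockingCollection<int>(new ConcurrentStack<int>());
        lifo.Add(1); lifo.Add(2); lifo.Add(3);
        Console.WriteLine(lifo.Take()); // 3
    }
}
```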
So basically, IProducerConsumerCollection<T> provides a TryAdd/TryTake pair, plus methods to support copying to/from an array.
Back to our question: it sounds like BlockingCollection<T>, if you don't provide an underlying producer/consumer collection, will use a default one (whether that default is FIFO or LIFO you simply cannot assume); but if you do provide one, it will use the collection you supplied, which means it inherits the take/put characteristics of that collection: pass in a FIFO collection, and Take/Add will work in a FIFO manner. The BlockingCollection serves as a container/wrapper over a concurrent collection, with additional blocking ability.
Knowing that, the work sounds almost trivial: you simply put in a ConcurrentQueue<T>:
public class BlockingQueue<T> : BlockingCollection<T>
{
#region ctor(s)
public BlockingQueue() : base(new ConcurrentQueue<T>())
{
}
public BlockingQueue (int maxSize) : base (new ConcurrentQueue<T>(), maxSize)
{
}
#endregion ctor(s)
#region Methods
/// <summary>
/// Enqueue an Item
/// </summary>
/// <param name="item">Item to enqueue</param>
/// <remarks>blocks if the blocking queue is full</remarks>
public void Enqueue(T item)
{
Add(item);
}
/// <summary>
/// Dequeue an item
/// </summary>
/// <returns>Item dequeued</returns>
/// <remarks>blocks if the blocking queue is empty</remarks>
public T Dequeue()
{
return Take();
}
#endregion Methods
}
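A quick standalone smoke test of the wrapper above (the BlockingQueue<T> class is repeated so the snippet is self-contained; the bound of 2 and the class names are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public class BlockingQueue<T> : BlockingCollection<T>
{
    public BlockingQueue() : base(new ConcurrentQueue<T>()) { }
    public BlockingQueue(int maxSize) : base(new ConcurrentQueue<T>(), maxSize) { }
    public void Enqueue(T item) { Add(item); }   // blocks when the queue is full
    public T Dequeue() { return Take(); }        // blocks when the queue is empty
}

class SmokeTest
{
    static void Main()
    {
        var queue = new BlockingQueue<int>(2); // bounded to 2 pending items
        var consumer = new Thread(() =>
        {
            for (int i = 0; i < 5; i++)
                Console.WriteLine(queue.Dequeue()); // blocks while empty
        });
        consumer.Start();
        for (int i = 0; i < 5; i++)
            queue.Enqueue(i); // blocks once 2 items are pending
        consumer.Join(); // prints 0..4 in FIFO order
    }
}
```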
Ah, we explicitly add Enqueue and Dequeue to mock up Queue operations (you can use the Take or Add methods directly if you like). Let's put this into a multi-threaded environment and give it a spin. First, we declare the callback channels and the blocking queues:
public List<HashSet<ITabularPushCallback>> callbackChannelLists = new List<HashSet<ITabularPushCallback>>();
// this is the Blocking Queue List.
public List<BlockingQueue<string[][]>> messageQueues = new List<BlockingQueue<string[][]>>();
And when data comes in, the notifier queues a ThreadPool work item to do the Enqueue:
public void NotifyMessage(string[][] messages, int tableId)
{
if (IsChannelRegistered(tableId))
{
HashSet<ITabularPushCallback> callbackChannelList = null;
BlockingQueue<string[][]> queue = null;
lock (SyncObj)
{
callbackChannelList = new HashSet<ITabularPushCallback>(callbackChannelLists[tableId]);
queue = messageQueues[tableId];
}
if (callbackChannelList.Count > 0 && queue != null)
{
ThreadPool.QueueUserWorkItem((o =>
{
if (queue != null)
queue.Enqueue(messages);
}), null);
}
}
else
{
// throw or swallow?
//throw new ArgumentOutOfRangeException("tableId", tableId, "Invalid callback channel");
}
}
And there is a dedicated background thread, parameterized on each "tableId", which takes data from the queue and processes it. Here is the main code that does the Dequeue and the processing:
private void NotifyMessageThunk(int tableId)
{
HashSet<ITabularPushCallback> callbackChannelList = null;
BlockingQueue<string[][]> queue = null;
lock (SyncObj)
{
if (tableId < 0 || tableId >= callbackChannelLists.Count) throw new ArgumentOutOfRangeException("tableId", tableId, "Expected a nonnegative, existing tableId!");
if (!IsChannelRegistered(tableId))
{
Thread.Sleep(100); // back off briefly instead of spinning the CPU (note: this sleeps while holding SyncObj)
return;
}
callbackChannelList = GetCallbackChannel(tableId);
queue = messageQueues[tableId];
if (queue == null)
{
Thread.Sleep(100); // back off briefly instead of spinning the CPU (note: this sleeps while holding SyncObj)
return;
}
}
string[][] message = queue.Dequeue();
if (message != null)
{
HashSet<ITabularPushCallback> channelCopy = null;
channelCopy = new HashSet<ITabularPushCallback>(callbackChannelList);
foreach (var channel in channelCopy)
{
try
{
channel.NotifyMessage(message, tableId);
}
catch
{
// swallow?
if (!TryRemoveCallbackChannel(channel, tableId))
{
// Logs
}
}
}
}
}
Pretty easy, isn't it? Actually, you can make your own blocking queue from locking primitives (Monitor waits and pulses), as shown in the references list [0], where you can do something similar to:
class SizeQueue<T>
{
private readonly Queue<T> queue = new Queue<T>();
private readonly int maxSize;
public SizeQueue(int maxSize) { this.maxSize = maxSize; }
public void Enqueue(T item)
{
lock (queue)
{
while (queue.Count >= maxSize)
{
Monitor.Wait(queue);
}
queue.Enqueue(item);
if (queue.Count == 1)
{
// wake up any blocked dequeue
Monitor.PulseAll(queue);
}
}
}
public T Dequeue()
{
lock (queue)
{
while (queue.Count == 0)
{
Monitor.Wait(queue);
}
T item = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return item;
}
}
}
The author, Marc Gravell, actually proposed an improved version which enables exiting cleanly. Below is his idea:
bool closing;
public void Close()
{
lock(queue)
{
closing = true;
Monitor.PulseAll(queue);
}
}
public bool TryDequeue(out T value)
{
lock (queue)
{
while (queue.Count == 0)
{
if (closing)
{
value = default(T);
return false;
}
Monitor.Wait(queue);
}
value = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return true;
}
}
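Worth noting: BlockingCollection<T> already ships with an equivalent of this clean-exit pattern. CompleteAdding() plays the role of Close(), and GetConsumingEnumerable() ends (instead of blocking forever) once the collection is marked complete and drained. A minimal sketch:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class ShutdownDemo
{
    static void Main()
    {
        var queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
        var consumer = Task.Run(() =>
        {
            // Blocks while empty; the loop exits after CompleteAdding()
            // is called and the remaining items are drained.
            foreach (var item in queue.GetConsumingEnumerable())
                Console.WriteLine(item);
        });
        for (int i = 0; i < 3; i++) queue.Add(i);
        queue.CompleteAdding(); // analogous to Close() above
        consumer.Wait();        // consumer prints 0, 1, 2 and exits cleanly
    }
}
```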
And someone has ingeniously knocked one up with Reactive Extensions (Rx), something such as this:
public class BlockingQueue<T>
{
private readonly Subject<T> _queue;
private readonly IEnumerator<T> _enumerator;
private readonly object _sync = new object();
public BlockingQueue()
{
_queue = new Subject<T>();
_enumerator = _queue.GetEnumerator();
}
public void Enqueue(T item)
{
lock (_sync)
{
_queue.OnNext(item);
}
}
public T Dequeue()
{
_enumerator.MoveNext();
return _enumerator.Current;
}
}
However, the synchronization here still needs more work: Enqueue is serialized under _sync, but Dequeue walks the shared enumerator without any lock.
References:
Creating a blocking Queue<T> in .NET