Limited Concurrency Level Task Scheduler에 대한 의혹
9797 단어 concurrency
1. Limited Concurrency LevelTaskScheduler 소개
이TaskScheduler가 사용한 것은 마이크로소프트에서 시작된 작업 스케줄러입니다. 이 스케줄러의 코드는 매우 간단하고 이해하기 쉽습니다. 그러나 제가 이해하지 못한 것은 그가 어떻게 병발수를 제한하는지 먼저 그 코드를 붙이고 여러분이 먼저 익히세요.
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
/// <summary>Whether the current thread is processing work items.</summary>
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
/// <summary>The list of tasks to be executed.</summary>
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
/// <summary>The maximum concurrency level allowed by this scheduler.</summary>
private readonly int _maxDegreeOfParallelism;
/// <summary>Whether the scheduler is currently processing work items.</summary>
private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
/// <summary>
/// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
/// specified degree of parallelism.
/// </summary>
/// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
/// <summary>
/// current executing number;
/// </summary>
public int CurrentCount { get; set; }
/// <summary>Queues a task to the scheduler.</summary>
/// <param name="task">The task to be queued.</param>
protected sealed override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
Console.WriteLine("Task Count : {0} ", _tasks.Count);
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
int executingCount = 0;
private static object executeLock = new object();
/// <summary>
/// Informs the ThreadPool that there's work to be executed for this scheduler.
/// </summary>
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
// Get the next item from the queue
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
// Execute the task we pulled out of the queue
base.TryExecuteTask(item);
}
}
// We're done processing items on the current thread
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
/// <summary>Attempts to execute the specified task on the current thread.</summary>
/// <param name="task">The task to be executed.</param>
/// <param name="taskWasPreviouslyQueued"></param>
/// <returns>Whether the task could be executed on the current thread.</returns>
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems) return false;
// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued) TryDequeue(task);
// Try to run the task.
return base.TryExecuteTask(task);
}
/// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
/// <param name="task">The task to be removed.</param>
/// <returns>Whether the task could be found and removed.</returns>
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
/// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
/// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
/// <returns>An enumerable of the tasks currently scheduled.</returns>
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks.ToArray();
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
}
간단한 사용
다음은 호출 코드입니다.
static void Main(string[] args)
{
TaskFactory fac = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(5));
//TaskFactory fac = new TaskFactory();
for (int i = 0; i < 1000; i++)
{
fac.StartNew(s => {
Thread.Sleep(1000);
Console.WriteLine("Current Index {0}, ThreadId {1}",s,Thread.CurrentThread.ManagedThreadId);
}, i);
}
Console.ReadKey();
}
호출은 간단합니다. 디버깅 호출 순서에 따라 알 수 있습니다.Limited Concurrency Level Task Scheduler를 사용하여Task Factory를 만든 후 이Task Facotry를 호출합니다.StartNew 메소드 이후.Limited Concurrency Level Task Scheduler의Queue Task 방법에 들어갑니다.
/// <summary>Queues a task to the scheduler.</summary>
/// <param name="task">The task to be queued.</param>
protected sealed override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
Console.WriteLine("Task Count : {0} ", _tasks.Count);
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
코드는 간단합니다. 새로 만든Task를 작업 대기열에 추가한 다음 현재 실행 중인 작업의 수량과 설정된 최대 병렬 수를 비교합니다. 이 값보다 작으면 마운트 중인 작업의 실행을 알립니다.나의 의문은 주로 Notify Thread Pool Offending Work라는 방법에 있다.
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
// Get the next item from the queue
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
// Execute the task we pulled out of the queue
base.TryExecuteTask(item);
}
}
// We're done processing items on the current thread
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
코드에서 본 뜻은 계속 사순환을 하며 끊임없이tasks에서 Task를 꺼내서 까지 실행합니다task가 비어 있을 때까지 순환을 종료합니다.여기서 병발수를 제한하는 제한을 보지 못했다. Queue Task에서 호출할 때만 간단한 제한이 있지만 아무런 소용이 없는 것 같다. Notify Thread Pool Offending Work 방법이 시작되면 모든 Task가 실행될 때까지 계속 뛰기 때문이다.그럼 그의 병발수는 어떻게 제한됩니까?
항상 헷갈려요. 제가 잘못 이해했나 봐요. 아신 하나님께 헷갈려요.
~~ 정말 취했어요.markdown을 잘 사용하지 못해서 문제가 있습니다.불편하면 여기https://www.zybuluo.com/kevinsforever/note/115066를 보고 편집기로 썼지만 복사해서 제대로 표시할 수 없습니다.3Q .
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Java의 가상 스레드변경 사항은 최소이며 클래식 ThreadPool 대신 newVirtualThreadPerTaskExecutor를 사용할 수 있습니다. 그게 다야! 이제 Java 19 덕분에 경량 동시성 모델(Kotlin의 코루틴과 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.