스레드 풀 작업 대기열
unit uPool;
{***********************************************************************
+
==========================================================
| ----- ---------------------- |
| | | | ←---- | ⑴ |
| | | ---------------------- |
| | | ↑ |
| | |--①-- , |
| | | | |
| | | ↓② ---------------- |
| | | ----------------------- | | |
| | | | | | | |
| | | ----------------------- ---------------- |
| | | ③ | | |
| ----- ←----------| | |
| | | |
| -----------------------------------------| |
| ④ |
| |
==========================================================
:
pool = TThreadPool.Create;
pool.MinNums := 2; //
pool.MaxNums := 6; //
pool.TasksCacheSize := 10; //
,
pool.AddWorkTask(Task);
。
,
,
。
, TWorkTask 。
exectask; 。 ,
;
for i := 0 to 5000 do
begin
if tk.WorkState = tsFinished then break;
inc(k);
//caption := inttostr(k);
edit2.Text := inttostr(k);
end;
:TWirteFileTask = Class(TWorkTask);
9-23:
BUG
1. MIN 。
2. BUG。
3. 。
:
@RightCopy fsh
QQ: 19985430
date: 2012-09-22
Email:[email protected]
***********************************************************************}
interface
uses
Classes,Windows,SysUtils,Messages,SyncObjs;
Const
PRE_NUM = 5;
MAX_NUM = 100;
AUTO_FREE = 2;
MAX_TASKNUM = 100;
ONEMINUTE = 10000;//60000;
type
TLogLevel = (lDebug,lInfo,lError);
ILog = interface
procedure WriteLog(Const Msg:String;Level:TLogLevel = lDebug);
end;
TPoolLog = Class(TInterfacedObject,ILog)
private
procedure WriteLog(Const Msg:String;Level:TLogLevel = lDebug);
public
procedure OutputLog(Const Msg:String;Level:TLogLevel);virtual;
End;
TPoolException = class(Exception)
end;
Thandles = Array of Cardinal;
// 。
TTaskLevel = (tlLower,tlNormal,tlHigh);
TTaskState = (tsNone,tsDoing,tsWaiting,tsReStart,tsStop,tsFinished);
TWorkTask = Class
private
Work:TThread;
// ID
hTask:TCriticalSection;
FWorkId:Cardinal;
FWorkName:String;
FWorkLevel:TTaskLevel; //
FWorkState : TTaskState;
procedure setWorkState(Const Value:TTaskState);
public
Constructor Create;
Destructor Destroy;override;
procedure execTask;virtual; abstract;
property WorkId:Cardinal read FWorkId write FWorkId;
property WorkName:String read FWorkName write FWorkName;
property WorkLevel:TTaskLevel read FWorkLevel write FWorkLevel;
property WorkState : TTaskState read FWorkState write setWorkState;
End;
TWorkTaskQueue = Array of TWorkTask;
TThreadPool = Class;
TWorkThreadState = (wtIdle,wtRunning,wtStop,wtFinished);
// ( task)
TWorkThread = Class(TThread)
private
FPool:TThreadPool;
FState:TWorkThreadState;
procedure SetDefault;
protected
procedure Execute;override;
public
Constructor Create(Const pool:TThreadPool);
property State : TWorkThreadState read FState write FState;
End;
TWorkThreadQueue = Array of TWorkThread;
//
TListenCacheInfoEvent = procedure (Sender:TObject;Const IdleCount,BusyCount,TaskCount:Integer) of Object;
TTaskQueueFullEvent = procedure (Sender:TObject) of Object;
//
TTaskFinishedEvent = procedure (Const cTast:TWorkTask) of object;
//
TTaskWillDoBeforeEvent = procedure (Const thId:Cardinal;Const cTast:TWorkTask) of Object;
// , , 。
TSortTaskQueueEvent = procedure (Sender:TObject;var taskQueue:TWorkTaskQueue) of object;
TThreadPool = Class
private
Log:TPoolLog;
//
FAuto:Boolean;
//
FWaitFlag:Boolean;
//
Waiting:TWorkThread;
//
entTaskNotify:Tevent;
// HANDLE
hTimeJump:Cardinal;
//
FSorted:Boolean;
//
hIDleLock:TCriticalSection;
//
hBusyLock:TCriticalSection;
//
hTaskLock:TCriticalSection;
// 5 Max
FMinNums:Integer;
// , 100
FMaxNums:Integer;
// 100
FTasksCache:Integer;
// XX :
FRecoverInterval:Integer;
// ( ),
FIsAllowTheSameTask:Boolean;
// ( ) 100 。 100 ,
// , 。
TaskQueue:TWorkTaskQueue;
//
BusyQueue:TWorkThreadQueue;
//
IdleQueue:TWorkThreadQueue;
//************************ **********************//
//
FOnSortTask:TSortTaskQueueEvent;
FOnTaskWillDo:TTaskWillDoBeforeEvent;
FOnTaskFinished:TTaskFinishedEvent;
FOnTaskFull:TTaskQueueFullEvent;
FOnListenInfo:TListenCacheInfoEvent;
//*****************************************************//
//************************Get/Set *******************//
procedure SetMinNums(Const Value:Integer);
procedure SetMaxNums(Const Value:Integer);
function getTaskQueueCount: Integer;
function getBusyQueueCount: Integer;
function getIdleQueueCount: Integer;
//*****************************************************//
//*********************** ********************//
procedure CreateLock;
procedure FreeLock;
//*****************************************************//
//
procedure SetDefault;
//
procedure DoTaskFull;
//******************** **********************//
//
procedure ClearQueue(var Queue:TWorkThreadQueue);
//
function QueueSize(Const Queue:TWorkThreadQueue):Integer;
//
procedure DelQueueOfIndex(var Queue:TWorkThreadQueue;Const Index:Integer);
// ;
procedure MoveQueue(Const wt:TWorkThread;flag:Integer);
//
procedure RemoveFromQueue(var Queue:TWorkThreadQueue;Const re:TWorkThread);
//*****************************************************//
//******************** **********************//
// 。//
procedure SortTask(var Queue:TWorkTaskQueue);
//
procedure DelTaskOfIndex(var Queue:TWorkTaskQueue;Const Index:Integer);
//
function TaskSzie(Const Queue:TWorkTaskQueue):Integer;
//*****************************************************//
// ( , )
function FindTask(Const tsk:TWorkTask):Integer;
//
procedure QuikeSortTask(var Queue:TWorkTaskQueue;Const s,e:Integer);
//
procedure RecoverIDle(Const wait:TWorkThread);
//
procedure switch(var Queue: TWorkTaskQueue; m, n: Integer);
//
function WaitAutoRecover(Const curThread:TWorkThread):Boolean;
protected
//
function Smaller(Const expresion:Boolean;Const tureValue,falseValue:Integer):Integer;
//
function PickupTask:TWorkTask;
//
procedure CreateIdleThread(Const Nums:Integer = 1);
//
procedure AddThreadToIdleQueue(Const idle:TWorkThread);
//
procedure AddThreadToBusyQueue(Const busy:TWorkThread);
//
procedure PostNewTaskSign;
public
Constructor Create;
Destructor Destroy;override;
//*********************** ******************************//
//
procedure StopAll;
//
procedure StartAll;
//
procedure CleanTasks;
//
function SwitchTasks(Const aTask,bTask:TWorkTask):Boolean;
//
procedure RemoveTask(Const tk:TWorkTask);//
//
procedure ListenPool;
//******************************************************************//
//
function AddWorkTask(Const wtask:TWorkTask):Integer;
property MinNums:Integer read FMinNums write SetMinNums;
property MaxNums:Integer read FMaxNums write SetMaxNums;
property TasksCacheSize:Integer read FTasksCache write FTasksCache;
property RecoverInterval:Integer read FRecoverInterval
write FRecoverInterval;
property IsAllowTheSameTask:Boolean read FIsAllowTheSameTask
write FIsAllowTheSameTask;
property Sorted:Boolean read FSorted write FSorted;
property TaskQueueCount:Integer read getTaskQueueCount;
property IdleQueueCount:Integer read getIdleQueueCount;
property BusyQueueCount:Integer read getBusyQueueCount;
property OnSortTask:TSortTaskQueueEvent read FOnSortTask write FOnSortTask;
property OnTaskWillDo:TTaskWillDoBeforeEvent read FOnTaskWillDo write FOnTaskWillDo;
property OnTaskFinished:TTaskFinishedEvent read FOnTaskFinished write FOnTaskFinished;
property OnTaskFull:TTaskQueueFullEvent read FOnTaskFull write FOnTaskFull;
property OnListenInfo:TListenCacheInfoEvent read FOnListenInfo write FOnListenInfo;
End;
implementation
{ TThreadPool }
constructor TThreadPool.Create;
var
tpError:Cardinal;
begin
Log:=TPoolLog.Create;
SetDefault;
CreateLock;
tpError := 0;
entTaskNotify:=Tevent.create(nil,false,false, 'TaskNotify');//
hTimeJump := CreateEvent(nil,False,False,'Timer');//
if hTimeJump = 0 then
tpError := GetLastError;
//the same name of sign exists.
Case tpError of
ERROR_ALREADY_EXISTS:
begin
hTimeJump := 0;
Log.WriteLog('CreateTimerEvent Fail,the Same Name of Event Exists');
end;
End;
//
CreateIdleThread(FMinNums);
Log.WriteLog('Thread Pool start run.',lInfo);
end;
destructor TThreadPool.Destroy;
begin
ClearQueue(IdleQueue);
ClearQueue(BusyQueue);
FreeLock;
if hTimeJump > 0 then
CloseHandle(hTimeJump);
entTaskNotify.Free;
Log.Free;
inherited;
Log.WriteLog('Thread Pool end run.',lInfo);
end;
procedure TThreadPool.DoTaskFull;
begin
if Assigned(FOnTaskFull) then
FOnTaskFull(self);
end;
procedure TThreadPool.SetDefault;
begin
FMinNums := PRE_NUM;
FMaxNums := MAX_NUM;
FTasksCache := MAX_TASKNUM;
FRecoverInterval := AUTO_FREE;
FIsAllowTheSameTask := False;
FAuto :=False;
FWaitFlag := True;
Waiting := nil;
FSorted := False;
end;
procedure TThreadPool.CreateLock;
begin
hIDleLock := TCriticalSection.Create;
hBusyLock := TCriticalSection.Create;
hTaskLock := TCriticalSection.Create;
end;
procedure TThreadPool.FreeLock;
begin
hIDleLock.Free;
hBusyLock.Free;
hTaskLock.Free;
end;
function TThreadPool.getBusyQueueCount: Integer;
begin
Result := QueueSize(BusyQueue);
end;
function TThreadPool.getIdleQueueCount: Integer;
begin
Result := QueueSize(IdleQueue);
end;
function TThreadPool.getTaskQueueCount: Integer;
begin
Result := TaskSzie(TaskQueue);
end;
procedure TThreadPool.CleanTasks;
begin
hTaskLock.Enter;
SetLength(TaskQueue,0);
hTaskLock.Leave;
end;
procedure TThreadPool.ListenPool;
begin
// , ,
if Assigned(FOnListenInfo) then
FOnListenInfo(self,IdleQueueCount,BusyQueueCount,TaskQueueCount);
end;
procedure TThreadPool.ClearQueue(var Queue: TWorkThreadQueue);
var
i:Integer;
sc:Integer;
begin
sc := Length(Queue);
for i := 0 to sc - 1 do
begin
TWorkThread(Queue[i]).Terminate;
PostNewTaskSign;
//TWorkThread(Queue[i]).Free; // FreeOnTerminate TRUE 。
end;
SetLength(Queue,0);
end;
procedure TThreadPool.SetMaxNums(const Value: Integer);
begin
if Value FMaxNums then
//to do tips Error;
else if Value <= 0 then
FMinNums := PRE_NUM
else
FMinNums := Value;
ClearQueue(IDleQueue);
CreateIdleThread(FMinNums);
Log.WriteLog('Reset MinNums Numbers is ' + inttostr(FMinNums) + ' .',lInfo);
end;
function TThreadPool.Smaller(const expresion: Boolean; const tureValue,
falseValue: Integer): Integer;
begin
if expresion then
result := tureValue
else
result := falseValue;
end;
procedure TThreadPool.DelQueueOfIndex(var Queue: TWorkThreadQueue;
const Index: Integer);
var
i:integer;
ic:integer;
begin
ic := Length(Queue);
for i := Index to ic - 1 do
Queue[i] := Queue[i+1];
setLength(Queue,ic-1);
end;
procedure TThreadPool.DelTaskOfIndex(var Queue: TWorkTaskQueue;
const Index: Integer);
var
i:integer;
ic:integer;
begin
ic := length(Queue);
for i := Index to ic -1 do
Queue[i] := Queue[i+1];
setLength(Queue,ic-1);
end;
procedure TThreadPool.MoveQueue(const wt: TWorkThread; flag: Integer);
var
k:integer;
begin
if flag = 0 then
begin
hIDleLock.Enter;
for k := Low(IdleQueue) to High(IdleQueue) do
begin
if IdleQueue[k]=wt then
begin
AddThreadToBusyQueue(wt);
DelQueueOfIndex(IdleQueue,k);
end;
end;
hIDleLock.Leave;
end
else
begin
hBusyLock.Enter;
for k := Low(BusyQueue) to High(BusyQueue) do
begin
if BusyQueue[k]=wt then
begin
AddThreadToIdleQueue(wt);
DelQueueOfIndex(BusyQueue,k);
end;
end;
hBusyLock.Leave;
end;
end;
function TThreadPool.SwitchTasks(const aTask, bTask: TWorkTask): Boolean;
var
aIndex,bIndex:Integer;
begin
Result := true;
hTaskLock.Enter;
aIndex := FindTask(aTask);
bIndex := FindTask(bTask);
if (aIndex = -1) or (bIndex = -1) then
begin
Result := false;
hTaskLock.Leave;
exit;
end;
switch(TaskQueue,aIndex,bIndex);
hTaskLock.Leave;
end;
function TThreadPool.TaskSzie(const Queue: TWorkTaskQueue): Integer;
begin
Result := Length(Queue);
end;
function TThreadPool.WaitAutoRecover(const curThread: TWorkThread): Boolean;
begin
Result := Waiting = curThread;
end;
procedure TThreadPool.CreateIdleThread(const Nums: Integer);
var
WorkThread:TWorkThread;
i:integer;
begin
hIDleLock.Enter;
for i := 0 to Nums - 1 do
begin
WorkThread := TWorkThread.Create(self);
WorkThread.FreeOnTerminate := true;
AddThreadToIdleQueue(WorkThread);
end;
hIDleLock.Leave;
end;
procedure TThreadPool.AddThreadToBusyQueue(const busy: TWorkThread);
var
sz:integer;
begin
sz := QueueSize(BusyQueue);
setLength(BusyQueue,sz + 1);
BusyQueue[sz] := busy;
end;
procedure TThreadPool.AddThreadToIdleQueue(const idle: TWorkThread);
var
sz:integer;
begin
sz := Length(IdleQueue);
setLength(IdleQueue,sz + 1);
IdleQueue[sz] := idle;
end;
function TThreadPool.PickupTask: TWorkTask;
begin
//
hTaskLock.enter;
if FSorted then
SortTask(TaskQueue);
if length(TaskQueue) > 0 then
begin
Result := TaskQueue[0];
DelTaskOfIndex(TaskQueue,0);
end
else
Result := Nil;
hTaskLock.Leave;
end;
function TThreadPool.AddWorkTask(Const wtask: TWorkTask):Integer;
var
sz,ic,bc:Integer;
begin
sz := Length(TaskQueue);
if sz >= FTasksCache then
begin
Result := -1;
DoTaskFull;
exit;
end;
setLength(TaskQueue,sz+1);
wtask.WorkState := tsWaiting;
TaskQueue[sz] := wtask;
Result := sz + 1;
//
ic := IdleQueueCount;
bc := BusyQueueCount;
// ic + bc = MaxNums
if (ic = 0) and (ic+ bc < FMaxNums) then
CreateIdleThread();
FAuto := True;
//
PostNewTaskSign;
Log.WriteLog('Add a task to queue.',lInfo);
end;
function TThreadPool.FindTask(const tsk: TWorkTask): Integer;
var
l:Integer;
begin
Result := -1;
for l := Low(TaskQueue) to High(TaskQueue) do
if TaskQueue[l] = tsk then
begin
Result := l;
Break;
end;
end;
procedure TThreadPool.PostNewTaskSign;
begin
entTaskNotify.SetEvent;
end;
procedure TThreadPool.switch(var Queue:TWorkTaskQueue;m,n:Integer);
var
tem:TWorkTask;
begin
tem := Queue[m];
Queue[m] := Queue[n];
Queue[n] := tem;
end;
procedure TThreadPool.QuikeSortTask(var Queue: TWorkTaskQueue; const s,
e: Integer);
var
key:Integer;
k,j:Integer;
begin
key := ord(Queue[s].WorkLevel);
if s > e then exit;
k := s;
j := e;
while (k <> j) do
begin
while (k < j) and (ord(Queue[j].WorkLevel) <= key) do // >=
dec(j);
switch(Queue,k,j);
while (k < j) and (ord(Queue[k].WorkLevel) >= key) do // <=
inc(k);
Switch(Queue,j,k);
end;
if s < k-1 then
QuikeSortTask(Queue,s,k-1);
if k+1 < e then
QuikeSortTask(Queue,k+1,e);
end;
procedure TThreadPool.SortTask(var Queue: TWorkTaskQueue);
var
f,l:Integer;
ic:Integer;
begin
ic := Length(Queue);
if ic = 0 then exit;
if Assigned(FOnSortTask) then
FOnSortTask(self,Queue)
else
begin
f := 0;
l := ic-1;
QuikeSortTask(Queue,f,l);
end;
end;
procedure TThreadPool.StartAll;
var
I:Integer;
begin
hBusyLock.Enter;
for I := Low(BusyQueue) to High(BusyQueue) do
begin
BusyQueue[i].Resume;
BusyQueue[i].State := wtRunning;
end;
hBusyLock.Leave;
hIDleLock.Enter;
for I := Low(IdleQueue) to High(IdleQueue) do
begin
IdleQueue[i].Resume;
IdleQueue[i].State := wtRunning;
end;
hIDleLock.Leave;
end;
procedure TThreadPool.StopAll;
var
I:Integer;
begin
hBusyLock.Enter;
for I := Low(BusyQueue) to High(BusyQueue) do
begin
BusyQueue[i].Suspend;
BusyQueue[i].State := wtStop;
end;
hBusyLock.Leave;
hIDleLock.Enter;
for I := Low(IdleQueue) to High(IdleQueue) do
begin
IdleQueue[i].Suspend;
IdleQueue[i].State := wtStop;
end;
hIDleLock.Leave;
end;
function TThreadPool.QueueSize(const Queue: TWorkThreadQueue):Integer;
begin
Result := Length(Queue);
end;
//
procedure TThreadPool.RecoverIDle(Const wait:TWorkThread);
var
k:Integer;
begin
FAuto:=False;
//
FWaitFlag := False;
Waiting := wait;
hBusyLock.Enter;
RemoveFromQueue(BusyQueue,wait);
hBusyLock.Leave;
//
CreateIdleThread();
WaitforSingleObject(hTimeJump,FRecoverInterval*ONEMINUTE);
// , ,
if (IdleQueueCount > 0)
and (BusyQueueCount = 0) //
and (TaskQueueCount = 0) then
begin
hTaskLock.Enter;
//
for k := High(IdleQueue) Downto FMinNums do
begin
TWorkThread(IdleQueue[k]).Terminate;
PostNewTaskSign;
end;
SetLength(IdleQueue,FMinNums);
hTaskLock.Leave;
end;
//
wait.Terminate;
FWaitFlag := True;
end;
procedure TThreadPool.RemoveFromQueue(var Queue: TWorkThreadQueue;
const re: TWorkThread);
var
index ,i: integer;
begin
index := -1;
for i := Low(Queue) to High(Queue) do
begin
if Queue[i] = re then
begin
index := i;
break;
end;
end;
if Index<>-1 then
DelQueueOfIndex(Queue,index);
end;
procedure TThreadPool.RemoveTask(const tk: TWorkTask);
var
index:Integer;
begin
index := FindTask(tk);
if index = -1 then Exit;
hTaskLock.Enter;
DelTaskOfIndex(TaskQueue,index);
hTaskLock.Leave;
end;
{ TWorkThread }
constructor TWorkThread.Create(const pool: TThreadPool);
begin
FPool := pool;
SetDefault;
inherited Create(false);
end;
procedure TWorkThread.Execute;
var
hd:Array[0..0] of Cardinal;
ret:Cardinal;
task:TWorkTask;
nc:Integer;
begin
//
hd[0]:= fPool.entTaskNotify.Handle;
while not Terminated do
begin
// , MinNums
ret := WaitForMultipleObjects(1,@hd,false,INFINITE);
if Terminated then break;
Case ret - WAIT_OBJECT_0 of
WAIT_OBJECT_0:
begin
if state <> wtRunning then
begin
try
//
task := FPool.PickupTask;
if assigned(task) then
begin
// , 。
task.hTask.Enter;
// ,
fPool.MoveQueue(self,0);
state := wtRunning;
//
if Assigned(fPool.FOnTaskWillDo) then
fPool.FOnTaskWillDo(self.ThreadID,task);
//
task.Work := self;
task.WorkState := tsDoing;
task.execTask;
state := wtFinished;
task.WorkState := tsFinished;
task.Work := nil;
task.hTask.leave;
//
if Assigned(fPool.FOnTaskFinished) then
fPool.FOnTaskFinished(task);
end;
finally
end;
end;
end;
WAIT_OBJECT_0 + 1:;//Terminate don't to do something
End;
nc := fPool.TaskQueueCount;
if (nc > 0) then
fpool.PostNewTaskSign
else if (fPool.FAuto) and (fPool.FWaitFlag) and (fPool.BusyQueueCount=1) then
fPool.RecoverIDle(self);// ,
state := wtIdle;
//
if not fPool.WaitAutoRecover(self) then //
fPool.MoveQueue(self,1)
else
fPool.Waiting := nil;
end;
end;
procedure TWorkThread.SetDefault;
begin
FState := wtIdle;
end;
{ TWorkTask }
constructor TWorkTask.Create;
begin
hTask := TCriticalSection.Create;
WorkState := tsNone;
FWorkLevel := tlNormal;
Work := nil;
end;
destructor TWorkTask.Destroy;
begin
WorkState := tsFinished;
if Assigned(Work) then
Work.Resume;
hTask.Free;
inherited;
end;
procedure TWorkTask.setWorkState(Const Value:TTaskState);
begin
FWorkState := Value;
case value of
tsReStart:
begin
if Assigned(Work) and (Work.Suspended) then
begin
FWorkState := tsDoing;
Work.Resume;
end;
end;
tsStop:
begin
if Assigned(Work) then
Work.Suspend;
end;
end;
end;
{ TPoolLog }
procedure TPoolLog.OutputLog(const Msg: String; Level: TLogLevel);
begin
// to implement at sub class.
end;
procedure TPoolLog.WriteLog(const Msg: String; Level: TLogLevel);
var
dt:TDatetime;
begin
dt := now;
OutputLog(datetimetostr(dt) + ' : ' + Msg,Level);
end;
end.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
[Delphi] TStringBuilder그리고 꼭 사용해야만 할까? 그림처럼 Heap 영역에 "Hello" 공간을 생성하고 포인팅을 한다. "Hello World" 공간을 새로 생성한 후 포인팅을 하게 된다. 결국 "Hello" 라는 String 객체가 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.