개체 풀 및 스레드 풀
23661 단어 Java 멀티스레드 프로그래밍
개체 풀
우선 대상지를 말씀드리겠습니다.대상 탱크는 직렬 폐쇄의 개념을 이용했다. 대상 O를 하나의 요청 라인 T1에 빌려주고 T1을 사용한 후에 대상 탱크에 돌려주며'함부로 이 대상을 발표하지 않았음'과'앞으로 사용하지 않음'을 보증한다.대상지에서 대상자 O를 회수한 후 T2가 빌려올 때 다시 T2에게 빌려주고 대상자의 사용권 전달을 완성한다.
다음은 간략한 버전의 대상 탱크이다
public abstract class AbstractObjectPool {
protected final int min;
protected final int max;
protected final List usings = Lists.newLinkedList();
protected final List buffer = Lists.newLinkedList();
private volatile boolean inited = false;
public AbstractObjectPool(int min, int max) {
this.min = min;
this.max = max;
if (this.min < 0 || this.min > this.max) {
throw new IllegalArgumentException(String.format("need 0<=min<=max<=Integer.MAX_VALUE,given min%s,max:%s", this.min, this.max));
}
}
public void init() {
for (int i = 0; i < min; i++) {
buffer.add(newObject());
}
inited = true;
}
public void checkInited() {
if (!inited) {
throw new IllegalStateException("not inited");
}
}
abstract protected T newObject();
public synchronized T getObject() {
checkInited();
if (usings.size() == max) {
return null;
}
if (buffer.size() == 0) {
T newObject = newObject();
usings.add(newObject);
return newObject;
}
T oldObject = buffer.remove(0);
usings.add(oldObject);
return oldObject;
}
public synchronized void freeObject(T object) {
checkInited();
if (!usings.contains(object)) {
throw new IllegalArgumentException(String.format("object not in using queue:%s", object));
}
usings.remove(usings.indexOf(object));
buffer.add(object);
}
}
AbstractObjectPool의 특성은 다음과 같습니다.
간단하지만 시간이 민감하고 자원이 풍부한 장면에 많이 쓸 수 있다.만약 시간이 더욱 민감해지면 getObject(),freeObject()를 병발 정도가 높은 버전으로 바꿀 수 있으나 안전하게 발표하여 안전하게 회수할 수 있도록 기억합니다.자원이 충분하지 않으면 대상의 회수 전략을 적당히 늘릴 수 있다.
객체 풀의 기본 동작은 다음과 같습니다.
스레드 풀
먼저 결론을 내렸다. 스레드 탱크는 대상 탱크 모델을 섞었지만 핵심 원리는 생산자-소비자 모델이다.
상속 구조는 다음과 같습니다.
사용자는 Runnable (또는 Callables) 실례를 스레드 탱크에 제출할 수 있으며, 스레드 탱크는 이 작업을 비동기적으로 수행하여 응답의 결과 (완성/반환 값) 를 되돌려줍니다.
내가 가장 좋아하는 것은submit(Callable task) 방법이다.우리는 이 방법에서 착안하여 함수 창고에 점차 깊이 들어가 연못의 실현 원리를 탐구한다.
submit()
submit () 방법은 Executor 서비스 인터페이스에서 정의되고 추상적인 클래스인 AbstractExecutor 서비스가 실현되며ThreadPoolExecutor가 직접 계승됩니다.
public abstract class AbstractExecutorService implements ExecutorService {
...
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
AbstractExecutorService #newTaskFor()는 RunnableFutrue 유형의FutureTask를 만듭니다.핵심은 excute () 방법
execute () 방법
execute () 방법은 인터페이스 Executor에서 정의하고 ThreadPool Executor에서 실행합니다.
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
...
}
우리는 스레드 탱크의 연못화 전략을 잠시 소홀히 한다.가장 간단한 장면을 주목하고 스레드 탱크의 임무가 어떻게 집행되는지 보자.
핵심은 addWorker() 방법입니다.8번째 행위의 경우, 이 때, 스레드 탱크의 스레드 수가 최소 스레드 탱크의 크기에 미치지 못하면, 보통 9줄에서 바로 되돌아갈 수 있다.
addWorker()
addWorker () 방법은 ThreadPoolExecutor의 개인적인 방법입니다.
public class ThreadPoolExecutor extends AbstractExecutorService {
....
private boolean addWorker(Runnable firstTask, boolean core) {
...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN){
workers.add(w);
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
....
}
여기에는 스레드 탱크를 관리하고 스레드 안전을 유지하는 데 쓰이는 코드가 많이 없어졌다.스레드 탱크가 닫히지 않고 워크맨 (즉 w, 이하) 이 추가되었다고 가정하면 워크맨을 워크맨에 추가합니다.workers는 HashSet입니다.
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet workers = new HashSet();
대상 풀 위치
대상 탱크와 관련이 있다면workers와 대상 탱크 실례 코드에 해당하는 using만 대상 탱크 모델을 응용했다.다만 이곳의 using은 최대 스레드 탱크의 크기인 maximum Pool Size까지 성장해 왔다.
그러나 분명히 스레드 탱크는 스레드를 발표하지 않았고workers는 using 저장 스레드의 역할만 완성했다.그렇다면 스레드 탱크의 임무는 어떻게 집행됩니까?스레드 탱크랑 상관 없나요?
어디가 대상지도 아닌데?
위 addWorker() 코드의 9, 17, 24행 코드를 주의하십시오
핵심은 바로 이 세 줄에 있다. 스레드 탱크는ddWorker()에서 우리가 전송한firstTask를 직접 시작하지 않고 워크맨을 시작하지 않는다. 최종 임무는 반드시 시작된다. 그러면 우리는 워크맨이 어떻게 이 임무를 시작하는지 계속 본다.
Worker
Worker는 Runnable 인터페이스를 구현합니다.
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
...
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
...
}
Worker를 구성할 때 매개변수의 이름을 firstTask로 지정하는 이유는 무엇입니까?작업task를 실행하기 위해 새로운 워커를 만들어야만 구조 함수를 호출할 수 있기 때문입니다.따라서 작업 task는 새 Worker의 첫 번째 작업 firstTask입니다.
워커의 실현은 매우 간단하다. 자신을 Runable의 실례로 구성할 때 내부에 하나의 라인thread를 만들고 가지고 있다.Thread와 Runable의 사용은 익숙합니다. 핵심은 워커의run 방법입니다. 이것은 직접runWorker() 방법을 호출합니다.
runWorker()
포인트가 왔습니다.단순화는 다음과 같습니다.
public class ThreadPoolExecutor extends AbstractExecutorService {
...
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
...
}
우리는 앞에서 실행할 작업을firstTask에 값을 부여하고 5-6줄에서 작업task를 먼저 꺼내고firstTask를null로 설정합니다.다음에 task를 실행해야 하기 때문에 firstTask 필드는 쓸모가 없습니다.
중점은 10-31 줄의while 순환입니다.다음은 상황에 따라 토론하겠습니다.
case1: 첫 번째 순환,task는null이 아닙니다
case1은 앞의 모든 가설에 대응하여 프로그램이 이while 순환에 정상적으로 실행됩니다.
첫 번째 순환에 들어갈 때task=firstTask,null이 아니라 순환체에 직접 들어간다.따라서 16줄의firstTask의run() 방법을 실행한다.이상 처리는 우리가 여기서 먼저 고려하지 않는다.마지막으로finally 코드 블록에서task는null로 설정되어 다음 순환은case2에 들어갑니다.
case2: 순환체에 처음 들어가지 않음,task는null
case2는 더욱 보편적인 상황으로 스레드 탱크의 핵심이다.
case1에서task는null로 설정되어 10줄의 볼 표현식이 두 번째 부분을 실행합니다: (task = getTask ()!null (주: getTask () 방법은 나중에 설명하고 사용자가 제출한 작업을 되돌려줍니다.task에서 제출한 작업을 가져오면 16줄이 새로 얻은 작업의run () 방법을 실행합니다.뒤에는case1이 통하고 마지막task도null로 설정되며 이후 순환은case2에 들어갑니다.
getTask()
case2에서 매번 순환할 때마다 제출한 임무를 새로 가져옵니다. 그 임무는 어디에서 옵니까?간략화는 다음과 같다.
public class ThreadPoolExecutor extends AbstractExecutorService {
...
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
...
}
우리 먼저 간단한 7 -11 줄을 봅시다.
우선workQueue는 라인이 안전한 BlockingQueue로 대부분 사용하는 실현 클래스는 링크드 BlockingQueue이다.
private final BlockingQueue workQueue;
timed가false라고 가정하면, 막힌take () 방법을 사용합니다. 되돌아오는 것은null이 아닙니다. 11줄return에서 종료하고, 가져온task를 어떤worker 라인에 넘겨서 실행합니다.
workQueue의 요소는 어디에서 오나요?이것은 최초의 excute () 방법으로 돌아가야 한다.
execute()
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
...
}
앞에서 8줄의 매개 변수를 예로 들면, 이 때, 스레드 탱크의 스레드 수가 최소 스레드 탱크의 크기corePoolsize에 이르지 않고, 보통 이때는 10줄에서 바로 되돌아온다.
8행의 조건이 충족되지 않으면 13행으로 실행됩니다.isRunning(c)는 스레드 탱크가 닫히지 않았는지 판단합니다. 여기서 우리는 닫히지 않은 상황만 주목합니다.다음은 부울 표현식의 두 번째 부분workQueue를 실행합니다.작업 대기열workQueue에 작업을 넣으려고 하는 offer (command).
workQueue.offer () 의 행위는 스레드 탱크가 가지고 있는 BlockingQueue 실례에 달려 있습니다.Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor () 에서 만든 스레드 풀은 링크드 블록Queue를 사용하고 Executors를 사용합니다.새 CachedThreadPool () 에서 만든 스레드 풀은 SynchronousQueue를 사용합니다.
public class Executors {
...
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
}
Linked BlockingQueue를 예로 들면, 만들 때 용량을 설정하지 않습니다. 즉, 무계 대기열로 만들면 Linked BlockingQue#offer () 는true로 영원히 되돌아와execute () 13-19 줄에 들어갑니다.
workQueue.offer ()가true로 되돌아올 때 작업command를 대기열workQueue에 넣었습니다.미래의 어느 순간에, 한 워크맨이 임무를 완성하면 getTask () 를 통해 워크맨Queue에서 작업이 끝날 때까지 계속 실행됩니다.
case2 매듭
실제로 스레드 탱크의 핵심 원리는 대상 탱크 모델과 무관하고 생산자-소비자 모델임을 알 수 있다.
갈고리 방법
runWorker () 방법으로 돌아가서 작업을 수행하는 과정에서 루틴 풀에는 beforeExecute (), afterExecute () 와 같은 갈고리 방법이 보존되어 있습니다.사용자는 자신의 스레드 탱크를 실현할 때, 복사 갈고리 방법을 통해 스레드 탱크에 기능을 추가할 수 있다.
총결산
대상 탱크와 스레드 탱크는 관계가 크지 않지만 자주 혼동될 수 있다.연못의 실현은 매우 재미있다.원본 코드를 거슬러 올라가기 전에 나는 스레드 탱크가 스레드를 저장하고 사용할 때 임무를 수행하는 것이라고 생각했다.원본을 보고서야 그것이 이렇게 절묘하게 이루어지고 간결하고 우아하며 효율이 높다는 것을 알았다.원본이야말로 가장 좋은 선생님이다.