추적 분석 ThreadPoolExecutor 소스 코드
14076 단어 자바
0 FBI WARNING
문장 이 대단히 수 다스 럽 고 빙빙 돈다.
1 버 전
JDK 버 전: OpenJDK 11.0.1
IDE : idea 2018.3
2 ThreadPoolExecutor 소개
Thread PoolExecutor 는 jdk 4 에 추 가 된 도구 로 jdk 자체 테이프 의 Executors 프레임 워 크 에 봉 인 된 자바 에서 가장 전형 적 인 스 레 드 풀 기술 입 니 다.
Thread PoolExecutor 류 는 concurrent 패키지 에서 다른 스 레 드 도구 류 와 마찬가지 로 Doug Lea 대신 이 조작 합 니 다.
[Spring ioc 와 Gson 을 보고 조금 피곤 해 졌 습 니 다. 입맛 을 바 꿔 서 jdk 의 소스 코드 를 보 세 요.]
3 Demo
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static void main(String[] args){
//
// , 5
ExecutorService executorService = Executors.newFixedThreadPool(5);
for(int i = 0 ; i < 100 ; i ++){
final int ii = i;
// Runnable
Runnable r = () -> System.out.println(ii);
//
executorService.execute(r);
}
}
}
일 선 연못 의 초기 화
스 레 드 탱크 의 초기 화 호출 된 Executors 프레임 워 크 의 정적 방법:
//Executors.class
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
이 구조 방법 을 계속 추적 하 세 요:
//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
계속 추적:
//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
//
this.corePoolSize = corePoolSize;
//
// ,
this.maximumPoolSize = maximumPoolSize;
//
// LinkedBlockingQueue ,
this.workQueue = workQueue;
//keepAliveTime :
// ,
//
// , keepAliveTime = 0,
this.keepAliveTime = unit.toNanos(keepAliveTime);
//
this.threadFactory = threadFactory;
//handler task
// , ThreadPoolExecutor defaultHandler
this.handler = handler;
}
2 노동자
Worker 는 Thread PoolExecutor 의 내부 클래스 로 Runnable 의 프 록 시 클래스 라 고 볼 수 있 습 니 다.
//ThreadPoolExecutor.class
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
// task
volatile long completedTasks;
Worker(Runnable firstTask) {
// AbstractQueuedSynchronizer ,
//-1 ,
setState(-1);
// task
// worker task
this.firstTask = firstTask;
//worker Runnable
//
this.thread = getThreadFactory().newThread(this);
}
// , worker run()
public void run() {
//runWorker(...) ThreadPoolExecutor
runWorker(this);
}
//
protected boolean isHeldExclusively() {
return getState() != 0;
}
// AbstractQueuedSynchronizer tryAcquire(...)
//
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// AbstractQueuedSynchronizer tryRelease(...)
//
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//
public void lock() {
acquire(1);
}
//
public boolean tryLock() {
return tryAcquire(1);
}
//
public void unlock() {
release(1);
}
//
public boolean isLocked() {
return isHeldExclusively();
}
//
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
runWorker (...) 방법 추적 하기:
//ThreadPoolExecutor.class
final void runWorker(Worker w) {
//
Thread wt = Thread.currentThread();
// task
Runnable task = w.firstTask;
// task
w.firstTask = null;
//
w.unlock();
// , true
boolean completedAbruptly = true;
try {
// while , task
//getTask() task
// task null, task ,
while (task != null || (task = getTask()) != null) {
// ,
w.lock();
// , ,
if ((runStateAtLeast(ctl.get(), STOP)
|| (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
//beforeExecute(...) afterExecute(...) ThreadPoolExecutor
// ,
beforeExecute(wt, task);
try {
// task
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
// task
task = null;
// task 1
w.completedTasks++;
//
w.unlock();
}
}
completedAbruptly = false;
} finally {
// worker
// task Worker
processWorkerExit(w, completedAbruptly);
}
}
Worker 는 스 레 드 탱크 에서 업무 논 리 를 진정 으로 완성 하 는 구성 요소 로 작업 과 스 레 드 의 패키지 입 니 다.
삼 스 레 드 탱크 의 상태 제어
스 레 드 탱크 의 상 태 는 주로 ctl 변수 에 의 해 제어 된다.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 은 AtomicInteger 형식의 변수 로 사실은 int 값 으로 간단하게 이해 할 수 있 습 니 다. AtomicInteger 는 높 은 병발 의 원자 화 작업 에 적응 할 수 있 을 뿐 입 니 다.
ctl 의 앞 29 자 리 는 스 레 드 (Worker) 의 수량 을 나타 내 고 뒤의 세 자 리 는 스 레 드 탱크 의 상 태 를 나타 낸다.
스 레 드 탱크 의 상 태 는 Running, Shutdown, Stop, Tidying, Terminate 등 다섯 가지 로 단어 에 따라 대충 알 아 맞 힐 수 있다.
주의해 야 할 것 은 이 다섯 가지 상태 온라인 프로 세 스 탱크 에 모두 int 변수의 형식 으로 존재 하고 예전 부터 나중에 차례대로 커지 며 상태 에 대한 비교 에 일련의 방법 이 있다 는 것 이다.
//ThreadPoolExecutor.class
private static boolean runStateLessThan(int c, int s) {
//c s
return c < s;
}
//ThreadPoolExecutor.class
private static boolean runStateAtLeast(int c, int s) {
//c s
return c >= s;
}
//ThreadPoolExecutor.class
private static boolean isRunning(int c) {
// RUNNING SHUTDOWN
return c < SHUTDOWN;
}
이러한 방법 에서 들 어 오 는 매개 변수 c 는 일반적으로 현재 스 레 드 탱크 상 태 를 말 하 며 s 는 비교 하 는 참조 상 태 를 말한다.
4 스 레 드 탱크 의 실행
이 파 트 의 시작 점:
executorService.execute(r);
execute (...) 방법 추적 하기:
public void execute(Runnable command) {
//
if (command == null)
throw new NullPointerException();
//ctl AtomicInteger ,
int c = ctl.get();
//workerCountOf(...) Worker
if (workerCountOf(c) < corePoolSize) {
//Worker
// Worker task
if (addWorker(command, true))
return;
// Worker task,
// ,
//
c = ctl.get();
}
// Worker , Worker ,
// , task
// , else , addWorker(...) , task
//reject(...) handler
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}else if (!addWorker(command, false))
reject(command);
}
1 reject
여기 서 먼저 reject (...) 방법 을 언급 합 니 다.
//ThreadPoolExecutor.class
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
본질은 handler 대상 의 관련 방법 을 호출 한 것 이다.이 예 에서 handler 대상 은 default Handler 를 가리 키 고 있 습 니 다.
//ThreadPoolExecutor.class
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
default Handler 는 AbortPolicy 형식의 대상 이 고 AbortPolicy 는 Thread PoolExecutor 의 정적 내부 클래스 입 니 다.
AbortPolicy 가 작용 하 는 방법 은 rejected Execution (...) 방법 입 니 다.
//AbortPolicy.class
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " + e.toString());
}
즉, task 가 너무 많은 상황 에서 AbortPolicy 의 대응 전략 은 이상 을 던 지 는 것 이다.
2 addWorker
핵심 방법 addWorker (...) 를 살 펴 보 겠 습 니 다.
//ThreadPoolExecutor.class
private boolean addWorker(Runnable firstTask, boolean core) {
// for ,
retry:
//
for (int c = ctl.get();;) {
// , , task , false
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
// Worker false
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// ctl 1,
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// SHUTDOWN
// for SHUTDOWN
// SHUTDOWN false
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// Worker
w = new Worker(firstTask);
//
final Thread t = w.thread;
if (t != null) {
// ,
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
// c RUNNING, [c RUNNING SHUTDOWN firstTask null]
//
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
// ,
if (t.isAlive())
throw new IllegalThreadStateException();
//workers , Worker
workers.add(w);
// Worker
int s = workers.size();
//largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
// Worker
workerAdded = true;
}
} finally {
//
mainLock.unlock();
}
// Worker workers
if (workerAdded) {
//
t.start();
// Worker
workerStarted = true;
}
}
} finally {
// Worker , Worker
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
다섯 시 잔소리
먼저 스 레 드 탱크 의 업무 논 리 를 정리 하 세 요.
1 task ( Runnable ) [execute(...) ]
2 task Worker [execute(...) ]
2.1 Worker ->
2.2 Worker -> , task Worker
2.3 task , -> (handler)
3 Worker Thread start() [addWorker(...) ]
4 start() Worker run() [Worker.class run() ]
5 Worker run() task run() [runWorker(...) ]
메 인 업무 논 리 는 복잡 하지 않 고 비교적 어 려 운 것 은 데이터 의 일치 성 을 확보 하기 위해 스 레 드 탱크 코드 에는 대량의 상태 판단 과 잠 금 체제 가 가득 하 다.
또한 성능 문 제 를 고려 하기 위해 스 레 드 탱크 의 디자인 은 비관 적 인 잠 금 (synchronized 키워드) 을 사용 하지 않 고 ASQ 와 ReetrentLock 체 제 를 대량으로 사용 했다.
본 고 는 개인의 학습 노트 일 뿐 오류 나 표현 이 명확 하지 않 은 부분 이 있 을 수 있 으 며 인연 이 있 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Is Eclipse IDE dying?In 2014 the Eclipse IDE is the leading development environment for Java with a market share of approximately 65%. but ac...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.