추적 분석 ThreadPoolExecutor 소스 코드

14076 단어 자바
영전 기 준비
0 FBI WARNING
문장 이 대단히 수 다스 럽 고 빙빙 돈다.
1 버 전
JDK 버 전: OpenJDK 11.0.1
IDE : idea 2018.3
2 ThreadPoolExecutor 소개
Thread PoolExecutor 는 jdk 4 에 추 가 된 도구 로 jdk 자체 테이프 의 Executors 프레임 워 크 에 봉 인 된 자바 에서 가장 전형 적 인 스 레 드 풀 기술 입 니 다.
Thread PoolExecutor 류 는 concurrent 패키지 에서 다른 스 레 드 도구 류 와 마찬가지 로 Doug Lea 대신 이 조작 합 니 다.
[Spring ioc 와 Gson 을 보고 조금 피곤 해 졌 습 니 다. 입맛 을 바 꿔 서 jdk 의 소스 코드 를 보 세 요.]
3 Demo
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {

    public static void main(String[] args){
        //     
        //             ,     5
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for(int i = 0 ; i < 100 ; i ++){
            final int ii = i;
            //   Runnable         
            Runnable r = () -> System.out.println(ii);
            //  
            executorService.execute(r);
        }
    }
}

일 선 연못 의 초기 화
스 레 드 탱크 의 초기 화 호출 된 Executors 프레임 워 크 의 정적 방법:
//Executors.class
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue());
}

이 구조 방법 을 계속 추적 하 세 요:
//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

계속 추적:
//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler) {

    //                                
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();

    //        
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();

    //   
    this.corePoolSize = corePoolSize;
    //     
    //              ,             
    this.maximumPoolSize = maximumPoolSize;
    //         
    //     LinkedBlockingQueue      ,     
    this.workQueue = workQueue;
    //keepAliveTime       :
    //                ,            
    //             
    //           ,   keepAliveTime = 0,      
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    //          
    this.threadFactory = threadFactory;
    //handler      task          
    //          ,     ThreadPoolExecutor    defaultHandler   
    this.handler = handler;
}

2 노동자
Worker 는 Thread PoolExecutor 의 내부 클래스 로 Runnable 의 프 록 시 클래스 라 고 볼 수 있 습 니 다.
//ThreadPoolExecutor.class
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    
    private static final long serialVersionUID = 6138294804551838833L;
    final Thread thread;
    Runnable firstTask;
    //   task       
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        //      AbstractQueuedSynchronizer     ,       
        //-1                 ,      
        setState(-1);
        //               task
        //         worker                       task  
        this.firstTask = firstTask;
        //worker        Runnable          
        //              
        this.thread = getThreadFactory().newThread(this);
    }

    //        ,       worker   run()   
    public void run() {
        //runWorker(...)     ThreadPoolExecutor  
        runWorker(this);
    }

    //      
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    //    AbstractQueuedSynchronizer    tryAcquire(...)   
    //    
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    //    AbstractQueuedSynchronizer    tryRelease(...)   
    //     
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    //       
    public void lock() { 
        acquire(1); 
    }
    //    
    public boolean tryLock() { 
        return tryAcquire(1); 
    }
    //        
    public void unlock() { 
        release(1); 
    }
    //       
    public boolean isLocked() { 
        return isHeldExclusively(); 
    }
    //    
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

runWorker (...) 방법 추적 하기:
//ThreadPoolExecutor.class
final void runWorker(Worker w) {
    //              
    Thread wt = Thread.currentThread();
    //   task
    Runnable task = w.firstTask;
    //       task   
    w.firstTask = null;
    //     
    w.unlock();
    //   ,     true               
    boolean completedAbruptly = true;
    try {
        //      while   ,        task
        //getTask()             task      
        //  task   null,           task    ,      
        while (task != null || (task = getTask()) != null) {
            //  ,    
            w.lock();
            //           ,           ,       
            if ((runStateAtLeast(ctl.get(), STOP) 
                    || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();

            try {
                //beforeExecute(...)   afterExecute(...)     ThreadPoolExecutor       
                //           ,          
                beforeExecute(wt, task);
                try {
                    //     task
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                //     task   
                task = null;
                //      task     1
                w.completedTasks++;
                //   
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //         worker
        //           task        Worker
        processWorkerExit(w, completedAbruptly);
    }
}

Worker 는 스 레 드 탱크 에서 업무 논 리 를 진정 으로 완성 하 는 구성 요소 로 작업 과 스 레 드 의 패키지 입 니 다.
삼 스 레 드 탱크 의 상태 제어
스 레 드 탱크 의 상 태 는 주로 ctl 변수 에 의 해 제어 된다.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 은 AtomicInteger 형식의 변수 로 사실은 int 값 으로 간단하게 이해 할 수 있 습 니 다. AtomicInteger 는 높 은 병발 의 원자 화 작업 에 적응 할 수 있 을 뿐 입 니 다.
ctl 의 앞 29 자 리 는 스 레 드 (Worker) 의 수량 을 나타 내 고 뒤의 세 자 리 는 스 레 드 탱크 의 상 태 를 나타 낸다.
스 레 드 탱크 의 상 태 는 Running, Shutdown, Stop, Tidying, Terminate 등 다섯 가지 로 단어 에 따라 대충 알 아 맞 힐 수 있다.
주의해 야 할 것 은 이 다섯 가지 상태 온라인 프로 세 스 탱크 에 모두 int 변수의 형식 으로 존재 하고 예전 부터 나중에 차례대로 커지 며 상태 에 대한 비교 에 일련의 방법 이 있다 는 것 이다.
//ThreadPoolExecutor.class
private static boolean runStateLessThan(int c, int s) {
    //c         s
    return c < s;
}
//ThreadPoolExecutor.class
private static boolean runStateAtLeast(int c, int s) {
    //c            s
    return c >= s;
}
//ThreadPoolExecutor.class
private static boolean isRunning(int c) {
    //      RUNNING     SHUTDOWN  
    return c < SHUTDOWN;
}

이러한 방법 에서 들 어 오 는 매개 변수 c 는 일반적으로 현재 스 레 드 탱크 상 태 를 말 하 며 s 는 비교 하 는 참조 상 태 를 말한다.
4 스 레 드 탱크 의 실행
이 파 트 의 시작 점:
executorService.execute(r);

execute (...) 방법 추적 하기:
public void execute(Runnable command) {
    //     
    if (command == null)
        throw new NullPointerException();
    
    //ctl     AtomicInteger      ,          
    int c = ctl.get();
    
    //workerCountOf(...)            Worker    
    if (workerCountOf(c) < corePoolSize) {
        //Worker               
        //     Worker     task    
        if (addWorker(command, true))
            return;
        //   Worker         task,        
        //            ,       
        //      
        c = ctl.get();
    }
    //  Worker               ,     Worker          ,        
    //              ,  task              
    //     ,    else    ,        addWorker(...)   ,         task
    //reject(...)       handler      
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }else if (!addWorker(command, false))
        reject(command);
}

1 reject
여기 서 먼저 reject (...) 방법 을 언급 합 니 다.
//ThreadPoolExecutor.class
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

본질은 handler 대상 의 관련 방법 을 호출 한 것 이다.이 예 에서 handler 대상 은 default Handler 를 가리 키 고 있 습 니 다.
//ThreadPoolExecutor.class
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

default Handler 는 AbortPolicy 형식의 대상 이 고 AbortPolicy 는 Thread PoolExecutor 의 정적 내부 클래스 입 니 다.
AbortPolicy 가 작용 하 는 방법 은 rejected Execution (...) 방법 입 니 다.
//AbortPolicy.class
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                        " rejected from " + e.toString());
}

즉, task 가 너무 많은 상황 에서 AbortPolicy 의 대응 전략 은 이상 을 던 지 는 것 이다.
2 addWorker
핵심 방법 addWorker (...) 를 살 펴 보 겠 습 니 다.
//ThreadPoolExecutor.class
private boolean addWorker(Runnable firstTask, boolean core) {
    //      for   ,      
    retry:
    //                   
    for (int c = ctl.get();;) {
        //       ,          ,     task  ,     false
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            //   Worker                  false
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            //  ctl       1,           
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            //      SHUTDOWN                 
            //           for              SHUTDOWN
            //     SHUTDOWN       false  
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //     Worker
        w = new Worker(firstTask);
        //      
        final Thread t = w.thread;
        if (t != null) {
            //  ,           
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
                //      c   RUNNING,   [c   RUNNING    SHUTDOWN   firstTask   null]           
                //
                if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                    //              ,     
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    //workers      ,     Worker   
                    workers.add(w);
                    //   Worker    
                    int s = workers.size();
                    //largestPoolSize                 
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //   Worker      
                    workerAdded = true;
                }
            } finally {
                //   
                mainLock.unlock();
            }
            //    Worker          workers   
            if (workerAdded) {
                //              
                t.start();
                //   Worker        
                workerStarted = true;
            }
        }
    } finally {
        //       Worker       ,        Worker
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

다섯 시 잔소리
먼저 스 레 드 탱크 의 업무 논 리 를 정리 하 세 요.
1     task (     Runnable        ) [execute(...)   ]

2   task         Worker    [execute(...)   ]
    2.1    Worker                 ->   
    2.2    Worker                ->      ,    task           Worker    
    2.3    task        ,      ->       (handler)

3 Worker              Thread       start()    [addWorker(...)   ]

4   start()        Worker   run()    [Worker.class    run()   ]

5 Worker   run()           task   run()    [runWorker(...)   ]

메 인 업무 논 리 는 복잡 하지 않 고 비교적 어 려 운 것 은 데이터 의 일치 성 을 확보 하기 위해 스 레 드 탱크 코드 에는 대량의 상태 판단 과 잠 금 체제 가 가득 하 다.
또한 성능 문 제 를 고려 하기 위해 스 레 드 탱크 의 디자인 은 비관 적 인 잠 금 (synchronized 키워드) 을 사용 하지 않 고 ASQ 와 ReetrentLock 체 제 를 대량으로 사용 했다.
본 고 는 개인의 학습 노트 일 뿐 오류 나 표현 이 명확 하지 않 은 부분 이 있 을 수 있 으 며 인연 이 있 습 니 다.

좋은 웹페이지 즐겨찾기