java에서 ThreadPoolExecutor 원리 분석

13999 단어 javaThreadPoolExecutor
java에서 ThreadPoolExecutor 원리 분석
스레드 풀 소개
Java 스레드 풀은 개발에서 자주 사용하는 도구입니다. 우리가 비동기적이고 병행적인 작업을 처리할 때 스레드 풀을 자주 사용하거나 서버를 실현할 때 연결 처리 요청을 수신하기 위해 스레드 풀을 사용해야 합니다.
스레드 풀 사용
JDK에서 제공하는 스레드 풀은java에 있습니다.util.concurrent.ThreadPoolExecutor.사용할 때, 통상적으로 Executor 서비스 인터페이스를 사용하는데,submit,invokeAll,shutdown 등 일반적인 방법을 제공한다.
온라인 스레드 탱크 설정에 있어 Executors 클래스에서 자주 사용하는 장면을 제공할 수 있는 정적 방법을 제공했다. 예를 들어 new Fixed Thread Pool, new Cached Thread Pool, new Single Thread Executor 등이다. 이런 방법은 최종적으로thread Pool Executor의 구조 함수에 호출되었다.
ThreadPoolExecutor의 모든 매개변수를 포함하는 구조 함수는

/**
   * @param corePoolSize the number of threads to keep in the pool, even
   *    if they are idle, unless {@code allowCoreThreadTimeOut} is set
   * @param maximumPoolSize the maximum number of threads to allow in the
   *    pool
   * @param keepAliveTime when the number of threads is greater than
   *    the core, this is the maximum time that excess idle threads
   *    will wait for new tasks before terminating.
   * @param unit the time unit for the {@code keepAliveTime} argument
   * @param workQueue the queue to use for holding tasks before they are
   *    executed. This queue will hold only the {@code Runnable}
   *    tasks submitted by the {@code execute} method.
   * @param threadFactory the factory to use when the executor
   *    creates a new thread
   * @param handler the handler to use when execution is blocked
   *    because the thread bounds and queue capacities are reached
  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
      throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
      throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
  }
  • corePoolSize는 스레드 풀의 핵심 스레드 수를 설정합니다. 새 작업을 추가할 때 스레드 풀의 스레드 수가 corePoolSize보다 작으면 현재 스레드가 비어 있든 없든 작업을 수행하기 위해 새로운 스레드를 만듭니다..
  • maximunPoolSize는 스레드 탱크에서 허용하는 최대 스레드 수입니다
  • WorkQueue는 줄 서기 작업을 저장하는 데 사용됩니다
  • keep AliveTime은 corePoolSize보다 큰 라인이 유휴된 시간 초과 시간입니다
  • handler는 작업 탈출, 스레드 풀이 닫혔을 때의 작업 처리에 사용됩니다. 스레드 풀의 스레드 증가 전략은 현재 스레드 수가corePoolSize보다 적을 때, 새로운 스레드를 추가합니다. 스레드 수=corePoolSize와 corePoolSize일 때,workQueue가 새로운 작업을 저장할 수 없을 때만 새 스레드를 만들고, 초과된 스레드는 유휴keepaliveTime 후에 삭제합니다.
  • 구현(JDK1.8 기반)
    ThreadPoolExecutor에 저장된 상태는
    현재 라인 풀 상태는 RUNNING, SHUTDOWN, STOP, TIDYING,TERMINATED를 포함한다.
    현재 유효한 운행 라인의 수량입니다.
    이 두 상태를 int 변수에 넣고 앞의 세 자리는 스레드 탱크 상태이고 뒤의 29자리는 스레드 수량이다.
    예를 들어 0b11100000000000000001은 RUNNING, 하나의 라인을 나타낸다.
    HashSet을 통해 작업자 집합을 저장합니다. 이 HashSet에 접근하기 전에 보호 상태의 mainLock: ReentrantLock을 받아야 합니다.
    submit、execute
    excute의 실행 방식은 현재 작업자 수를 먼저 검사하고 코어PoolSize보다 작으면 코어 작업자를 추가합니다.스레드 탱크는 유지 보수 스레드 수량과 상태 검사에 있어 대량의 검사를 했다.
    
    public void execute(Runnable command) {
        int c = ctl.get();
        //  corePoolSize
        if (workerCountOf(c) < corePoolSize) {
          //  worker
          if (addWorker(command, true))
            return;
          c = ctl.get();
        }
        //  
        if (isRunning(c) && workQueue.offer(command)) {
          //  , 
          int recheck = ctl.get();
          if (! isRunning(recheck) && remove(command))
            reject(command);
          //  down 
          else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
        }
        else if (!addWorker(command, false))
          reject(command);
      }
    
    addWorker 메서드 구현
    
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
          int c = ctl.get();
          int rs = runStateOf(c);
          // Check if queue empty only if necessary.
          if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
              firstTask == null &&
              ! workQueue.isEmpty()))
            return false;
          for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
              wc >= (core ? corePoolSize : maximumPoolSize))
              return false;
            if (compareAndIncrementWorkerCount(c))
              break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
              continue retry;
            // else CAS failed due to workerCount change; retry inner loop
          }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
          w = new Worker(firstTask);
          final Thread t = w.thread;
          if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
              // Recheck while holding lock.
              // Back out on ThreadFactory failure or if
              // shut down before lock acquired.
              int rs = runStateOf(ctl.get());
              if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                  throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                  largestPoolSize = s;
                workerAdded = true;
              }
            } finally {
              mainLock.unlock();
            }
            if (workerAdded) {
              //  , , Worker run ,Worker run runWorker(Worker)
              t.start();
              workerStarted = true;
            }
          }
        } finally {
          if (! workerStarted)
            addWorkerFailed(w);
        }
        return workerStarted;
      }
    
    Worker 클래스는 AbstractQueuedSynchronizer를 계승하여 동기화 대기 기능을 얻었다.
    
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
      {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
          setState(-1); // inhibit interrupts until runWorker
          this.firstTask = firstTask;
          this.thread = getThreadFactory().newThread(this);
        }
        /** Delegates main run loop to outer runWorker */
        public void run() {
          runWorker(this);
        }
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {
          return getState() != 0;
        }
        protected boolean tryAcquire(int unused) {
          if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
          }
          return false;
        }
        protected boolean tryRelease(int unused) {
          setExclusiveOwnerThread(null);
          setState(0);
          return true;
        }
        public void lock()    { acquire(1); }
        public boolean tryLock() { return tryAcquire(1); }
        public void unlock()   { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        void interruptIfStarted() {
          Thread t;
          if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
              t.interrupt();
            } catch (SecurityException ignore) {
            }
          }
        }
    
    runWorker(Worker)는 Worker의 폴링 실행 논리로 작업 대기열에서 작업을 계속 가져와 실행합니다.Worker는 작업을 수행할 때마다interrupt되지 않도록 lock을 수행해야 합니다.
    
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
          while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
               (Thread.interrupted() &&
               runStateAtLeast(ctl.get(), STOP))) &&
              !wt.isInterrupted())
              wt.interrupt();
            try {
              beforeExecute(wt, task);
              Throwable thrown = null;
              try {
                task.run();
              } catch (RuntimeException x) {
                thrown = x; throw x;
              } catch (Error x) {
                thrown = x; throw x;
              } catch (Throwable x) {
                thrown = x; throw new Error(x);
              } finally {
                afterExecute(task, thrown);
              }
            } finally {
              task = null;
              w.completedTasks++;
              w.unlock();
            }
          }
          completedAbruptly = false;
        } finally {
          processWorkerExit(w, completedAbruptly);
        }
      }
    
    ThreadPoolExecutor의submit 방법은Callable을FutureTask로 포장한 후execute 방법에 건네줍니다.
    FutureTask
    FutureTask는 Runnable 및 Future에서 상속되며 FutureTask가 정의하는 몇 가지 상태는
    NEW, 아직 실행 안 함
    COMPLETING, 실행 중
    NORMAL, 정상 실행 완료 결과
    예외, 예외 던지기 실행
    CANCELLED, 실행 취소
    INTERRUPTING, 실행 중단
    INTERRUPTED, 중단되었습니다.
    그중의 관건적인 get 방법
    
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
          s = awaitDone(false, 0L);
        return report(s);
      }
    
    현재 상태를 가져오고 아직 실행되지 않고 정상적이면 결과를 기다리는 프로세스에 들어갑니다.awaitDone에서 현재 상태를 계속 순환해서 가져옵니다. 결과가 없으면 CAS를 통해 체인 테이블의 머리 부분에 추가하고, 시간 초과를 설정하면 LockSupport입니다.parkNanos에서 지정된 시간까지
    
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
      }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
          if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
          }
          int s = state;
          if (s > COMPLETING) {
            if (q != null)
              q.thread = null;
            return s;
          }
          else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
          else if (q == null)
            q = new WaitNode();
          else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                               q.next = waiters, q);
          else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
              removeWaiter(q);
              return state;
            }
            LockSupport.parkNanos(this, nanos);
          }
          else
            LockSupport.park(this);
        }
      }
    
    FutureTask의run 방법은 작업을 수행하고 결과의 위치를 설정하는 것입니다. 우선 현재 상태가 NEW인지 판단하고 현재 라인을 실행 라인으로 설정한 다음 Callable의call을 호출하여 결과를 얻은 후 설정 결과를 FutureTask 상태로 수정합니다.
    
    public void run() {
        if (state != NEW ||
          !UNSAFE.compareAndSwapObject(this, runnerOffset,
                         null, Thread.currentThread()))
          return;
        try {
          Callable<V> c = callable;
          if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
              result = c.call();
              ran = true;
            } catch (Throwable ex) {
              result = null;
              ran = false;
              setException(ex);
            }
            if (ran)
              set(result);
          }
        } finally {
          // runner must be non-null until state is settled to
          // prevent concurrent calls to run()
          runner = null;
          // state must be re-read after nulling runner to prevent
          // leaked interrupts
          int s = state;
          if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
        }
      }
    
    읽어주셔서 감사합니다. 여러분에게 도움이 되었으면 좋겠습니다. 본 사이트에 대한 지지에 감사드립니다!

    좋은 웹페이지 즐겨찾기