스레드 탱크 운행 작업 후 막힘 문제 분석

배경


오늘 한 친구가 아날로그 코드가 다음과 같은 질문을 했습니다.
public class ThreadPoolDemo {

    public static void main(String[] args) {
        int nThreads = 10;
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
        executorService.execute(() -> System.out.println("test"));
    }
}

실행이 끝난 후 프로그램이 막힌 것을 발견했다.
프로그램이 실행 중인 것을 볼 수 있다.
  • 그럼 집행 완료 후 왜 퇴장하지 않습니까?
  • JVM은 언제 종료됩니까?
  • 이 프로그램은 왜 막혔을까요? 어느 곳에서 막혔을까요?

  •  

    2. JVM이 종료되는 몇 가지 상황


    JVM의 일반적인 종료 원인은 4가지입니다.
    1,kill-9 pid 프로세스를 직접 죽입니다
    2、java.lang.System.exit(int status)
    3、java.lang.Runtime.exit(int status)
    4、비수호 라인 생존 없음
     

    분석


    그러면 우리는 위의 문제로 돌아가서 왜 프로그램이 끝나지 않았는지 분석한다.

    3.1 원본 코드 분석법


    우리는 정장거리 라인 탱크의 구조 함수를 보았다
    java.util.concurrent.Executors#newFixedThreadPool(int)
     /**
         * Creates a thread pool that reuses a fixed number of threads
         * operating off a shared unbounded queue.  At any point, at most
         * {@code nThreads} threads will be active processing tasks.
         * If additional tasks are submitted when all threads are active,
         * they will wait in the queue until a thread is available.
         * If any thread terminates due to a failure during execution
         * prior to shutdown, a new one will take its place if needed to
         * execute subsequent tasks.  The threads in the pool will exist
         * until it is explicitly {@link ExecutorService#shutdown shutdown}.
         *
         * @param nThreads the number of threads in the pool
         * @return the newly created thread pool
         * @throws IllegalArgumentException if {@code nThreads <= 0}
         */
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue());
        }

    주석을 통해 우리는 이 스레드 탱크의 핵심 스레드와 최대 스레드 수가 같고 작업 대기열이 무계 대기열이라는 것을 발견했다.
    만약 모든 핵심 라인이 임무를 수행하고 있다면, 임무는 작업 대기열에 놓을 것이다.실행 중 한 라인이 끊기면 다음 작업을 수행하기 위해 라인을 새로 만듭니다.Executor Service#shutdown 함수가 호출될 때까지 스레드 풀의 스레드는 계속 존재합니다.
    우리는 밑바닥의ThreadPoolExecutor의 구조 함수를 다시 보았다
    java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue)
       /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and default thread factory and rejected execution handler.
         * It may be more convenient to use one of the {@link Executors} factory
         * methods instead of this general purpose constructor.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @throws IllegalArgumentException if one of the following holds:
    * {@code corePoolSize < 0}
    * {@code keepAliveTime < 0}
    * {@code maximumPoolSize <= 0}
    * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }

    주석을 통해 다음과 같은 몇 가지 핵심 매개변수의 의미를 확인할 수 있습니다.
    첫 번째 매개 변수: 코어Poolsize: 코어 상주 스레드 풀.만약 0과 같으면 임무를 수행하고 요청이 들어오지 않으면 라인을 삭제합니다.0보다 크면 로컬 작업이 완료된 경우에도 코어 스레드 풀은 제거되지 않습니다.이 파라미터 설정은 매우 관건적이다. 설정이 너무 크면 자원을 낭비하고 설정이 너무 작으면 라인이 빈번하게 생성되거나 폐기된다.
    두 번째 파라미터:maximumPoolsize는 스레드 탱크가 동시에 실행할 수 있는 최대 스레드 수를 나타낸다.
    만약 스레드 탱크의 스레드 수가 핵심 스레드 수보다 크고 대기열이 가득 차며, 스레드 수가 최대 스레드 수보다 적으면 새로운 스레드를 만들 것입니다.세 번째 매개 변수:keepAliveTime은 스레드 탱크의 스레드 여가 시간을 나타낸다. 여가 시간이keepAliveTime 값에 도달하면 스레드가 삭제되고corePoolsize 스레드만 남을 때까지 메모리와 핸들 자원을 낭비하지 않는다.
    자세한 참조:java.util.concurrent.ThreadPoolExecutor#execute의 주석 섹션입니다.
    기본적으로, 스레드 탱크의 스레드 수가 코어PoolSize보다 많을 때keepAliveTime이 작동합니다.
    그러나 ThreadPoolExecutor의allowCoreThreadTimeOut 변수가true로 설정되면 핵심 라인이 시간을 초과한 후에도 회수됩니다.
    네 번째 매개변수: TimeUnit은 시간 단위를 나타냅니다.keepAliveTime의 시간 단위는 보통 Time Unit입니다.SECONDS.
    다섯 번째 매개 변수:workQueue는 캐시 대기열을 나타냅니다.요청한 스레드 수가 maximumPoolsize보다 많을 때, 스레드는 BlockingQueue 차단 대기열에 들어갑니다.
    여섯 번째 매개변수: threadFactory는 스레드 플랜트를 나타냅니다.그것은 같은 임무의 라인을 생산하는 데 쓰인다.스레드 탱크의 이름은 이factory에 그룹 이름 접두사를 추가함으로써 이루어집니다.가상 머신 창고를 분석할 때 스레드 임무가 어느 스레드 공장에서 발생했는지 알 수 있다.
    일곱 번째 인자:handler는 거부 정책을 실행하는 대상을 표시합니다.다섯 번째 파라미터workQueue의 작업 캐시 영역 상한선을 초과하고maximumPoolSize에 도달했을 때 이 정책을 통해 요청을 처리할 수 있습니다.
    기본 거부 정책은 RejectedExecutionException 예외를 던지는 것입니다.
        /**
         * The default rejected execution handler
         */
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();

    소스:
      /**
         * A handler for rejected tasks that throws a
         * {@code RejectedExecutionException}.
         */
        public static class AbortPolicy implements RejectedExecutionHandler {
            /**
             * Creates an {@code AbortPolicy}.
             */
            public AbortPolicy() { }
    
            /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }

    우리는 다시 문제 자체로 돌아가 코드를 분석했다.
        public static void main(String[] args) {
            int nThreads = 10;
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
            executorService.execute(() -> System.out.println("test"));
        }

    스레드 탱크는 1회 임무만 수행했고 핵심 스레드 탱크와 최대 스레드 탱크는 모두 10이다. 따라서 첫 번째 임무를 제출할 때 1개의 스레드를 만들어서 수행해야 한다. 임무가 끝나면 새로운 임무가 들어오지 않는다. 그러나 핵심 스레드 탱크는 시간을 초과하지 않기 때문에 이 스레드는 계속'살아있다'고 임무를 기다린다.
    코어 스레드 풀의 기본 제한 시간 없음 근거:
    java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut
      /**
         * If false (default), core threads stay alive even when idle.
         * If true, core threads use keepAliveTime to time out waiting
         * for work.
         */
        private volatile boolean allowCoreThreadTimeOut;

     
    더 잘 이해하기 위해 코드를 개편해 봅시다.
     public static void main(String[] args) {
            //       
            Runnable runnable = () -> {
                try {
                    TimeUnit.SECONDS.sleep(20L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("test");
            };
    
            //    10      
            int nThreads = 10;
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
            //         
            executorService.setThreadFactory(new NamedThreadFactory("     "));
    
            //       (             )
            executorService.execute(runnable);
            executorService.execute(runnable);
            //      
            System.out.println(executorService.getActiveCount());
        }

    효과가 더욱 뚜렷하도록 20초 동안 임무를 멈추고 연못의 이름을 지었다.
    위의 지식을 바탕으로 절차를 추측해 봅시다.
    주 스레드는 스레드 풀을 만들고 스레드 풀은 첫 번째 작업(위와 같다)을 수행하며 스레드 풀은 두 번째 작업(이 때 첫 번째 스레드sleep 20초)을 수행합니다. 핵심 스레드 수 10에 도달하지 않기 때문에 두 번째 스레드를 생성하여 두 번째 작업을 수행합니다. 두 번째 업무도sleep 20초입니다. 이때 주 스레드는 스레드 풀의 활성 스레드 수(작업을 수행하고 있는 스레드)를 인쇄할 때 2개가 되어야 합니다.
    결과는 구상한 것과 같다.
    그러면 우리는 어떻게 이 스레드 탱크에 두 개의 스레드가 있는지 없는지를 봅니까?

    3.2 JVM 명령 또는 도구


    VisualVM을 사용하여 프로그램을 살펴보겠습니다.
    발견하기 전에 우리는 두 개의 라인을 만들어서 먼저 실행(시간은 무시할 수 없음)하고 바로 슬리핑에 들어간 다음에 Runnable 상태로 실행(컨트롤러에서'test'를 출력하여 시간이 너무 짧아서 인터페이스에 표시할 수 없음)한 다음WAITING 상태로 들어갑니다
    그림과 같이
     
    라인덤프를 통해 라인이 링크드 블락큐어에서 미션을 받을 때 막혔음을 알 수 있어요.
    java.util.concurrent.LinkedBlockingQueue#take
     public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }

    이 줄에 있습니다: notempty.await();현재 라인을 막고 밑바닥에java를 사용했습니다.util.concurrent.locks.LockSupport#park(java.lang.Object).
    흥미가 있으면 자바를 보러 가셔도 됩니다.util.concurrent.locks.LockSupport#park(java.lang.object) 사용법 및 설명입니다.
    따라서 이 스레드 탱크의 두 핵심 스레드는 작업이 막힌 대기열에 들어와서 계속 처리되기를 기다리고 있습니다.
    우리는 또 하나의 임무를 더해서 나의 구상을 검증할 수 있다
      public static void main(String[] args) throws InterruptedException {
            //       
            Runnable runnable = () -> {
                try {
                    TimeUnit.SECONDS.sleep(20L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("test");
            };
    
            //    10      
            int nThreads = 10;
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
            //         
            executorService.setThreadFactory(new NamedThreadFactory("     "));
    
            //       (             )
            executorService.execute(runnable);
            executorService.execute(runnable);
            //      
            System.out.println(executorService.getActiveCount());
    
            TimeUnit.SECONDS.sleep(5L);
            executorService.execute(runnable);
        }

    스레드 실행 상태를 생각하고 VisualVM을 통해 동적으로 효과를 관찰합니다.

     


    위의 소개를 통해 알 수 있듯이 핵심 스레드 탱크가 시간을 초과하지 않기 때문에 만들어진 핵심 스레드가 계속 생존하고 핵심 스레드 탱크가 막힌 원인은 막힌 대기열에서 데이터를 추출할 때 막힌 대기열에 의해 막힌 것이다.
    비수호 라인이 계속 존재하기 때문에 가상 기기는 종료되지 않기 때문에 프로그램도 끝나지 않는다.
     
    아마도 누군가가 "스레드 탱크가 임무를 수행하고 나면 소각되지 않을 것"이라고 말할 것이다. 그렇지?다음 예를 살펴보십시오.
    그럼 다음 프로그램의 실행이 어떻게 되는지 다시 한 번 볼까요?
     public static void main(String[] args) throws InterruptedException {
            int nThreads =10;
            ThreadPoolExecutor executorService = (ThreadPoolExecutor)Executors.newFixedThreadPool(nThreads);
            //          ,     2s
            executorService.setKeepAliveTime(2L, TimeUnit.SECONDS);
            executorService.allowCoreThreadTimeOut(true);
            executorService.execute(()-> System.out.println("test"));
        }

    실행 후test를 인쇄한 후 2s를 기다리면 작업이 없고 핵심 스레드 탱크의 스레드가 삭제됩니다. 비수호 스레드가 없기 때문에 가상 머신이 종료됩니다(exit code 0).
     

    3.3 단점 디버깅 학습법


    우리는 또한 단점을 통해 연못의 각종 속성을 배우고 운행 상태 등을 관찰할 수 있다.
     public static void main(String[] args) throws InterruptedException {
            //       
            Runnable runnable = () -> {
                try {
                    TimeUnit.SECONDS.sleep(20L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("test");
            };        //    10      
            int nThreads = 10;
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);        //         
            executorService.setThreadFactory(new NamedThreadFactory("     "));        //       (             )
            executorService.execute(runnable);
            executorService.execute(runnable);
            //      
            System.out.println(executorService.getActiveCount());
        }

    인쇄문에서 중단점이 있습니다. 중단점은 Thread:
    그렇지 않으면 모든 노선이 끊어질 것이다.
    효과는 다음과 같습니다.
    핵심 라인의 시간 초과를 허용하는지, 완성된 작업 수를 볼 수 있으며,workers를 통해 작업의 라인 상태를 볼 수 있습니다.
    대기 조건 및 대기 대기열 등의 정보를 볼 수도 있습니다.
    학습을 병행하면 디버깅을 많이 할 수 있고 여러 가지 학습 수단을 결합하면 효과가 더욱 좋다.
    우리는 작업을 수행하는 스레드가 스레드 풀의 Worker 객체로 봉인되어 있음을 발견했습니다.
     /**
         * Set containing all worker threads in pool. Accessed only when
         * holding mainLock.
         */
        private final HashSet workers = new HashSet();

    java.util.concurrent.ThreadPoolExecutor.Worker
    AbstractQueuedSynchronizer(AQS)를 계승하여 Runnable 인터페이스를 실현하였다.
    흥미가 있으면 여러분은 원본 코드를 보고 디버깅 정보 등에 따라 깊이 있게 공부할 수 있습니다.
     
     

    4. 총결산

  • 우리는 원본에서 지식을 많이 배워야 한다. 원본은 가장 권위 있고 전면적인 학습 자료이다.
  • 우리는 자바 조립 도구를 잘 사용해야 한다. IDEAD의 단점 디버깅 도구, JVM 모니터링 도구, 그리고 자바 컴파일링과 어셈블리 도구 등을 포함한다.
  • 문제에 부딪히면 많이 생각하고 DEMO 검증을 작성합니다.
  • 한 문제는 N개의 지식점을 넓힐 수 있다. 하나의'작은 문제'의 이해가 없고 그 뒤에 일련의 지식이 튼튼하지 못하기 때문에 기회를 빌려 공고히 해야 한다.

  •  
     
    창작이 쉽지 않다. 만약에 본고가 당신에게 도움이 된다고 생각한다면 칭찬을 환영하고 저를 주목해 주십시오. 만약에 보충 환영 평론과 교류가 있다면 저는 더 좋은 글을 창작하려고 노력할 것입니다.
     
     

    좋은 웹페이지 즐겨찾기