자바 사용자 정의 스 레 드 탱크 와 스 레 드 총수 제어 작업

개술
지 화 는 흔히 볼 수 있 는 사상 이다.스 레 드 탱크 는 매우 전형 적 인 지 화 실현 이다.도 자바 중의 스 레 드 탱크 를 크게 설명 했다.본 고 는 간단 한 스 레 드 탱크 를 실현 한다.
2 핵심 클래스
【1】인터페이스 정의

public interface IThreadPool<Job extends Runnable> {
 /**
 *      
 */
 public void shutAlldown();
 
 /**
 *     
 * 
 * @param job   
 */
 public void execute(Job job);
 
 /**
 *      
 * 
 * @param addNum    
 */
 public void addWorkers(int addNum);
 
 /**
 *      
 * 
 * @param reduceNum     
 */
 public void reduceWorkers(int reduceNum);
}
【2】실현 류
스 레 드 탱크 의 핵심 은 작업 목록 1 개 와 작업 자 목록 1 개 를 유지 하 는 것 입 니 다.

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List; 
public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> { 
 //      
 private static int DEAFAULT_SIZE = 5;
 //      
 private static int MAX_SIZE = 10; 
 //     
 private LinkedList<Job> tasks = new LinkedList<Job>();
 //       
 private List<Worker> workers = Collections
  .synchronizedList(new ArrayList<Worker>()); 
 /**
 *       
 */
 public XYThreadPool() {
 initWokers(DEAFAULT_SIZE);
 } 
 /**
 *      
 * 
 * @param threadNums    
 */
 public XYThreadPool(int workerNum) {
 workerNum = workerNum <= 0 ? DEAFAULT_SIZE
  : workerNum > MAX_SIZE ? MAX_SIZE : workerNum;
 initWokers(workerNum);
 } 
 /**
 *       
 * 
 * @param threadNums    
 */
 public void initWokers(int threadNums) {
 for (int i = 0; i < threadNums; i++) {
  Worker worker = new Worker();
  worker.start();
  workers.add(worker);
 }
 //       
 Runtime.getRuntime().addShutdownHook(new Thread() {
  public void run() {
  shutAlldown();
  }
 });
 } 
 @Override
 public void shutAlldown() {
 for (Worker worker : workers) {
  worker.shutdown();
 }
 } 
 @Override
 public void execute(Job job) {
 synchronized (tasks) {
  //                  ,         
  tasks.addLast(job);
  tasks.notifyAll();
 }
 } 
 @Override
 public void addWorkers(int addNum) {
 //          ,               
 if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) {
  initWokers(addNum);
 } else {
  System.out.println("addNum too large");
 }
 } 
 @Override
 public void reduceWorkers(int reduceNum) {
 if ((workers.size() - reduceNum <= 0))
  System.out.println("thread num too small");
 else {
  //           
  int count = 0;
  while (count != reduceNum) {
  for (Worker w : workers) {
   w.shutdown();
   count++;
  }
  }
 }
 } 
 /**
 *     
 */
 class Worker extends Thread { 
 private volatile boolean flag = true; 
 @Override
 public void run() {
  while (flag) {
  Job job = null;
  //   (     woker     ,             ,    )
  synchronized (tasks) {
   //       
   while (tasks.isEmpty()) {
   try {
    //   ,     ,   notify  
    tasks.wait();
    System.out.println("block when tasks is empty");
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   }
   //        
   job = tasks.removeFirst();
   System.out.println("get job:" + job + ",do biz");
   job.run();
  }
  }
 } 
 public void shutdown() {
  flag = false;
 }
 }
}
(1)wait()방법 을 호출 할 때 스 레 드 는 대상 자 물 쇠 를 포기 하고 이 대상 을 기다 리 는 잠 금 풀 에 들 어 갑 니 다.이 대상 에 게 notify()방법 을 호출 한 후에 야 이 스 레 드 는 대상 잠 금 풀 에 들 어가 준비 합 니 다.
(2)Object 의 방법:void notify():이 대상 을 기다 리 고 있 는 스 레 드 를 깨 웁 니 다.void notify All():이 대상 을 기다 리 고 있 는 모든 스 레 드 를 깨 웁 니 다.
notify All 은 이 대상 에서 notify 를 기다 리 던 모든 스 레 드 를 wait 상태 에서 종료 시 키 고 이 대상 의 잠 금 을 기다 리 는 것 으로 바 꾸 었 습 니 다.이 대상 이 잠 금 이 풀 리 면 경쟁 합 니 다.
notify 는 wait 상태 스 레 드 를 선택 하여 알림 을 하고 대상 의 자 물 쇠 를 가 져 옵 니 다.그러나 이 대상 에 게 notify 를 기다 리 는 다른 스 레 드 들 은 놀 라 지 않 습 니 다.첫 번 째 스 레 드 가 실 행 된 후에 대상 의 자 물 쇠 를 풀 어 줍 니 다.이때 이 대상 이 notify 문 구 를 다시 사용 하지 않 으 면 대상 이 비어 있 더 라 도...다른 wait 상태 에서 기다 리 는 스 레 드 는 이 대상 의 통 지 를 받 지 못 했 기 때문에 wait 상태 에 있 습 니 다.이 대상 이 notify 나 notify All 을 보 낼 때 까지 기다 리 는 것 은 자물쇠 가 아 닌 notify 나 notify All 입 니 다.
3.스 레 드 총 수 를 제어 할 필요 가 없습니다.
호출 할 때마다 10 개의 스 레 드 작업 자 를 가 진 스 레 드 풀 을 만 듭 니 다.

public class TestService1 {
 public static void main(String[] args) {
 //   10   
 XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
 pool.execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====1 test====");
  }
 }); 
 }
} 
public class TestService2 {
 public static void main(String[] args) {
 //   10   
 XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
 pool.execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====2 test====");
  }
 });
 }
}
4 제어 루틴 총수
프로젝트 의 모든 스 레 드 호출 은 일반적으로 고정 작업 자 수 크기 의 스 레 드 풀 을 함께 사용 합 니 다.

import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import com.xy.pool.XYThreadPool; 
/**
 *          
 */
@Component
public class XYThreadManager { 
 private XYThreadPool<Runnable> executorPool; 
 @PostConstruct
 public void init() {
 executorPool = new XYThreadPool<Runnable>(10);
 } 
 public XYThreadPool<Runnable> getExecutorPool() {
 return executorPool;
 }
} 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; 
@Service("testService3")
public class TestService3 { 
 @Autowired
 private XYThreadManager threadManager; 
 public void test() {
 threadManager.getExecutorPool().execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====3 test====");
  }
 });
 }
} 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; 
@Service("testService4")
public class TestService4 { 
 @Autowired
 private XYThreadManager threadManager; 
 public void test() {
 threadManager.getExecutorPool().execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====4 test====");
  }
 });
 }
} 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext; 
public class TestMain { 
 @SuppressWarnings("resource")
 public static void main(String[] args) {
 ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml"); 
 TestService3 t3 = (TestService3) atc.getBean("testService3");
 t3.test(); 
 TestService4 t4 = (TestService4) atc.getBean("testService4");
 t4.test();
 } 
}
추가:사용자 정의 Thread PoolExecutor 스 레 드 풀
머리말
스 레 드 탱크 는 모두 가 사용 한 적 이 있 을 것 입 니 다.JDK 의 Executors 도 스 레 드 탱크 를 가지 고 있 습 니 다.그런데 어떻게 하면 가장 우아 한 방식 으로 스 레 드 풀 을 사용 할 수 있 을 지 생각해 보 셨 나 요?생산 환경 은 어떻게 자신의 스 레 드 탱크 를 배치 해 야 합 리 적 입 니까?
오늘 주말 에 마침 자신 이 생각 하 는'우아 함'을 정리 할 시간 이 있 습 니 다.문제 가 있 으 면 여러분 의 지적 을 환영 합 니 다.
스 레 드 탱크 사용 규칙
스 레 드 탱크 를 잘 사용 하려 면 몇 가지 규칙 을 따라 야 합 니 다.
스 레 드 개수 크기 설정
스 레 드 탱크 관련 매개 변수 설정
훅 으로 끼 워 넣 는 행동.
스 레 드 탱크 의 닫 기
스 레 드 탱크 설정 관련
스 레 드 탱크 크기 설정
이것 은 사실 면접 의 시험 점 입 니 다.많은 면접 관 들 이 스 레 드 탱크 core Size 의 크기 를 물 어 스 레 드 탱크 에 대한 이 해 를 고찰 합 니 다.
먼저 이 문제 에 대해 우 리 는 우리 의 수요 가 밀집 형 인지 IO 밀집 형 인지 명 확 히 해 야 한다.이 점 을 알 아야 우 리 는 스 레 드 탱크 의 수량 을 더욱 잘 설정 하여 제한 할 수 있다.
1.계산 집약 형:
말 그대로 매우 많은 CPU 컴 퓨 팅 자원 이 필요 하 다 는 것 이다.다 핵 CPU 시대 에 우 리 는 모든 CPU 핵심 을 컴 퓨 팅 에 참여 시 키 고 CPU 의 성능 을 충분히 이용 해 야 서버 설정 을 낭비 하지 않 은 셈 이다.아주 좋 은 서버 설정 에 단일 스 레 드 프로그램 을 실행 하고 있다 면 얼마나 큰 낭비 가 될 까?밀집 형 을 계산 하 는 응용 은 완전히 CPU 의 핵 수 에 의 해 작 동 되 기 때문에 그의 장점 을 완전히 발휘 하고 과도 한 스 레 드 컨 텍스트 전환 을 피하 기 위해 이상 적 인 방안 은 다음 과 같다.
스 레 드 수=CPU 핵 수+1,CPU 핵 수*2 로 설정 할 수도 있 지만 JDK 버 전 및 CPU 설정(서버 의 CPU 에 오 버 스 레 드 가 있 음)을 봐 야 합 니 다.
일반적으로 CPU*2 를 설정 하면 됩 니 다.
2.IO 집약 형
우리 가 현재 하고 있 는 개발 은 대부분이 WEB 응용 프로그램 으로 대량의 네트워크 전송 과 관련된다.뿐만 아니 라 데이터베이스 와 캐 시 간 의 상호작용 도 IO 와 관련된다.IO 가 발생 하면 라인 은 대기 상태 에 있 고 IO 가 끝나 면 데이터 가 준 비 된 후에 야 라인 은 계속 실 행 될 것 이다.따라서 여기 서 알 수 있 듯 이 IO 밀집 형 응용 에 대해 우 리 는 스 레 드 탱크 의 스 레 드 수량 을 많이 설정 할 수 있다.그러면 IO 를 기다 리 는 동안 스 레 드 는 다른 일 을 하고 병행 처리 효율 을 높 일 수 있다.그럼 이 스 레 드 탱크 의 데 이 터 량 은 마음대로 설정 할 수 있 습 니까?물론 아 닙 니 다.스 레 드 컨 텍스트 전환 은 대가 가 있다 는 것 을 기억 하 세 요.현재 공식 을 총 결 하여 IO 밀집 형 응용 에 대해
스 레 드 수=CPU 핵심 수/(1-블록 계수)이 블록 계 수 는 보통 0.8~0.9 사이 이 고 0.8 또는 0.9 를 취 할 수 있 습 니 다.
공식 을 사용 하면 쌍 핵 CPU 에 있어 서 비교적 이상 적 인 스 레 드 수 는 20 이다.물론 이것 은 절대적 인 것 이 아니 라 실제 상황 과 실제 업무 에 따라 조정 해 야 한다.final int poolSize=(int)(cpu Core/(1-0.9)
차단 계수 에 대해,즉 에서 언급 한 말 이 있 습 니 다.
차단 계수 에 대해 서 는 먼저 추측 하거나 부 드 러 운 분석 도구 나 자바.lang.management API 를 사용 하여 스 레 드 가 시스템/IO 작업 에 사용 되 는 시간 과 CPU 밀집 작업 에 소모 되 는 시간 비례 를 확인 할 수 있 습 니 다.
스 레 드 탱크 관련 매개 변수 설정
이 점 에 대해 서 는 상한 제한 이 없 는 설정 항목 을 선택 하지 말 아야 한 다 는 점 을 명심 해 야 한다.
Executors 에서 스 레 드 를 만 드 는 방법 을 권장 하지 않 는 이유 다.
예 를 들 어 Executors.new CachedThreadPool 의 설정 과 무한 대기 열 설정 은 예상 치 못 한 상황 으로 인해 스 레 드 탱크 에 시스템 이상 이 발생 하여 스 레 드 가 급증 하거나 작업 대기 열 이 계속 팽창 하고 메모리 소모 로 인해 시스템 이 붕괴 되 고 이상 합 니 다.저 희 는 사용자 정의 스 레 드 탱크 를 사용 하여 이 문 제 를 피 하 는 것 을 추천 합 니 다.이것 도 스 레 드 탱크 규범 을 사용 하 는 가장 중요 한 원칙 입 니 다!큰 잘못 이 없 도록 조심 하 세 요.절대 지나치게 자신 하지 마 세 요!
Executors 에서 스 레 드 풀 을 만 드 는 네 가지 방법 을 볼 수 있 습 니 다.

//      
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                   0L, TimeUnit.MILLISECONDS,
                   new LinkedBlockingQueue<Runnable>());
  }
 
//         
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                   60L, TimeUnit.SECONDS,
                   new SynchronousQueue<Runnable>());
  }
다른 것 은 더 이상 열거 하지 않 겠 습 니 다.여러분 은 스스로 소스 코드 를 찾 아 볼 수 있 습 니 다.
둘째,스 레 드 수량 과 스 레 드 의 남 은 회수 시간 을 합 리 적 으로 설정 하고 구체 적 인 작업 수행 주기 와 시간 에 따라 설정 하여 빈번 한 회수 와 생 성 을 피해 야 한다.비록 우리 가 스 레 드 탱크 를 사용 하 는 목적 은 시스템 성능 과 스루풋 을 향상 시 키 는 것 이지 만 시스템 의 안정성 도 고려 해 야 한다.그렇지 않 으 면 기대 할 수 없 는 문제 가 발생 하면 매우 번 거 로 울 것 이다!
셋째,실제 장면 에 따라 자신 에 게 적용 되 는 거부 전략 을 선택한다.보상 을 하고,JDK 가 지원 하 는 자동 보상 메커니즘 을 함부로 사용 하지 마 세 요!가능 한 한 사용자 정의 거부 정책 을 사용 하여 폭로 하 세 요!
넷 째,스 레 드 탱크 거부 정책,사용자 정의 거부 정책 은 Rejected ExecutionHandler 인 터 페 이 스 를 실현 할 수 있 습 니 다.
JDK 자체 거부 정책 은 다음 과 같 습 니 다.
AbortPolicy:시스템 의 정상 적 인 작 동 을 막 기 위해 이상 을 직접 던 집 니 다.
CallerRunsPolicy:스 레 드 탱크 가 닫 히 지 않 으 면 이 정책 은 호출 자 스 레 드 에서 현재 버 려 진 작업 을 직접 실행 합 니 다.
DiscardOldestPolicy:가장 오래된 요청 을 버 리 고 현재 작업 을 다시 제출 하려 고 합 니 다.
DiscardPolicy:처리 할 수 없 는 작업 을 버 리 고 처리 하지 않 습 니 다.
훅 활용
Hook 을 이용 하여 스 레 드 탱크 의 실행 궤적 을 남 깁 니 다:
Thread PoolExecutor 는 proctected 형식 으로 덮어 쓸 수 있 는 갈고리 방법 을 제공 하여 사용자 가 작업 을 수행 하기 전에 뭔 가 를 할 수 있 도록 합 니 다.우 리 는 이 를 통 해 ThreadLocal 초기 화,통계 정보 수집,기록 로그 등 작업 을 실현 할 수 있 습 니 다.이런 훅 은 beforeExecute 와 after Execute 와 같다.또 하나의 Hook 은 작업 이 실 행 될 때 rerminated 와 같은 논 리 를 삽입 할 수 있 습 니 다.
훅 방법 이 실 패 했 을 경우 내부 작업 라인 의 실행 이 실패 하거나 중 단 됩 니 다.
저 희 는 beforeExecute 와 after Execute 를 사용 하여 스 레 드 전과 후의 운행 상황 을 기록 할 수 있 고 실행 이 완 료 된 상 태 를 ELK 등 로그 시스템 에 직접 기록 할 수 있 습 니 다.
스 레 드 풀 닫 기
내용 은 스 레 드 탱크 가 인용 되 지 않 고 작업 스 레 드 수가 0 일 때 스 레 드 탱크 가 종 료 됩 니 다.우 리 는 또한 shutdown 을 호출 하여 스 레 드 탱크 를 수 동 으로 중지 할 수 있다.만약 우리 가 shutdown 을 호출 하 는 것 을 잊 어 버 리 면,스 레 드 자원 이 방출 되 기 위해 서,우 리 는 keepAliveTime 과 allowCoreThreadTimeOut 을 사용 하여 목적 을 달성 할 수 있 습 니 다!
물론,안정 적 인 방식 은 가상 컴퓨터 Runtime.getRuntime().addShutdown Hook 방법 을 사용 하여 스 레 드 탱크 의 닫 는 방법 을 수 동 으로 호출 하 는 것 입 니 다!
스 레 드 탱크 사용 실례
스 레 드 탱크 핵심 코드:

public class AsyncProcessQueue { 
 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
 /**
 * Task    <br>
 *               Executor      <br>
 */
 public static class TaskWrapper implements Runnable {
 private static final Logger _LOGGER = LoggerFactory.getLogger(TaskWrapper.class); 
 private final Runnable gift; 
 public TaskWrapper(final Runnable target) {
  this.gift = target;
 } 
 @Override
 public void run() {
 
  //     ,    Executor       
  if (gift != null) {
 
  try {
   gift.run();
  } catch (Exception e) {
   _LOGGER.error("Wrapped target execute exception.", e);
  }
  }
 }
 } 
 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 /**
 *        
 * 
 * @param task
 * @return
 */
 public static boolean execute(final Runnable task) {
 return AsyncProcessor.executeTask(new TaskWrapper(task));
 }
}
public class AsyncProcessor {
 static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class);
 
 /**
 *        <br>
 */
 private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
 
 /**
 *        
 */
 private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d";
 
 /**
 *       
 */
 private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
  .daemon(true).build();
 
 /**
 *       
 */
 private static final int DEFAULT_SIZE = 500;
 
 /**
 *         
 */
 private static final long DEFAULT_KEEP_ALIVE = 60L;
 
 /**NewEntryServiceImpl.java:689
 * Executor
 */
 private static ExecutorService executor;
 
 /**
 *     
 */
 private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE); 
 static {
 //    Executor
 //                 4  
 try {
  executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
   TimeUnit.SECONDS, executeQueue, FACTORY); 
  //        
  Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 
  @Override
  public void run() {
   AsyncProcessor.LOGGER.info("AsyncProcessor shutting down."); 
   executor.shutdown(); 
   try { 
   //   1     
   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
    AsyncProcessor.LOGGER.error("AsyncProcessor shutdown immediately due to wait timeout.");
    executor.shutdownNow();
   }
   } catch (InterruptedException e) {
   AsyncProcessor.LOGGER.error("AsyncProcessor shutdown interrupted.");
   executor.shutdownNow();
   } 
   AsyncProcessor.LOGGER.info("AsyncProcessor shutdown complete.");
  }
  }));
 } catch (Exception e) {
  LOGGER.error("AsyncProcessor init error.", e);
  throw new ExceptionInInitializerError(e);
 }
 } 
 /**
 *         
 */
 private AsyncProcessor() {
 } 
 /**
 *     ,      <br>
 *            {@link Executer}   
 * 
 * @param task
 * @return
 */
 public static boolean executeTask(Runnable task) { 
 try {
  executor.execute(task);
 } catch (RejectedExecutionException e) {
  LOGGER.error("Task executing was rejected.", e);
  return false;
 } 
 return true;
 } 
 /**
 *     ,             <br>
 *       ,    {@link }
 * 
 * @param task
 * @return
 */
 public static <T> Future<T> submitTask(Callable<T> task) { 
 try {
  return executor.submit(task);
 } catch (RejectedExecutionException e) {
  LOGGER.error("Task executing was rejected.", e);
  throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
 }
 }
}
사용 방법:

AsyncProcessQueue.execute(new Runnable() {
     @Override
     public void run() {
        //do something
    }
});
자신의 사용 장면 에 따라 유연 하 게 변경 할 수 있 습 니 다.저 는 beforeExecute 와 after Execute,거부 전략 을 사용 하지 않 았 습 니 다.이상 은 개인 적 인 경험 이 므 로 여러분 에 게 참고 가 되 기 를 바 랍 니 다.여러분 들 도 저 희 를 많이 응원 해 주시 기 바 랍 니 다.만약 잘못 이 있 거나 완전히 고려 하지 않 은 부분 이 있다 면 아낌없이 가르침 을 주시 기 바 랍 니 다.

좋은 웹페이지 즐겨찾기