ThreadPoolExecutor 설정

[본 고 는 자바 Concurrency In Practice C08 에 대한 요약 과 요약 입 니 다. 전 재 는 작가 와 출처 를 밝 혀 주 십시오. 오류 가 있 으 면 댓 글 에서 지적 해 주 십시오.]
Executors 의 정적 방법 new CachedThreadPool,new Fixed ThreadPool,new Scheduled ThreadPool 이 되 돌아 오 는 스 레 드 풀 은 모두 ThreadPoolExecutor 대상 또는 하위 클래스 대상 입 니 다.ThreadPoolExecutor 는 다양한 설정 을 제공 하여 실제 에 따라 적합 한 스 레 드 풀 을 맞 출 수 있 습 니 다.
 
스 레 드 생 성 및 소각
Thread PoolExecutor 구조 함수 중의 core PoolSize,maximum PoolSize,keepAliveTime 매개 변 수 는 스 레 드 의 생 성 과 소각 과 관련 이 있 습 니 다. 
core PoolSize 는 ThreadPoolExecutor 에서 가지 고 있 는 핵심 스 레 드 수 를 지정 합 니 다.task 대기 열 이 가득 찬 경 우 를 제외 하고 ThreadPoolExecutor 는 핵심 스 레 드 수 를 초과 하 는 스 레 드 를 만 들 지 않 습 니 다(core PoolSize 가 0 일 때 특수 한 상황 입 니 다.이때 task 대기 열 이 포화 되 지 않 아 도 스 레 드 탱크 에 task 를 처음 제출 할 때 새로운 스 레 드 를 만 듭 니 다).핵심 스 레 드 는 만 들 면 소각 되 지 않 습 니 다.allowCoreThreadTimeOut(true)을 설정 하거나 스 레 드 풀 을 닫 지 않 는 한.
maximumPoolSize 지정 스 레 드 탱크 에 있 는 최대 스 레 드 수 입 니 다.핵심 스 레 드 수 를 초과 한 스 레 드 에 대해 서 는 지정 한 시간 초과 에 사용 되 지 않 으 면 삭 제 됩 니 다.
keepAliveTime 시간 초과 지정.
Executors 클래스 의 정적 방법 으로 스 레 드 탱크 의 소스 코드 를 만 듭 니 다:
public static ExecutorService newCachedThreadPool() {
	//       0,       Integer.MAX_VALUE,      60s
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
	//                     nThreads,      0
	return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<Runnable>());
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
	//            ,       Integer.MAX_VALUE,      0
	return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue());
} 

 
task 대기 열
스 레 드 탱크 내부 에 task 대기 열 이 있 습 니 다.task 의 제출 속도 가 task 의 실행 속 도 를 초과 할 때 task 는 task 대기 열 에 캐 시 되 어 스 레 드 가 사용 가능 할 때 실 행 됩 니 다.ThreadPoolExecutor 는 생 성 할 때 task 대기 열 을 지정 할 수 있 습 니 다.개발 자 는 보통 세 가지 선택 이 있 습 니 다.경계 대기 열 이 있 습 니 다.무한 대기 열 및 동기 화 대기 열.Executors.new Fixed ThreadPool 과 Executors.new Scheduled ThreadPool 이 돌아 오 는 ThreadPoolExecutor 대상 은 무한 대기 열 을 사용 하고 Executors.new CashedThreadPool 이 돌아 오 는 ThreadPoolExecutor 대상 은 동기 화 대기 열 을 사용 합 니 다.
스 레 드 수가 많 지 않 은 스 레 드 탱크 에 용량 이 큰 대기 열(또는 무한 대기 열)을 지정 하면 스 레 드 간 전환,CPU 등의 소 모 를 줄 이 는 데 도움 이 되 며,대 가 는 스루풋 이 떨 어 질 수 있 습 니 다.경계 대기 열 을 사용 하면 대기 열 이 채 워 질 수 있 습 니 다.이 때 지정 한 포화 전략 에 따라 처리 합 니 다(다음 설명 참조).
스 레 드 수가 많은 스 레 드 탱크 에 대해 서 는 동기 화 대기 열 을 사용 할 수 있 습 니 다.동기 화 대기 열(SynchronousQueue)은 사실 하나의 대기 열 이 라 고 할 수 없습니다.동기 화 대기 열 에 캐 시 역할 이 없 기 때 문 입 니 다.동기 화 대기 열 을 사용 할 때 task 가 제출 될 때 스 레 드 탱크 의 스 레 드 로 직접 연결 합 니 다.이 스 레 드 탱크 에 사용 가능 한 스 레 드 가 없 으 면스 레 드 탱크 는 새로운 스 레 드 를 만 들 것 입 니 다.스 레 드 탱크 가 새로운 스 레 드 를 만 들 수 없다 면(예 를 들 어 스 레 드 수가 maximumPoolSize 에 도 착 했 습 니 다)지정 한 포화 전략 에 따라 처리 합 니 다(같은 설명 참조).
 
포화 전략
스 레 드 탱크 가 경계 대기 열 을 사용 하면 경계 대기 열 이 가득 찼 을 때 task 를 계속 제출 할 때 포화 정책 이 실 행 됩 니 다.
스 레 드 탱크 가 동기 화 대기 열 을 사용 하면 스 레 드 탱크 가 새로운 스 레 드 를 만 들 수 없 을 때 포화 정책 이 실 행 됩 니 다.
스 레 드 탱크 가 닫 힌 후에 도 task 를 제출 할 때 포화 정책 이 실 행 됩 니 다.
ThreadPoolExecutor.set Rejected Execution Handler 방법 은 포화 정책 을 설정 하 는 데 사 용 됩 니 다.이 방법 은 Rejected Execution Handler 대상 을 매개 변수 로 받 아들 입 니 다.Rejected Execution Handler 는 하나의 방법 만 정의 합 니 다.rejected Execution(Runnable r,ThreadPoolExecutor).rejected Execution 방법 은 포화 정책 이 실 행 될 때 시스템 에서 반 환 됩 니 다.
Thread PoolExecutor 클래스 에서 여러 Rejected Execution Handler 의 실현 클래스 를 미리 정 의 했 습 니 다:AbortPolicy,CallerRunsPolicy,DiscardPolicy,DiscardOldestPolicy.
AbortPolicy 는 기본 포화 정책 입 니 다.rejected Execution 방법 은 다음 과 같 습 니 다.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	throw new RejectedExecutionException();
} 

기본적으로 포화 정책 을 실행 할 때 Rejected Execution Exception 이상 을 던 집 니 다.
CallerRuns Policy.포화 시 task 를 제출 하 는 스 레 드 에서 task 를 실행 합 니 다.스 레 드 탱크 의 스 레 드 가 아 닌 task 를 실행 합 니 다.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	if (!e.isShutdown()) {
		r.run();
	}
}

CallerRunsPolicy 를 사용 하 는 예:
class LifecycleWebServer {
	// MAX_THREAD_COUNT MAX_QUEUE_COUNT             
	private static final int MAX_THREAD_COUNT = 100;
	private static final int MAX_QUEUE_COUNT = 1000;

	//         task  ,        ,        
	private final ThreadPoolExecutor exec = new ThreadPoolExecutor(0, MAX_THREAD_COUNT, 60L, TimeUnit.SECONDS,
			new ArrayBlockingQueue<Runnable>(MAX_QUEUE_COUNT));

	public void start() throws IOException {
		//        CallerRunsPolicy
		exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		ServerSocket socket = new ServerSocket(80);
		while (!exec.isShutdown()) {
			try {
				final Socket conn = socket.accept();
				exec.execute(new Runnable() {
					public void run() {
						handleRequest(conn);
					}
				});
			} catch (RejectedExecutionException e) {
				if (!exec.isShutdown())
					log("task submission rejected", e);
			}
		}
	}

	public void stop() {
		exec.shutdown();
	}

	void handleRequest(Socket connection) {
		Request req = readRequest(connection);
		if (isShutdownRequest(req))
			stop();
		else
			dispatchRequest(req);
	}
	
	public static void main(String[] args) {
		LifecycleWebServer server = new LifecycleWebServer();
		try {
			//  main     server
			server.start();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
} 

LifecycleWebServer 의 스 레 드 풀 은 CallerRunsPolicy 를 포화 정책 으로 사용 합 니 다.스 레 드 풀 이 포화 되 었 을 때 main 스 레 드 가 스 레 드 풀 에 task 를 제출 하면 task 는 main 에서 실 행 됩 니 다.main 스 레 드 가 task 를 실행 하 는 데 시간 이 걸 립 니 다.그러면 스 레 드 풀 에 숨 을 쉴 수 있 는 기 회 를 주 고 main 스 레 드 는 task 를 실행 하 는 시간 내 에 socket 연결 을 받 아들 일 수 없습니다.따라서 socket 연결 요청 은 tcp 층 에 캐 시 됩 니 다.server 과부하 가 오래 지속 되면 tcp 층 의 캐 시 가 부족 합 니 다.tcp 캐 시 는 정책 에 따라 일부 요청 을 버 립 니 다.이렇게 되면,전체 시스템 의 과부하 압력 이 점차 밖으로 확산 되 고 있 습 니 다.스 레 드 탱크-스 레 드 탱크 의 대기 열-main 스 레 드-tcp 층-client.이러한 시스템 은 과부하 가 발생 할 때 비교적 우아 합 니 다.너무 많은 요청 으로 인해 시스템 자원 이 소모 되 지 않 을 뿐만 아니 라 과부하 가 발생 할 때 서 비 스 를 거부 하지 않 습 니 다.장시간 시스템 이 과부하 되 었 을 때 만 클 라 이언 트 가 연결 할 수 없 는 상황 이 발생 합 니 다.
DiscardPolicy.이 정책 은 최근 에 제출 한 task 를 버 립 니 다.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	//   ,       
} 

DiscardOldestPolicy.이 정책 은 대기 열 에 있 는 task 를 버 리 고 최신 task 를 다시 제출 하려 고 합 니 다.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
	e.getQueue().poll();
	e.execute(r);
    }
} 

DiscardOldestPolicy 와 Priority BlockingQueue 를 결합 하여 사용 할 때 좋 지 않 은 결 과 를 가 져 올 수 있 습 니 다.Priority BlockingQueue 에 있 는 task 는 우선 순위 가 가장 높 은 task 이기 때문에 포화 가 발생 할 때 오히려 우선 순위 가 높 은 task 를 버 리 는 것 이 수요 에 부합 되 지 않 을 수 있 습 니 다.
Thread PoolExecutor 는 포화 시 차단 정책 을 제공 하지 않 았 지만 개발 자 는 Semaphore 와 결합 하여 실현 할 수 있 습 니 다.
public class BoundedExecutor {
	private final Executor exec;
	private final Semaphore semaphore;

	public BoundedExecutor(Executor exec, int bound) {
		this.exec = exec;
		//      permit   
		this.semaphore = new Semaphore(bound);
	}

	public void submitTask(final Runnable command) throws InterruptedException {
		//   task    permit,        permit,   submitTask       ,    permit  
		semaphore.acquire();
		try {
			exec.execute(new Runnable() {
				public void run() {
					try {
						command.run();
					} finally {
						//      ,   task   permit
						semaphore.release();
					}
				}
			});
		} catch (RejectedExecutionException e) {
			//         ,      permit
			semaphore.release();
		}
	}
}

 
ThreadFactory
Thread PoolExecutor 를 만 들 때 ThreadFactory 를 지정 할 수 있 습 니 다.스 레 드 탱크 가 새로운 스 레 드 를 만 들 때 ThreadFactory 의 new Thread 방법 을 사용 합 니 다.기본 ThreadFactory 가 만 든 스 레 드 는 nonDaemon 이 고 스 레 드 우선 순 위 는 NORM 입 니 다.PRIORITY 의 스 레 드 와 식별 가능 한 스 레 드 이름 을 지정 합 니 다.
public Thread newThread(Runnable r) {
	Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
	if (t.isDaemon())
		t.setDaemon(false);
	if (t.getPriority() != Thread.NORM_PRIORITY)
		t.setPriority(Thread.NORM_PRIORITY);
	return t;
} 

개발 자 는 자신의 필요 에 따라 ThreadPoolExecutor 에 사용자 정의 ThreadFactory 를 지정 할 수 있 습 니 다.예 를 들 어:
public class MyThreadFactory implements ThreadFactory {
	private final String poolName;

	public MyThreadFactory(String poolName) {
		this.poolName = poolName;
	}

	public Thread newThread(Runnable runnable) {
		return new MyAppThread(runnable, poolName);
	}
}

public class MyAppThread extends Thread {
	public static final String DEFAULT_NAME = "MyAppThread";
	private static volatile boolean debugLifecycle = false;
	private static final AtomicInteger created = new AtomicInteger();
	private static final AtomicInteger alive = new AtomicInteger();
	private static final Logger log = Logger.getAnonymousLogger();

	public MyAppThread(Runnable r) {
		this(r, DEFAULT_NAME);
	}

	public MyAppThread(Runnable runnable, String name) {
		//      Thread       
		super(runnable, name + "-" + created.incrementAndGet());
		//   UncaughtExceptionHandler. UncaughtExceptionHandler uncaughtException                      
		setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
			public void uncaughtException(Thread t, Throwable e) {
				log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
			}
		});
	}

	public void run() {
		// Copy debug flag to ensure consistent value throughout. 
		boolean debug = debugLifecycle;
		if (debug)
			log.log(Level.FINE, "Created " + getName());
		try {
			alive.incrementAndGet();
			super.run();
		} finally {
			alive.decrementAndGet();
			if (debug)
				log.log(Level.FINE, "Exiting " + getName());
		}
	}

	public static int getThreadsCreated() {
		return created.get();
	}

	public static int getThreadsAlive() {
		return alive.get();
	}

	public static boolean getDebug() {
		return debugLifecycle;
	}

	public static void setDebug(boolean b) {
		debugLifecycle = b;
	}
}

 
확장 ThreadPoolExecutor
Thread PoolExecutor 류 는 여러 개의'갈고리'방법 을 제공 하여 하위 클래스 의 실현 을 제공 합 니 다.예 를 들 어 beforeExecute,after Execute,terminated 등 입 니 다.이른바'갈고리'는 기본 클래스 가 예약 한 것 을 말 하지만 구체 적 인 실현 방법 을 제공 하지 않 았 습 니 다.그 방법 체 는 비어 있 습 니 다.하위 클래스 는 수요 에 따라'갈고리'에 구체 적 인 실현 을 제공 할 수 있 습 니 다.
beforeExecute 와 after Execute 방법 은 각각 task 실행 전후 에 호출 됩 니 다.
private void runTask(Runnable task) {
	final ReentrantLock runLock = this.runLock;
	runLock.lock();
	try {
		if (runState < STOP && Thread.interrupted() && runState >= STOP)
			thread.interrupt();
		boolean ran = false;
		beforeExecute(thread, task);
		try {
			task.run();
			ran = true;
			afterExecute(task, null);
			++completedTasks;
		} catch (RuntimeException ex) {
			if (!ran)
				afterExecute(task, ex);
			throw ex;
		}
	} finally {
		runLock.unlock();
	}
} 

beforeExecute 와 after Execute 방법 은 로그,통계 데이터 등 을 기록 하 는 데 사용 할 수 있 습 니 다.
terminated 방법 온라인 탱크 가 닫 힌 후 호출 됩 니 다.terminated 방법 은 스 레 드 탱크 가 신청 한 자원 을 방출 하 는 데 사용 할 수 있 습 니 다.

좋은 웹페이지 즐겨찾기