JAVA 병렬 - ThreadPoolExecutor 스레드 풀 사용

5373 단어
블로그
AVA병발-Executor 임무 집행 프레임워크에서 말했듯이 Executors에는 4가지 공장 방법이 있는데 그것이 바로 newFixedThreadPool, newCachedThreadPool, newSingleThreadExecutor, newScheduledThreadPool이다.
이러한 공장 방법은 사실 모두 서로 다른 정책을 실행하는 ThreadPoolExecutor를 만들었다.
사용자 지정 정책을 실행하는 ThreadPoolExecutor를 만들 수 있습니다.ThreadPoolExecutor의 공통 구조 함수는 다음과 같습니다.
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

위의 구조 함수에는 다음과 같은 7개의 매개변수가 있습니다.
스레드 탱크 기본 수량, 스레드 탱크 최대 수량, 스레드 활성화 시간, 스레드 활성화 시간 단위, 작업 대기 대기열, 스레드 공장, 포화 전략.
스레드 탱크의 기본 수량: 스레드 탱크에 상주하는 스레드의 수량.
스레드 탱크 최대 수량: 스레드 탱크가 만들 수 있는 스레드 최대 수량입니다.
코드에서 보통 스레드 탱크의 크기를 고정시키지 않고 특정한 설정 메커니즘을 통해 설정하거나 런타임을 통해 설정한다.availableProcessors에서 동적으로 설정합니다.일반적으로 같은 스레드 탱크는 같은 유형의 작업만 수행하는 데 사용되기 때문에 전략적 조정을 더욱 잘 할 수 있다.
스레드 풀의 크기를 정확하게 설정하려면 CPU 수(핵심 수), 메모리 크기, 작업 유형(계산 집약형 또는 IO 집약형), JDBC 연결과 같은 희귀 자원이 필요한지 등을 고려해야 한다.
일반적으로 스레드 풀의 크기가 Ncpu+1(코어 +1)으로 설정된 집약적 작업을 계산합니다.IO 집약형 작업으로 스레드 풀의 크기를 Ncpu*(W+C)/C(W는 평균 IO 대기 시간, C는 평균 계산 시간)로 설정합니다.JAVA에서는 런타임을 통과할 수 있습니다.getRuntime().availableProcessors () 를 사용하면 CPU의 수량 (즉 전체 핵심 수) 을 얻을 수 있습니다.때때로 스레드 탱크에서 수행하는 임무는 일부 희귀한 자원을 필요로 한다. 이때: 모든 임무가 이 자원에 대한 수요량을 계산한 다음에 자원의 총량을 매 임무의 수요량으로 나누면 얻은 결과는 병발할 수 있는 임무 총량(즉 최대 스레드 수량)이다.
스레드 활성화 시간: 현재 스레드 탱크의 스레드 수량이 기본 수량보다 많고 스레드의 여가 시간이 활성화 시간보다 많으면 이 스레드들은 소각됩니다.
작업 대기열: 작업의 도착 속도가 스레드 탱크의 처리 속도를 초과하면 실행을 기다리는 작업은 대기 대기열에 저장됩니다. 이것은 캐시 정책입니다.그렇지만
만약 작업이 계속 급증한다면, 작업 대기열은 메모리 자원을 소모할 수 있습니다. 이 때 경계 대기열을 사용하는 것을 고려하십시오.ThreadPoolExecutor의 기본 작업 대기열 방법은 세 가지가 있는데 그것이 바로 무계 대기열, 유계 대기열, 동기화 대기열이다.
포화 정책: 작업 대기열이 가득 차면 다음 작업에 포화 정책을 사용합니다. (닫힌 Executor에 작업이 제출될 때도 포화 정책을 사용합니다.)
자주 사용하는 포화 정책은 AbortPolicy,CallerRunsPolicy,DiscardPolicy,DiscardOldestPolicy 네 가지가 있다.AbortPolicy는 확인되지 않은 RejectedExecutionException을 제거하는 기본 포화 정책입니다.CallerRunnsPolicy는 호출자 루틴을 사용하여 루틴 풀에서 실행된 작업을 수행합니다.DiscardPolicy 는 현재 작업을 포기합니다.DiscardOldestPolicy는 대기열에서 대기열의 첫 번째 작업을 버리고 현재 작업을 대기열에 추가합니다.포화 정책은 다음과 같이 설정됩니다.
ExecutorService es=new ThreadPoolExecutor(3, 5, 0L, TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<Runnable>(), 
				new ThreadPoolExecutor.CallerRunsPolicy());

공장 방법: 스레드 탱크에 새로운 스레드가 필요할 때마다 스레드 공장 방법을 통해 완성한다.사용자는 자신의 공장 방법을 사용자 정의할 수 있고 ThreadFactory 인터페이스를 실현하면 된다.
public interface ThreadFactory {
    Thread newThread(Runnable r);
}

BeforeExecute (), afterExecute (),terminated () 방법 등 상하문 방법도 제공했다.여기서: beforeExecute () 는execute () 이전에 호출되고,afterExecute ()는execute () 이후에 호출되며,terminated ()는 온라인 프로세스가 종료될 때 호출됩니다.다음 예는 다음과 같습니다.

class MyThreadPoolExecutor extends ThreadPoolExecutor{
	public MyThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            RejectedExecutionHandler handler) {
		// TODO Auto-generated constructor stub
		super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,handler);
	}
	
	private static ThreadLocal<Long> startTime=new ThreadLocal<Long>();
	private AtomicLong taskNums=new AtomicLong();
	private AtomicLong taskTimes=new AtomicLong();
	
	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		super.beforeExecute(t, r);
		startTime.set(System.nanoTime());
	}
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		long time=System.nanoTime()-startTime.get();
		taskNums.incrementAndGet();
		taskTimes.addAndGet(time);
	}
	@Override
	protected void terminated() {
		super.terminated();
		System.out.println(taskTimes.get());
		System.out.println(taskNums.get());
		System.out.println(taskTimes.get()/taskNums.get());
	}
}

위의 스레드 탱크 MyThreadPoolExecutor는 작업 수, 총 작업 시간을 통계하는 기능을 추가했습니다.
이 중 모든 작업의 시작 시간과 종료 시간은 같은 라인의beforeExecute () 와afterExecute () 방법에서 얻을 수 있기 때문에ThreadLocal 형식의 변수를 사용하여 시작 시간을 기록합니다.작업 수와 작업 총 시간은 공유 변수로 라인의 안전성을 확보해야 하기 때문에 원자 변수를 사용한다.
또한 신호 양을 사용하여 작업을 전송하는 속도를 제어할 수도 있습니다.
class MyExecutor {
	private Executor executor;
	private Semaphore sem;
	public MyExecutor(Executor exe,int taskNums){
		this.executor=exe;
		this.sem=new Semaphore(taskNums);
	}
	public void submit(final Runnable task) throws InterruptedException{
		try{
			sem.acquire();
			executor.execute(new Runnable() {
				@Override
				public void run() {
					try{
						task.run();						
					}finally{
						sem.release();
					}
				}
			});
			//      AbortPolicy  ,      RejectedExecutionExecution
		}catch(RejectedExecutionException e){
			sem.release();
		}
	}
}

좋은 웹페이지 즐겨찾기