자바 사용자 정의 스 레 드 탱크 와 스 레 드 총수 제어 작업
지 화 는 흔히 볼 수 있 는 사상 이다.스 레 드 탱크 는 매우 전형 적 인 지 화 실현 이다.도 자바 중의 스 레 드 탱크 를 크게 설명 했다.본 고 는 간단 한 스 레 드 탱크 를 실현 한다.
2 핵심 클래스
【1】인터페이스 정의
public interface IThreadPool<Job extends Runnable> {
/**
*
*/
public void shutAlldown();
/**
*
*
* @param job
*/
public void execute(Job job);
/**
*
*
* @param addNum
*/
public void addWorkers(int addNum);
/**
*
*
* @param reduceNum
*/
public void reduceWorkers(int reduceNum);
}
【2】실현 류스 레 드 탱크 의 핵심 은 작업 목록 1 개 와 작업 자 목록 1 개 를 유지 하 는 것 입 니 다.
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> {
//
private static int DEAFAULT_SIZE = 5;
//
private static int MAX_SIZE = 10;
//
private LinkedList<Job> tasks = new LinkedList<Job>();
//
private List<Worker> workers = Collections
.synchronizedList(new ArrayList<Worker>());
/**
*
*/
public XYThreadPool() {
initWokers(DEAFAULT_SIZE);
}
/**
*
*
* @param threadNums
*/
public XYThreadPool(int workerNum) {
workerNum = workerNum <= 0 ? DEAFAULT_SIZE
: workerNum > MAX_SIZE ? MAX_SIZE : workerNum;
initWokers(workerNum);
}
/**
*
*
* @param threadNums
*/
public void initWokers(int threadNums) {
for (int i = 0; i < threadNums; i++) {
Worker worker = new Worker();
worker.start();
workers.add(worker);
}
//
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
shutAlldown();
}
});
}
@Override
public void shutAlldown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
@Override
public void execute(Job job) {
synchronized (tasks) {
// ,
tasks.addLast(job);
tasks.notifyAll();
}
}
@Override
public void addWorkers(int addNum) {
// ,
if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) {
initWokers(addNum);
} else {
System.out.println("addNum too large");
}
}
@Override
public void reduceWorkers(int reduceNum) {
if ((workers.size() - reduceNum <= 0))
System.out.println("thread num too small");
else {
//
int count = 0;
while (count != reduceNum) {
for (Worker w : workers) {
w.shutdown();
count++;
}
}
}
}
/**
*
*/
class Worker extends Thread {
private volatile boolean flag = true;
@Override
public void run() {
while (flag) {
Job job = null;
// ( woker , , )
synchronized (tasks) {
//
while (tasks.isEmpty()) {
try {
// , , notify
tasks.wait();
System.out.println("block when tasks is empty");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//
job = tasks.removeFirst();
System.out.println("get job:" + job + ",do biz");
job.run();
}
}
}
public void shutdown() {
flag = false;
}
}
}
(1)wait()방법 을 호출 할 때 스 레 드 는 대상 자 물 쇠 를 포기 하고 이 대상 을 기다 리 는 잠 금 풀 에 들 어 갑 니 다.이 대상 에 게 notify()방법 을 호출 한 후에 야 이 스 레 드 는 대상 잠 금 풀 에 들 어가 준비 합 니 다.(2)Object 의 방법:void notify():이 대상 을 기다 리 고 있 는 스 레 드 를 깨 웁 니 다.void notify All():이 대상 을 기다 리 고 있 는 모든 스 레 드 를 깨 웁 니 다.
notify All 은 이 대상 에서 notify 를 기다 리 던 모든 스 레 드 를 wait 상태 에서 종료 시 키 고 이 대상 의 잠 금 을 기다 리 는 것 으로 바 꾸 었 습 니 다.이 대상 이 잠 금 이 풀 리 면 경쟁 합 니 다.
notify 는 wait 상태 스 레 드 를 선택 하여 알림 을 하고 대상 의 자 물 쇠 를 가 져 옵 니 다.그러나 이 대상 에 게 notify 를 기다 리 는 다른 스 레 드 들 은 놀 라 지 않 습 니 다.첫 번 째 스 레 드 가 실 행 된 후에 대상 의 자 물 쇠 를 풀 어 줍 니 다.이때 이 대상 이 notify 문 구 를 다시 사용 하지 않 으 면 대상 이 비어 있 더 라 도...다른 wait 상태 에서 기다 리 는 스 레 드 는 이 대상 의 통 지 를 받 지 못 했 기 때문에 wait 상태 에 있 습 니 다.이 대상 이 notify 나 notify All 을 보 낼 때 까지 기다 리 는 것 은 자물쇠 가 아 닌 notify 나 notify All 입 니 다.
3.스 레 드 총 수 를 제어 할 필요 가 없습니다.
호출 할 때마다 10 개의 스 레 드 작업 자 를 가 진 스 레 드 풀 을 만 듭 니 다.
public class TestService1 {
public static void main(String[] args) {
// 10
XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("====1 test====");
}
});
}
}
public class TestService2 {
public static void main(String[] args) {
// 10
XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("====2 test====");
}
});
}
}
4 제어 루틴 총수프로젝트 의 모든 스 레 드 호출 은 일반적으로 고정 작업 자 수 크기 의 스 레 드 풀 을 함께 사용 합 니 다.
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import com.xy.pool.XYThreadPool;
/**
*
*/
@Component
public class XYThreadManager {
private XYThreadPool<Runnable> executorPool;
@PostConstruct
public void init() {
executorPool = new XYThreadPool<Runnable>(10);
}
public XYThreadPool<Runnable> getExecutorPool() {
return executorPool;
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("testService3")
public class TestService3 {
@Autowired
private XYThreadManager threadManager;
public void test() {
threadManager.getExecutorPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("====3 test====");
}
});
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("testService4")
public class TestService4 {
@Autowired
private XYThreadManager threadManager;
public void test() {
threadManager.getExecutorPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("====4 test====");
}
});
}
}
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class TestMain {
@SuppressWarnings("resource")
public static void main(String[] args) {
ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml");
TestService3 t3 = (TestService3) atc.getBean("testService3");
t3.test();
TestService4 t4 = (TestService4) atc.getBean("testService4");
t4.test();
}
}
추가:사용자 정의 Thread PoolExecutor 스 레 드 풀머리말
스 레 드 탱크 는 모두 가 사용 한 적 이 있 을 것 입 니 다.JDK 의 Executors 도 스 레 드 탱크 를 가지 고 있 습 니 다.그런데 어떻게 하면 가장 우아 한 방식 으로 스 레 드 풀 을 사용 할 수 있 을 지 생각해 보 셨 나 요?생산 환경 은 어떻게 자신의 스 레 드 탱크 를 배치 해 야 합 리 적 입 니까?
오늘 주말 에 마침 자신 이 생각 하 는'우아 함'을 정리 할 시간 이 있 습 니 다.문제 가 있 으 면 여러분 의 지적 을 환영 합 니 다.
스 레 드 탱크 사용 규칙
스 레 드 탱크 를 잘 사용 하려 면 몇 가지 규칙 을 따라 야 합 니 다.
스 레 드 개수 크기 설정
스 레 드 탱크 관련 매개 변수 설정
훅 으로 끼 워 넣 는 행동.
스 레 드 탱크 의 닫 기
스 레 드 탱크 설정 관련
스 레 드 탱크 크기 설정
이것 은 사실 면접 의 시험 점 입 니 다.많은 면접 관 들 이 스 레 드 탱크 core Size 의 크기 를 물 어 스 레 드 탱크 에 대한 이 해 를 고찰 합 니 다.
먼저 이 문제 에 대해 우 리 는 우리 의 수요 가 밀집 형 인지 IO 밀집 형 인지 명 확 히 해 야 한다.이 점 을 알 아야 우 리 는 스 레 드 탱크 의 수량 을 더욱 잘 설정 하여 제한 할 수 있다.
1.계산 집약 형:
말 그대로 매우 많은 CPU 컴 퓨 팅 자원 이 필요 하 다 는 것 이다.다 핵 CPU 시대 에 우 리 는 모든 CPU 핵심 을 컴 퓨 팅 에 참여 시 키 고 CPU 의 성능 을 충분히 이용 해 야 서버 설정 을 낭비 하지 않 은 셈 이다.아주 좋 은 서버 설정 에 단일 스 레 드 프로그램 을 실행 하고 있다 면 얼마나 큰 낭비 가 될 까?밀집 형 을 계산 하 는 응용 은 완전히 CPU 의 핵 수 에 의 해 작 동 되 기 때문에 그의 장점 을 완전히 발휘 하고 과도 한 스 레 드 컨 텍스트 전환 을 피하 기 위해 이상 적 인 방안 은 다음 과 같다.
스 레 드 수=CPU 핵 수+1,CPU 핵 수*2 로 설정 할 수도 있 지만 JDK 버 전 및 CPU 설정(서버 의 CPU 에 오 버 스 레 드 가 있 음)을 봐 야 합 니 다.
일반적으로 CPU*2 를 설정 하면 됩 니 다.
2.IO 집약 형
우리 가 현재 하고 있 는 개발 은 대부분이 WEB 응용 프로그램 으로 대량의 네트워크 전송 과 관련된다.뿐만 아니 라 데이터베이스 와 캐 시 간 의 상호작용 도 IO 와 관련된다.IO 가 발생 하면 라인 은 대기 상태 에 있 고 IO 가 끝나 면 데이터 가 준 비 된 후에 야 라인 은 계속 실 행 될 것 이다.따라서 여기 서 알 수 있 듯 이 IO 밀집 형 응용 에 대해 우 리 는 스 레 드 탱크 의 스 레 드 수량 을 많이 설정 할 수 있다.그러면 IO 를 기다 리 는 동안 스 레 드 는 다른 일 을 하고 병행 처리 효율 을 높 일 수 있다.그럼 이 스 레 드 탱크 의 데 이 터 량 은 마음대로 설정 할 수 있 습 니까?물론 아 닙 니 다.스 레 드 컨 텍스트 전환 은 대가 가 있다 는 것 을 기억 하 세 요.현재 공식 을 총 결 하여 IO 밀집 형 응용 에 대해
스 레 드 수=CPU 핵심 수/(1-블록 계수)이 블록 계 수 는 보통 0.8~0.9 사이 이 고 0.8 또는 0.9 를 취 할 수 있 습 니 다.
공식 을 사용 하면 쌍 핵 CPU 에 있어 서 비교적 이상 적 인 스 레 드 수 는 20 이다.물론 이것 은 절대적 인 것 이 아니 라 실제 상황 과 실제 업무 에 따라 조정 해 야 한다.final int poolSize=(int)(cpu Core/(1-0.9)
차단 계수 에 대해,즉 에서 언급 한 말 이 있 습 니 다.
차단 계수 에 대해 서 는 먼저 추측 하거나 부 드 러 운 분석 도구 나 자바.lang.management API 를 사용 하여 스 레 드 가 시스템/IO 작업 에 사용 되 는 시간 과 CPU 밀집 작업 에 소모 되 는 시간 비례 를 확인 할 수 있 습 니 다.
스 레 드 탱크 관련 매개 변수 설정
이 점 에 대해 서 는 상한 제한 이 없 는 설정 항목 을 선택 하지 말 아야 한 다 는 점 을 명심 해 야 한다.
Executors 에서 스 레 드 를 만 드 는 방법 을 권장 하지 않 는 이유 다.
예 를 들 어 Executors.new CachedThreadPool 의 설정 과 무한 대기 열 설정 은 예상 치 못 한 상황 으로 인해 스 레 드 탱크 에 시스템 이상 이 발생 하여 스 레 드 가 급증 하거나 작업 대기 열 이 계속 팽창 하고 메모리 소모 로 인해 시스템 이 붕괴 되 고 이상 합 니 다.저 희 는 사용자 정의 스 레 드 탱크 를 사용 하여 이 문 제 를 피 하 는 것 을 추천 합 니 다.이것 도 스 레 드 탱크 규범 을 사용 하 는 가장 중요 한 원칙 입 니 다!큰 잘못 이 없 도록 조심 하 세 요.절대 지나치게 자신 하지 마 세 요!
Executors 에서 스 레 드 풀 을 만 드 는 네 가지 방법 을 볼 수 있 습 니 다.
//
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
다른 것 은 더 이상 열거 하지 않 겠 습 니 다.여러분 은 스스로 소스 코드 를 찾 아 볼 수 있 습 니 다.둘째,스 레 드 수량 과 스 레 드 의 남 은 회수 시간 을 합 리 적 으로 설정 하고 구체 적 인 작업 수행 주기 와 시간 에 따라 설정 하여 빈번 한 회수 와 생 성 을 피해 야 한다.비록 우리 가 스 레 드 탱크 를 사용 하 는 목적 은 시스템 성능 과 스루풋 을 향상 시 키 는 것 이지 만 시스템 의 안정성 도 고려 해 야 한다.그렇지 않 으 면 기대 할 수 없 는 문제 가 발생 하면 매우 번 거 로 울 것 이다!
셋째,실제 장면 에 따라 자신 에 게 적용 되 는 거부 전략 을 선택한다.보상 을 하고,JDK 가 지원 하 는 자동 보상 메커니즘 을 함부로 사용 하지 마 세 요!가능 한 한 사용자 정의 거부 정책 을 사용 하여 폭로 하 세 요!
넷 째,스 레 드 탱크 거부 정책,사용자 정의 거부 정책 은 Rejected ExecutionHandler 인 터 페 이 스 를 실현 할 수 있 습 니 다.
JDK 자체 거부 정책 은 다음 과 같 습 니 다.
AbortPolicy:시스템 의 정상 적 인 작 동 을 막 기 위해 이상 을 직접 던 집 니 다.
CallerRunsPolicy:스 레 드 탱크 가 닫 히 지 않 으 면 이 정책 은 호출 자 스 레 드 에서 현재 버 려 진 작업 을 직접 실행 합 니 다.
DiscardOldestPolicy:가장 오래된 요청 을 버 리 고 현재 작업 을 다시 제출 하려 고 합 니 다.
DiscardPolicy:처리 할 수 없 는 작업 을 버 리 고 처리 하지 않 습 니 다.
훅 활용
Hook 을 이용 하여 스 레 드 탱크 의 실행 궤적 을 남 깁 니 다:
Thread PoolExecutor 는 proctected 형식 으로 덮어 쓸 수 있 는 갈고리 방법 을 제공 하여 사용자 가 작업 을 수행 하기 전에 뭔 가 를 할 수 있 도록 합 니 다.우 리 는 이 를 통 해 ThreadLocal 초기 화,통계 정보 수집,기록 로그 등 작업 을 실현 할 수 있 습 니 다.이런 훅 은 beforeExecute 와 after Execute 와 같다.또 하나의 Hook 은 작업 이 실 행 될 때 rerminated 와 같은 논 리 를 삽입 할 수 있 습 니 다.
훅 방법 이 실 패 했 을 경우 내부 작업 라인 의 실행 이 실패 하거나 중 단 됩 니 다.
저 희 는 beforeExecute 와 after Execute 를 사용 하여 스 레 드 전과 후의 운행 상황 을 기록 할 수 있 고 실행 이 완 료 된 상 태 를 ELK 등 로그 시스템 에 직접 기록 할 수 있 습 니 다.
스 레 드 풀 닫 기
내용 은 스 레 드 탱크 가 인용 되 지 않 고 작업 스 레 드 수가 0 일 때 스 레 드 탱크 가 종 료 됩 니 다.우 리 는 또한 shutdown 을 호출 하여 스 레 드 탱크 를 수 동 으로 중지 할 수 있다.만약 우리 가 shutdown 을 호출 하 는 것 을 잊 어 버 리 면,스 레 드 자원 이 방출 되 기 위해 서,우 리 는 keepAliveTime 과 allowCoreThreadTimeOut 을 사용 하여 목적 을 달성 할 수 있 습 니 다!
물론,안정 적 인 방식 은 가상 컴퓨터 Runtime.getRuntime().addShutdown Hook 방법 을 사용 하여 스 레 드 탱크 의 닫 는 방법 을 수 동 으로 호출 하 는 것 입 니 다!
스 레 드 탱크 사용 실례
스 레 드 탱크 핵심 코드:
public class AsyncProcessQueue {
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
* Task <br>
* Executor <br>
*/
public static class TaskWrapper implements Runnable {
private static final Logger _LOGGER = LoggerFactory.getLogger(TaskWrapper.class);
private final Runnable gift;
public TaskWrapper(final Runnable target) {
this.gift = target;
}
@Override
public void run() {
// , Executor
if (gift != null) {
try {
gift.run();
} catch (Exception e) {
_LOGGER.error("Wrapped target execute exception.", e);
}
}
}
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
*
*
* @param task
* @return
*/
public static boolean execute(final Runnable task) {
return AsyncProcessor.executeTask(new TaskWrapper(task));
}
}
public class AsyncProcessor {
static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class);
/**
* <br>
*/
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
/**
*
*/
private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d";
/**
*
*/
private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
.daemon(true).build();
/**
*
*/
private static final int DEFAULT_SIZE = 500;
/**
*
*/
private static final long DEFAULT_KEEP_ALIVE = 60L;
/**NewEntryServiceImpl.java:689
* Executor
*/
private static ExecutorService executor;
/**
*
*/
private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);
static {
// Executor
// 4
try {
executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, executeQueue, FACTORY);
//
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
AsyncProcessor.LOGGER.info("AsyncProcessor shutting down.");
executor.shutdown();
try {
// 1
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
AsyncProcessor.LOGGER.error("AsyncProcessor shutdown immediately due to wait timeout.");
executor.shutdownNow();
}
} catch (InterruptedException e) {
AsyncProcessor.LOGGER.error("AsyncProcessor shutdown interrupted.");
executor.shutdownNow();
}
AsyncProcessor.LOGGER.info("AsyncProcessor shutdown complete.");
}
}));
} catch (Exception e) {
LOGGER.error("AsyncProcessor init error.", e);
throw new ExceptionInInitializerError(e);
}
}
/**
*
*/
private AsyncProcessor() {
}
/**
* , <br>
* {@link Executer}
*
* @param task
* @return
*/
public static boolean executeTask(Runnable task) {
try {
executor.execute(task);
} catch (RejectedExecutionException e) {
LOGGER.error("Task executing was rejected.", e);
return false;
}
return true;
}
/**
* , <br>
* , {@link }
*
* @param task
* @return
*/
public static <T> Future<T> submitTask(Callable<T> task) {
try {
return executor.submit(task);
} catch (RejectedExecutionException e) {
LOGGER.error("Task executing was rejected.", e);
throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
}
}
}
사용 방법:
AsyncProcessQueue.execute(new Runnable() {
@Override
public void run() {
//do something
}
});
자신의 사용 장면 에 따라 유연 하 게 변경 할 수 있 습 니 다.저 는 beforeExecute 와 after Execute,거부 전략 을 사용 하지 않 았 습 니 다.이상 은 개인 적 인 경험 이 므 로 여러분 에 게 참고 가 되 기 를 바 랍 니 다.여러분 들 도 저 희 를 많이 응원 해 주시 기 바 랍 니 다.만약 잘못 이 있 거나 완전히 고려 하지 않 은 부분 이 있다 면 아낌없이 가르침 을 주시 기 바 랍 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
JPA + QueryDSL 계층형 댓글, 대댓글 구현(2)이번엔 전편에 이어서 계층형 댓글, 대댓글을 다시 리팩토링해볼 예정이다. 이전 게시글에서는 계층형 댓글, 대댓글을 구현은 되었지만 N+1 문제가 있었다. 이번에는 그 N+1 문제를 해결해 볼 것이다. 위의 로직은 이...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.