Java 동시 프로그래밍 실전 노트 2.0

1. 자물쇠로 하는 최고의 실천

1.     
2.     
3. 

2. 신호량 모델

 : , , 。

init(): 카운터의 초기 값을 설정합니다.down (): 계수기의 값을 1로 줄이기;만약 이 계수기의 값이 0보다 작다면, 현재 라인은 막힐 것입니다. 그렇지 않으면 현재 라인은 계속 실행할 수 있습니다.up (): 계수기의 값을 1 더하기;만약 계수기의 값이 0보다 작거나 같으면 대기 대기열의 한 라인을 깨우고 대기 대기열에서 제거합니다.

class Semaphore{
  //  
  int count;
  //  
  Queue queue;
  //  
  Semaphore(int c){
    this.count=c;
  }
  // 
  void down(){
    this.count--;
    if(this.count<0){
      // 
      // 
    }
  }
  void up(){
    this.count++;
    if(this.count<=0) {
      // T
      // T
    }
  }
}


3. 읽기 쓰기 자물쇠


모든 읽기와 쓰기 자물쇠는 다음과 같은 세 가지 기본 원칙을 준수한다.\1.여러 라인이 공유 변수를 동시에 읽을 수 있도록 허용하기;    2.하나의 스레드 쓰기 공유 변수만 허용;    3.만약 쓰기 스레드가 쓰기 작업을 실행하고 있다면, 읽기 스레드 읽기 공유 변수를 금지합니다.   읽기와 쓰기 자물쇠와 배제 자물쇠의 중요한 차이점 중 하나는 읽기와 쓰기 자물쇠가 여러 라인에서 공유 변수를 동시에 읽을 수 있고 배제 자물쇠는 허용하지 않는다는 것이다. 이것은 읽기와 쓰기 자물쇠가 읽기와 쓰기가 적은 장면에서 서로 배제하는 자물쇠보다 성능이 좋은 관건이다.그러나 읽기 자물쇠의 쓰기 동작은 서로 배척된다. 한 라인이 공유 변수를 쓸 때 다른 라인이 쓰기 동작과 읽기 동작을 실행하는 것을 허용하지 않는다.
읽기 및 쓰기 잠금 상태 내리기

class CachedData {
  Object data;
  volatile boolean cacheValid;
  final ReadWriteLock rwl =
    new ReentrantReadWriteLock();
  //    
  final Lock r = rwl.readLock();
  // 
  final Lock w = rwl.writeLock();
  
  void processCachedData() {
    //  
    r.lock();
    if (!cacheValid) {
      //  , 
      r.unlock();
      //  
      w.lock();
      try {
        //    
        if (!cacheValid) {
          data = ...
          cacheValid = true;
        }
        //  , 
        //  
        r.lock();} finally {
        //  
        w.unlock(); 
      }
    }
    //  
    try {use(data);} 
    finally {r.unlock();}
  }
}


4. CountDownLatch 및 CyclicBarrier


CountDownLatch는 주로 한 라인이 여러 라인을 기다리는 장면을 해결하는 데 사용된다.CyclicBarrier는 스레드 세트로 서로 기다립니다.그 외에 CountDownLatch의 계수기는 순환적으로 사용할 수 없다. 즉, 계수기가 0으로 줄어들면 await () 를 호출하는 라인이 있으면 이 라인은 바로 통과한다.그러나 CyclicBarrier의 카운터는 순환적으로 이용할 수 있고 자동 리셋 기능이 있어 카운터가 0으로 줄어들면 당신이 설정한 초기 값으로 자동으로 리셋됩니다.

5. 스레드 풀


1. SingleThreadExecutor의 의미
SingleThreadExecutor 내부에 Thread가 생성됩니다. 이 Thread의 작업은 한 대기열에서 사용자가 제출한 작업을 꺼내서 실행하는 것입니다. 만약 실행 과정에서 검사되지 않은 이상이 발생하면singleThreadExecutor는 자동으로 한 라인을 다시 시작하고 작업을 계속합니다. 이것은 자신이 만든 라인보다 스스로 관리하는 것이 훨씬 쉽고 작업 대기열을 유지할 필요가 없습니다.
스레드 탱크 관리의 스레드의 몇 가지 의미: 1. 캐시 스레드, 탱크화로 스레드의 중복 이용을 실현하고 중복 생성과 소각으로 인한 성능 비용을 피할 수 있다.2. 스레드 스케줄링 작업에 이상이 발생하면 이상이 발생한 스레드를 대체하는 스레드를 다시 만듭니다.3. 임무 집행은 규정된 스케줄 규칙에 따라 집행한다.스레드 탱크는 대기열 형식을 통해 임무를 수신한다.다시 빈 라인을 통해 하나하나 꺼내서 임무 스케줄링을 진행한다.즉, 스레드 탱크는 임무 스케줄링의 집행 순서를 제어할 수 있다.4. 거부 전략을 제정할 수 있다.즉, 작업 대기열이 가득 찼을 때 이후 작업에 대한 거부 처리 규칙입니다.이상의 의미는singleThreadExecutor에게도 적용된다.일반 스레드와 스레드 탱크에서 만들어진 스레드의 가장 큰 차이점은 관리자가 스레드를 관리하는지 여부이다.
2.1 스레드 탱크 구조 방법 중 각 파라미터의 의미

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even if they are idle,   
     *          unless {@code allowCoreThreadTimeOut} is set
     *       , , allowCoreThreadTimeOut
     * @param maximumPoolSize the maximum number of threads to allow in the pool
     *         
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     *       
     * @param unit the time unit for the {@code keepAliveTime} argument
     *       
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     *       
     * @param threadFactory the factory to use when the executor creates a new thread
     *       ,Executors Executors.defaultThreadFactory()
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     *       ,handler 
     * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); }

테스트 클래스

package com.lhc.concurrent.executor.param;

import java.util.concurrent.*;

public class Constructor {
    public static void main(String[] args) throws InterruptedException{
        Runnable runnable = new Runnable(){
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " run ! " + System.currentTimeMillis());
                    Thread.sleep(1000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        };

        ThreadPoolExecutor executor = getPoolBySynchronousQueue();

        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);

        Thread.sleep(300);

        System.out.println("first out core:" + executor.getCorePoolSize());
        System.out.println("first out :" + executor.getPoolSize());
        System.out.println("first out queue:" + executor.getQueue().size());

        Thread.sleep(10000);

        System.out.println("second out core:" + executor.getCorePoolSize());
        System.out.println("second out :" + executor.getPoolSize());
        System.out.println("second out queue:" + executor.getQueue().size());
    }

    /**
     *  LinkedBlockingDeque ,, corePoolSize ,
     *  ,maximumPoolSize 
     * keepAliveTime 
     * @return
     */
    public static ThreadPoolExecutor getPoolByLinkedBlockingDeque(){
        return new ThreadPoolExecutor(7, 8, 5,
                TimeUnit.SECONDS, new LinkedBlockingDeque());
    }

    /**
     *  SynchronousQueue ,maximumPoolSize 
     * keepAliveTime 
     *  maximumPoolSize , , 
     * @return
     */
    public static ThreadPoolExecutor getPoolBySynchronousQueue(){
        return new ThreadPoolExecutor(7, 8, 5,
                TimeUnit.SECONDS, new SynchronousQueue());
    }
}


2.2 코어 Pool Size + Linked Blocking Deque에 대한 설명size

/**
 *   corePoolSize + LinkedBlockingDeque.size  ,
 *  ,   maxSize + + LinkedBlockingDeque.size  ,
 *  , 
 */
package com.lhc.concurrent.executor.queue;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyLinked {
    public static void main(String[] args){
        Runnable runnable = new Runnable(){
            @Override
            public void run() {
                try{
                    System.out.println("begin " + System.currentTimeMillis());
                    Thread.sleep(1000);
                    System.out.println("end " + System.currentTimeMillis());
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        };

        LinkedBlockingDeque deque = new LinkedBlockingDeque<>(2);
        System.out.println("deque "+ deque.size());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 5,
                TimeUnit.SECONDS, deque);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        System.out.println("deque " + deque.size());
        System.out.println("poolSize " + executor.getPoolSize());
    }
}


Executors를 사용하지 않는 가장 중요한 이유는 Executors가 제공하는 많은 방법들이 기본적으로 사용하는 것은 무계한 Linked Blocking Queue이기 때문이다. 높은 부하 상황에서 무계 대기열은 OOM을 초래하기 쉽고 OOM은 모든 요청을 처리할 수 없기 때문에 이것은 운명적인 문제이다.그래서 유계 대열을 사용하는 것을 강력히 권장합니다.유계 대기열을 사용하면 작업이 너무 많으면 스레드 풀에서 거부 정책을 실행합니다. 스레드 풀의 기본 거부 정책은throw RejectedExecutionException입니다. 이것은 실행할 때 이상합니다. 실행할 때 이상 컴파일러는catch를 강제하지 않기 때문에 개발자가 무시하기 쉽습니다.따라서 기본 거부 정책은 신중하게 사용해야 합니다.만약 스레드 탱크에서 처리하는 작업이 매우 중요하다면 자신의 거부 정책을 사용자 정의하는 것을 권장합니다.또한 실제 작업에서 사용자 정의 거부 정책은 종종 강등 정책과 함께 사용된다.    스레드 풀을 사용할 때 비정상적인 처리 문제도 주의해야 한다. 예를 들어 ThreadPoolExecutor 대상의excute() 방법을 통해 임무를 제출할 때 임무를 수행하는 과정에서 실행할 때 이상이 발생하면 임무를 수행하는 스레드가 종료될 수 있다.그러나 가장 치명적인 것은 임무가 이상하지만 아무런 통지도 받지 못한다는 것이다. 이것은 임무를 정상적으로 수행하고 있다고 착각하게 할 것이다.비록 스레드 탱크는 이상 처리에 사용되는 많은 방법을 제공했지만 가장 온당하고 간단한 방안은 모든 이상을 포획하고 수요에 따라 처리하는 것이다.

6.생산자-소비자모델


   단계별 제출을 지원하여 성능을 향상시키고 생산자-소비자 모델을 이용하며 단계별 제출의 응용 장면을 쉽게 지원할 수 있다.파일을 쓸 때 동기화 디스크의 성능이 매우 느리기 때문에 중요하지 않은 데이터에 대해서는 종종 비동기 디스크 방식을 채택한다.어떤 프로젝트에서 그 중의 로그 구성 요소는 스스로 실현된 것이고 비동기적인 디스크 갱신 방식을 사용한다. 디스크를 갱신하는 시기는 ERROR급의 로그는 즉시 디스크를 갱신해야 한다.데이터가 500개까지 쌓이면 즉시 디스크를 갱신해야 한다.디스크를 갱신하지 않은 데이터가 존재하고 5초 동안 디스크를 갱신하지 않았으니 즉시 갱신해야 한다.위조 코드는 다음과 같습니다.


class Logger {
  //   
  final BlockingQueue<LogMsg> bq = new BlockingQueue<>();
  //flush   
  static final int batchSize=500;
  // 
  ExecutorService es = Executors.newFixedThreadPool(1);
  // 
  void start(){
    File file=File.createTempFile("foo", ".log");
    final FileWriter writer = new FileWriter(file);
    this.es.execute(()->{
      try {
        // 
        int curIdx = 0;
        long preFT=System.currentTimeMillis();
        while (true) {
          LogMsg log = bq.poll(5, TimeUnit.SECONDS);
          // 
          if (log != null) {
            writer.write(log.toString());
            ++curIdx;
          }
          // , 
          if (curIdx <= 0) {
            continue;
          }
          // 
          if (log!=null && log.level==LEVEL.ERROR ||
              curIdx == batchSize ||
              System.currentTimeMillis()-preFT>5000){
            writer.flush();
            curIdx = 0;
            preFT=System.currentTimeMillis();
          }
        }
      }catch(Exception e){
        e.printStackTrace();
      } finally {
        try {
          writer.flush();
          writer.close();
        }catch(IOException e){
          e.printStackTrace();
        }
      }
    });  
  }
  // INFO 
  void info(String msg) {
    bq.put(new LogMsg(LEVEL.INFO, msg));
  }
  // ERROR 
  void error(String msg) {
    bq.put(new LogMsg(LEVEL.ERROR, msg));
  }
}
// 
enum LEVEL {
  INFO, ERROR
}
class LogMsg {
  LEVEL level;
  String msg;
  // 
  LogMsg(LEVEL lvl, String msg){}
  // toString() 
  String toString(){}
}


7.HiKaricP 데이터베이스 연결 풀


HiKaricP에 대한 두 가지 데이터 구조는 하나는FastList이고, 다른 하나는ConcurrentBag이다.
HiKaricP의 FastList가 ArrayList에 비해 최적화된 점 중 하나는 Remove(Object element) 방법의 검색 순서를 역순 검색으로 바꾸는 것이다.이 밖에도 FastList는 get(int index) 방법이 index 매개 변수에 대한 경계 검사를 하지 않았기 때문에 HiKaricP는 경계를 넘지 않을 것을 보장할 수 있기 때문에 매번 경계 검사를 하지 않아도 된다.
ConcurrentBag에서 가장 관건적인 속성은 네 가지가 있는데 그것이 바로 모든 데이터베이스 연결을 저장하는 공유 대기열sharedList, 루트 로컬 저장threadList, 데이터베이스 연결을 기다리는 루트수waiters, 데이터베이스 연결을 분배하는 도구handoffQueue이다.그 중에서 HandoffQueue는 자바SDK가 제공하는 SynchronousQueue를 사용하는데 SynchronousQueue는 주로 라인 간에 데이터를 전달하는 데 사용된다.

좋은 웹페이지 즐겨찾기