쓰레드 thread 2

40399 단어 JavaJava

📚 9. 쓰레드의 동기화

멀티쓰레드 프로세스의 경우 여러 쓰레드가 같은 프로세스 내의 자원을 공유해서 작업하기 때문에 서로의 작업에 영향을 주게 된다.

한 쓰레드가 특정 작업을 끝마치기 전까지 다른 쓰레드에 의해 방해받지 않도록 하는 것이 필요하다. 그래서 도입된 개념이 바로 임계 영역(critical section)잠금(락, lock)이다.

  • 공유 데이터를 사용하는 코드 영역을 임계 영역으로 지정해놓는다.
  • 공유 데이터(객체)가 가지고 있는 lock을 획득한 단 하나의 쓰레드만 이 영역 내의 코드를 수행할 수 있게 한다.
  • 그리고 해당 쓰레드가 임계 영역 내의 모든 코드를 수행하고 벗어나서 lock을 반납해야만 다른 쓰레드가 반납된 lock을 획득하여 임계 영역의 코드를 수행할 수 있게 된다.

이처럼 한 쓰레드가 진행 중인 작업을 다른 쓰레드가 간섭하지 못하도록 막는 것을 쓰레드의 동기화(synchronization) 라고 한다.

 

💡 참고
JDK1.5부터는 java.util.concurrent.locksjava.util.concurrent.atomic패키지를 통해서 다양한 방식으로 동기화를 구현할 수 있도록 지원하고 있다.

 

📖 A. synchronized를 이용한 동기화

synchronized 키워드를 이용한 동기화 : 임계 영역을 설정하는데 사용된다.

1. 메서드 전체를 임계 영역으로 지정

public synchronized void calcSum(){ // 임계영역(critical section)
	// ...
}

synchronized를 붙이면 메서드 전체가 임계 영역으로 설정된다.
쓰레드는 synchronized메서드가 호출된 시점부터 해당 메서드가 포함된 객체의 lock을 얻어 작업을 수행하다가 메서드가 종료되면 lock을 반환한다.

 

2. 특정한 영역을 임계 영역으로 지정

synchronized(객체의 참조변수){ // 임계영역(critical section)
	// ...
}

메서드 내의 코드 일부를 블럭{}으로 감싸고 블럭 앞에 synchronized(참조변수)를 붙이는 것인데, 이때 참조변수는 락을 걸고자하는 객체를 참조하는 것이어야 한다.
이 블럭을 synchronized블럭 이라고 부르며, 이 블럭의 영역 안으로 들어가면서부터 쓰레드는 지정된 객체의 lock을 얻게 되고, 이 블럭을 벗어나면 lock을 반납한다.

 

두 방법 모두 lock의 획득과 반납이 모두 자동적으로 이루어지므로 우리가 해야할 일은 그저 임계 영역만 설정해주는 것 뿐이다!

 

💡 참고
임계 영역은 멀티쓰레드 프로그램의 성능을 좌우하기 때문에 가능하면 메서드 전체에 락을 거는 것보다 synchronized블럭으로 임계 영역을 최소화해서 보다 효율적인 프로그램이 되도록 노력해야 한다.

 

synchronized를 이용한 동기화는 지정된 영역의 코드를 한 번에 하나의 쓰레드가 수행하는 것을 보장한다.

 

📖 B. wait()과 notify()

  • 동기화된 임계영역의 코드를 수행하다가 작업을 더 이상 진행할 상황이 아니면, 일단 wait()을 호출하여 쓰레드가 락을 반납하고 기다리게 한다.
  • 그러면 다른 쓰레드가 락을 얻어 해당 객체에 대한 작업을 수행할 수 있게 된다. 나중에 작업을 진행할 수 있는 상황이 되면 notify()를 호출해서, 작업을 중단했던 쓰레드가 다시 락을 얻어 작업을 진행할 수 있게 한다.

notifyAll()은 기다리고 있는 모든 쓰레드에게 통보를 하지만, lock을 얻을 수 있는 것은 하나의 쓰레드일 뿐이고 나머지 쓰레드는 통보를 받긴 했지만, lock을 얻지 못하면 다시 lock을 기다리는 신세가 된다.

wait()notify()는 특정 객체에 대한 것이므로 Object 클래스에 정의되어있다.

void wait()
void wait(long timeout)
void wait(long timeout, int nanos)
void notify()
void notifyAll()

wait()notify() 또는 notifyAll()이 호출될 때까지 기다리지만, 매개변수가 있는 wait()은 지정된 시간동안만 기다린다.

wait(), notify(), notifyAll()
- Object에 정의되어 있다.
- 동기화 블록(synchronized블록)내에서만 사용할 수 있다.
- 보다 효율적인 동기화를 가능하게 한다.

 

✔️ 기아 현상과 경쟁 상태

쓰레드는 계속 통지를 받지 못하고 오랫동안 기다리게 되는데, 이것을 기아(starvation) 현상이라고 한다.

  • 이 현상을 막으려면, notify()대신 notifyAll()을 사용해야 한다.

여러 쓰레드가 lock을 얻기 위해 서로 경쟁하는 것을 경쟁 상태(race condition) 라고 한다.

LockCondition을 이용하면 wait() & notify()로는 불가능한 선별적인 통지가 가능하다.

 

📖 C. Lock과 Condition을 이용한 동기화

동기화할 수 있는 방법은 synchronized블럭 외에도 java.util.concurrent.locks 패키지가 제공하는 lock클래스들을 이용하는 방법이 있다.

같은 메서드 내에서만 lock을 걸 수 있다는 제약이 불편하기도 하다. 그럴 때 이 lock 클래스를 사용한다. lock 클래스의 종류는 다음과 같이 3가지가 있다.


ReentrantLock :	재진입이 가능한 lock. 가장 일반적인 배타 lock
ReentrantReadWriteLock :	읽기에는 공유적이고, 쓰기에는 배타적인 lock
StampedLock :	ReentrantReadWriteLock에 낙관적인 lock기능을 추가

 

✔️ ReentrantLock
ReentrantLock은 가장 일반적인 lock이다. 특정 조건에서 lock을 풀고 나중에 다시 lock을 얻고 임계영역으로 들어와서 이후의 작업을 수행할 수 있기 때문이다. lock과 일치한다고 보면 된다.

 

✔️ ReentrantReadWriteLock
ReentrantReadWriteLock은 이름에서 알 수 있듯이, 읽기를 위한 lock과 쓰기를 위한 lock을 제공한다. ReentrantLock은 배타적인 lock이라서 무조건 lock이 있어야만 임계영역의 코드를 수행할 수 있지만, ReentrantReadWriteLock은 읽기 lock이 걸려있으면, 다른 쓰레드가 읽기 lock을 중복해서 걸고 읽기를 수행할 수 있다.

  • 읽기는 내용을 변경하지 않으므로 동시에 여러 쓰레드가 읽어도 문제가 되지 않는다. 그러나 읽기 lock이 걸린 상태에서 쓰기 lock을 거는 것은 허용되지 않는다.
  • 반대의 경우도 마찬가지다. 읽기를 할 때는 읽기 lock을 걸고, 쓰기 할 때는 쓰기 lock을 거는 것일 뿐 lock을 거는 방법은 같다.

 

✔️ StampLock

StampedLocklock을 걸거나 해지할 때 스탬프(long타입의 정수값) 를 사용하며, 읽기와 쓰기를 위한 lock외에 낙관적 읽기 lock(optimistic reading lock) 이 추가된 것이다.

  • 읽기 lock이 걸려있으면, 쓰기 lock을 얻기 위해서는 읽기 lock이 풀릴 때까지 기다려야하는데 비해 낙관적 읽기 lock 은 쓰기 lock에 의해 바로 풀린다. 그래서 낙관적 읽기에 실패하면, 읽기 lock을 얻어서 다시 읽어 와야 한다.
  • 무조건 읽기 lock을 걸지 않고, 쓰기와 읽기가 충돌할 때만 쓰기가 끝난 후에 읽기 lock을 거는 것이다.

가장 일반적인 StampedLock을 이용한 낙관적 읽기의 예이다.

int getBalance(){
	long stamp = lock.tryOptimisticRead(); // 낙관적 읽기 lock을 건다.

	int curBalance = this.balance; // 공유 데이터인 balance를 읽어온다.
	
	if(!lock.validate(stamp)){ // 쓰기 lock에 의해 낙관적 읽기 lock이 풀렸는지 확인
		stamp = lock.readLock(); // lock이 풀렸으면, 읽기 lock을 얻으려고 기다린다.

		try{
			curBalance = this.balance; // 공유 데이터를 다시 읽어온다.
		}finally{
			lock.unlockRead(stamp); // 읽기 lock을 푼다.
		}
	}
	return curBalance; // 낙관적 읽기 lock이 풀리지 않았으면 곧바로 읽어온 값을 반환
}

 

✔️ ReentrantLock의 생성자

ReentrantLock()
ReentrantLock(boolean fair)
  • 생성자의 매개변수를 true로 주면, lock이 풀렸을 때 가장 오래 기다린 쓰레드가 lock을 획득할 수 있게, 즉 공정(fair)하게 처리한다.
  • 그러나 공정하게 처리하려면 어떤 쓰레드가 가장 오래 기다렸는지 확인하는 과정을 거칠 수 밖에 없으므로 성능은 떨어진다.

 

void lock() :	lock을 잠근다.
void unlock() :	lock을 해지한다.
boolean isLocked() :	lock이 잠겼는지 확인한다.

자동적으로 lock의 잠금과 해제가 관리되는 synchronized블럭과 달리, ReentrantLock과 같은 lock클래스들은 수동으로 lock을 잠그고 해제해야 한다. 그래도 lock을 잠그고 푸는 것은 간단하다. 그저 메서드를 호출하기만 하면 될 뿐이다. lock()을 걸고 나서 푸는 것을 잊어버리는 실수를 하지 않도록 주의를 기울여야 한다는 것은 잊지말자!

 

unlock()try-finally문으로 감싸는 것이 일반적이다.
참조변수 lockReentrantLock 객체를 참조한다고 가정하였다.

lock.lock(); // ReentrantLock lock = new ReentrantLock();
try{
   // 임계 영역
} finally {
   lock.unlock();
}

이렇게 하면, try블럭 내에서 어떤 일이 발생해도 finally블럭에 있는 unlock()이 수행되어 lock이 풀리지 않는 일은 발생하지 않는다. 대부분의 경우 lock() & unlock() 대신 synchronized블럭을 사용할 수 있으며, 그럴 때는 그냥 synchronized블럭을 사용하는 것이 더 나을 수 있다.

 

✔️ tryLock()
다른 쓰레드에 의해 lock이 걸려 있으면 lock을 얻으려고 기다리지 않는다. 또는 지정된 시간만큼만 기다린다.
lock을 얻으면 true를 반환하고, 얻지 못하면 false를 반환한다.

boolean tryLock();
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException
  • lock()lock을 얻을 때까지 쓰레드를 블락(block)시키므로 쓰레드의 응답성이 나빠질 수 있다.
  • 응답성이 중요한 경우, tryLock()을 이용해서 지정된 시간동안 lock을 얻지 못하면 다시 작업을 시도할 것인지 포기할 것인지를 결정할 수 있는 것이 좋다.
  • 그리고 이 메서드는 InterruptException을 발생시킬 수 있는데, 이것은 지정된 시간동안 lock을 얻으려고 기다리는 중에 interrupt()에 의해 작업을 취소될 수 있도록 코드를 작성할 수 있다는 뜻이다.

 

✔️ ReentrantLock과 Condition
wait() & notify() 예제에 요리사가 손님 쓰레드를 구분해서 통지하지 못한다는 단점을 가지고 있었다. Condition은 이 문제점을 해결하기 위한 것이다.

wait() & notify()로 쓰레드의 종류를 구분하지 않고, 공유 객체의 waiting pool에 같이 몰아넣는 대신, 손님 쓰레드를 위한 Condition과 요리사 쓰레드를 위한 Condition을 만들어서 각각의 waiting pool에서 따로 기다리도록 하면 문제는 해결된다.

Condition은 이미 생성된 lock으로부터 newCondition()을 호출해서 생성한다.

private ReentrantLock lock = new ReentrantLock(); // lock을 생성

// lock으로 condition을 생성
private Condition forCook = lock.newCondition();
private Condition forCust = lock.newCondition();

두 개의 Condition을 생성했는데, 하나는 요리사 쓰레드를 위한 것이고 다른 하나는 손님 쓰레드를 위한 것이다. 그 다음엔, wait() & notify()대신 Conditionawait() & signal()을 사용하면 그걸로 끝이다.

ObjectCondition
void wait()void await(), void awaitUninterruptibly()
void wait(long timeout)boolean await(long time, TimeUnit unit), long awaitNanos(long nanosTimeout), boolean awaitUntil(Date deadline)
void notify()void signal()
void notifyAll()void signalAll()

 

아래의 코드는 테이블에 음식 추가하는 add()인데, wait()notify()대신 await()signal()을 사용하였다.

public void add(String dish){ // synchronized를 추가
    lock.lock(); 
    try{
      while(dishes.size() >= MAX_FOOD){
          String name = Thread.currentThread().getName();
          System.out.println(name+" is waiting");
          try{
              forCook.await(); // wait(); // 음식이 가득찼으므로 COOK쓰레드를 기다리게 한다.
          }catch (InterruptedException e){}
      }
      dishes.add(dish);
      forCust.signal(); // notify(); // 음식이 추가되면 기다리고 있는 CUST를 깨우게 함
      System.out.println("Dishes : " + dishes.toString());
    } finally {
      lock.unlock();
    }
}

전에는 손님 쓰레드를 기다리게 할 때와 요리사 쓰레드를 기다리게 할 때 모두 wait()을 사용했는데, 이제는 wait()대신 forCook.await()forCust.await()을 사용함으로써 대기와 통지의 대상이 명확히 구분된다.

 

📖 D. volatile

멀티 코어 프로세서의 캐시(cache)와 메모리간의 통신

사진 참고

코어는 메모리에서 읽어온 값을 캐시에 저장하고 캐시에서 값을 읽어서 작업한다. 다시 같은 값을 읽어올 때는 먼저 캐시에 있는지 확인하고 없을 때만 메모리에서 읽어온다.

 

boolean suspended = false;
boolean stopped = false;

// volatile
volatile boolean suspended = false;
volatile boolean stopped = false;

변수 앞에 volatile을 붙이면, 코어가 변수의 값을 읽어올 때 캐시가 아닌 메모리에서 읽어오기 때문에 캐시와 메모리간의 값이 불일치가 해결된다.

 

public void stop(){
	stopped = true;
}

// synchronized블럭 
public synchronized void stop(){
	stopped = true;
}

변수에 volatile을 붙이는 대신에 synchronized블럭을 사용해도 같은 효과를 얻을 수 있다. 쓰레드가 synchronized블럭으로 들어갈 때와 나올 때, 캐시와 메모리간의 동기화가 이루어지기 때문에 값의 불일치가 해소되기 때문이다.

 

✔️ volatile로 long과 double로 원자화
크기가 8 bytelongdouble타입의 변수는 하나의 명령어로 값을 읽거나 쓸 수 없기 때문에, 변수의 값을 읽는 과정에서 다른 쓰레드가 끼어들 여지가 있다.

이러한 변수를 선언할 때 volatile를 붙이면 해결된다.

 

💡 참고
상수에는 volatile을 붙일 수 없다. 즉 변수에 finalvolatile을 같이 붙일 수 없다. 사실 상수는 변하지 않는 값이므로 멀티쓰레드에 안전(thread-safe)하다. 그래서 volatile을 붙일 필요가 없다.

 

volatile long sharedVal; // long타입의 변수(8 byte)를 원자화
volatile double sharedVal; // double타입의 변수(8 byte)를 원자화

volatile은 해당 변수에 대한 읽거나 쓰기가 원자화된다. 원자화라는 것은 작업을 더 이상 나눌 수 없게 한다는 의미인데, synchronized블럭도 일종의 원자화라고 할 수 있다.
즉, synchronized블럭은 여러 문장을 원자화함으로써 쓰레드의 동기화를 구현한 것이라고 보면 된다.

volatile은 변수의 읽거나 쓰기를 원자화할 뿐, 동기화하는 것은 아니라는 점에 주의하자!
동기화가 필요할 때 synchronized블럭 대신 volatile을 쓸 수 없다.

volatile long balance; // 인스턴스 변수 balance를 원자화한다.

synchronized int getBalance(){ // balance의 값을 반환한다.
	return balance;
}

synchronized void withdraw(int money){ // balance의 값을 변경
	if(balance >= money){
		balance -= money;
	}
}

인스턴스변수 balancevolatile로 원자화했으니깐, 이 값을 읽어서 반환하는 메서드 getBalance()를 동기화할 필요가 없다고 생각할 수 있다. 그러나 getBalance()synchronized로 동기화하지 않으면, withdraw()가 호출되어 객체에 lock을 걸고 작업을 수행하는 중인데도 getBalance()가 호출되는 것이 가능해진다. 출금이 진행 중일 때는 기다렸다가 출금이 끝난 후에 잔고를 조회할 수 있도록 하려면 getBalance()synchronized를 붙여서 동기화를 해야 한다.

 

📖 E. fork & Join 프레임워크

속도 보다는 코어의 개수를 늘려서 CPU의 성능을 향상시키는 방향으로 발전해 가고 있다.

JDK1.7부터 fork & join 프레임워크이 추가되었고, 이 프레임워크은 하나의 작업을 작은 단위로 나눠서 여러 쓰레드가 동시에 처리하는 것을 쉽게 만들어 준다.

먼저 수행할 작업에 따라 RecursiveActionRecursiveTask, 두 클래스 중에서 하나를 상속받아 구현해야한다.

RecursiveAction :	반환값이 없는 작업을 구현할 때 사용
RecursiveTask :	반환값이 있는 작업을 구현할 때 사용

두 클래스 모두 compute()라는 추상 메서드를 가지고 있는데, 상속을 통해 이 추상 메서드를 구현하기만 하면 된다.

public abstract class RecursiveAction extends ForkJoinTask<Void> {
	// ...
	protected abstract void compute(); // 상속을 통해 이 메서드를 구현해야 한다.
	// ...
}

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
	// ...
	V result;
	protected abstract V compute(); // 상속을 통해 이 메서드를 구현해야 한다.
	// ...
}

예를 들어 1부터 n까지의 합을 계산한 결과를 돌려주는 작업의 구현은 다음과 같이 한다.

class SumTask extends RecursiveTask<Long> { // RecursiveTask를 상속받는다.
	long from, to;

	SumTask(long from, long to){
		this.from = from;
		this.to = to;
	}

	public Long compute(){
		// 처리할 작업을 수행하기 위한 문장을 넣는다.
	}
}

 

fork&join 프레임워크로 수행할 작업도 compute()가 아닌 invoke()로 시작한다.

ForkJoinPool pool = new ForkJoinPool(); // 쓰레드 풀을 생성
SumTask task = new SumTask(from, to); // 수행할 작업을 생성

Long result = pool.invoke(task); // invoke()를 호출해서 작업을 시작

ForkJoinPoolfork & join프레임워크에서 제공하는 쓰레드 풀(thread pool)로, 지정된 수의 쓰레드를 생성해서 미리 만들어 놓고 반복해서 재사용할 수 있게 한다.
그리고 쓰레드를 반복해서 생성하지 않아도 된다는 장점과 너무 많은 쓰레드가 생성되어 성능이 저하 되는 것을 막아준다는 장점이 있다.
쓰레드 풀은 쓰레드가 수행해야하는 작업이 담긴 큐를 제공하며, 각 쓰레드는 자신의 작업 큐에 담긴 작업을 순서대로 처리한다.

 

💡 참고
쓰레드 풀은 기본적으로 코어의 개수와 동일한 개수의 쓰레드를 생성한다.

 

✔️ compute()의 구현

compute()를 구현할 때는 수행할 작업 외에도 작업을 어떻게 나눌 것인가에 대해서도 알려줘야 한다.

public long compute(){
	long size = to - from + 1; // from <= i <= to

	if(size <= 5) // 더할 숫자가 5개 이하면
		return sum(); // 숫자의 합을 반환. sum()은 from부터 to까지의 수를 더해서 반환
        
	// 범위를 반으로 나눠서 두 개의 작업을 생성
	long half = (from+to)/2;

	SumTask leftSum = new SumTask(from, half);
	SumTask rightSum = new SumTask(half+1, to);

	leftSum.fork(); // 작업(leftSum)을 작업 큐에 넣는다.

	return rightSum.compute() + leftSum.join();
}

 

💡 참고
compute()의 구조는 일반적인 재귀호출 메서드와 동일하다.

 

1부터 8까지의 숫자를 더하는 과정 사진 참고

  • 하나의 쓰레드는 compute()를 재귀호출하면서 작업을 계속해서 반으로 나눈다.
  • 다른 쓰레드는 fork()에 의해 작업 큐에 추가된 작업을 수행한다.

 

✔️ 다른 쓰레드의 작업 훔쳐오기

fork()가 호출되어 작업 큐에 추가된 작업 역시, compute()에 의해 더 이상 나눌 수 없을 때까지 반복해서 나뉘고, 자신의 작업 큐가 비어있는 쓰레드는 다른 쓰레드의 작업 큐에서 작업을 가져와서 수행한다.
이것을 작업 훔쳐오기(work stealing) 라고 하며, 이 과정은 모두 쓰레드풀에 의해 자동적으로 이루어진다.

사진 참고

이런 과정을 통해 한 쓰레드의 작업이 몰리지 않고, 여러 쓰레드가 골고루 작업을 나누어 처리하게 된다.

 

💡 참고
작업의 크기를 충분히 작게 해야 각 쓰레드가 골고루 작업을 나눠가질 수 있다.

 

✔️ fork()와 join()

fork()는 작업을 쓰레드의 작업 큐에 넣는 것이고, 작업 큐에 들어간 작업은 더 이상 나눌 수 없을 때까지 나뉜다. 즉, compute()로 나누고 fork()로 작업 큐에 넣는 작업이 계속해서 반복된다. 그리고 나눠진 작업은 각 쓰레드가 골고루 나눠서 처리하고, 작업의 결과는 join()을 호출해서 얻을 수 있다.

fork()join()의 중요한 차이점이 하나 있는데, 그것은 바로 fork()비동기 메서드(asynchronous method)이고, join()동기 메서드(synchronous method)라는 것이다.

fork() : 해당 작업을 쓰레드 풀의 작업 큐에 넣는다. 비동기 메서드
join() :	해당 작업의 수행이 끝날 때까지 기다렸다가, 수행이 끝나면 그 결과를 반환한다. 동기 메서드

비동기 메서드는 일반적인 메서드와 달리 메서드를 호출만 할 뿐, 그 결과를 기다리지 않는다. (내부적으로는 다른 쓰레드에게 작업을 수행하도록 지시만 하고 결과를 기다리지 않고 돌아오는 것이다.) 그래서 아래의 코드에서, fork()를 호출하면 결과를 기다리지 않고 다음 문장인 return문으로 넘어간다.

return문에서 compute()가 재귀호출될 때, join()은 호출되지 않는다. 그러다가 작업을 더 이상 나눌 수 없게 되었을 때, compute()의 재귀호출은 끝나고 join()의 결과를 기다렸다가 더해서 결과를 반환한다. 재귀호출된 compute()가 모두 종료될 때, 최종 결과를 얻는다.

public Long compute(){

    // ...
    
    SumTask leftSum = new SumTask(from, half);
    SumTask rightSum = new SumTask(half+1, to);
    leftSum.fork(); // 비동기 메서드. 호출 후 결과를 기다리지 않는다.

    return rightSum.compute() + leftSum.join(); // 동기 메서드. 호출결과를 기다린다.
}

 

📌 기억할 것
항상 멀티쓰레드로 처리하는 것이 빠르다고 생각해서는 안된다. 반드시 테스트해보고 이득이 있을 때만, 멀티쓰레드로 처리해야 한다.

좋은 웹페이지 즐겨찾기