자바 병렬 스트림의 내부 구조? 포크-조인(Fork-Join) 프레임워크

1. 포크-조인 프레임워크


Java의 스트림(Stream)이 병렬 처리를 할 때 내부적으로 사용하는 포크-조인 프레임워크에 대해 알아본다.

포크-조인 프레임워크는 Java7에서 추가되었다.

포크-조인 프레임워크는 태스크(Task)를 재귀적으로 여러 개의 작은 작업 단위로 분할(fork)하여 처리한다. 처리된 서브 태스크들의 결과를 합쳐 전체 결과로 합친다(join).

이러한 방식을 분할 정복(divide & conquer)이라고 한다. 큰 작업을 작은 작업으로 나누어 처리하고 결과를 합치는 방식이다.

포크 조인의 작업 처리 방식은 의사 코드(psudo-code)로 아래와 같이 표현할 수 있다.

if(task_size < SIZE) {
    seuquentially_process(task); //작업 단위가 기준보다 작아지면 순차 처리
} else {
    (subtask1, subtask2) = divide(task); //분할
    result1 = subtask1.process(); //서브태스크1 처리
    result2 = subtask2.process(); //서브태스크2 처리
    return result1 + result2; //결과 병합
}

포크-조인을 사용해서 병렬연산을 하기 위해서는 RecursiveTask<R>을 확장하는 클래스를 작성해야 한다. R은 결과 타입이다.

RecursiveTask를 확장하면 compute() 메소드를 오버라이딩 해야한다. compute() 메소드에서 실제 처리가 실행된다.

아래 예제는 1부터 10,000,000까지 더하는 작업을 ForkJoin을 통해 처리한다.

public class ForkJoinSum extends RecursiveTask<Long> {

    private final long[] numbers;
    private static final long TASK_THRESHOLD_SIZE = 10000;

    public ForkJoinSum(long[] numbers) {
        this.numbers = numbers;
    }

    @Override
    protected Long compute() {
        if (numbers.length <= TASK_THRESHOLD_SIZE) {
            return computeSequentially();
        }

        long[] subNumbers1 = Arrays.copyOfRange(numbers, 0, numbers.length / 2);
        long[] subNumbers2 = Arrays.copyOfRange(numbers, numbers.length / 2, numbers.length);

        ForkJoinSum subTask1 = new ForkJoinSum(subNumbers1); // 서브태스크1 생성
        subTask1.fork(); // ForkJoinPool의 다른 스레드로 서브태스크1 비동기 실행
        ForkJoinSum subTask2 = new ForkJoinSum(subNumbers2); // 서브태스크2 생성
        long subResult2 = subTask2.compute(); // 서브태스크2는 동기적 실행

        long subResult1 = subTask1.join(); // join을 호출하여 서브태스크1 완료시까지 대기

        return subResult1 + subResult2;

    }

    private long computeSequentially() {
        return Arrays.stream(numbers).sum();
    }
}

위 예제에서는 실제 작업을 처리하기 위한 기준 태스크 크기(여기서는 배열의 크기)를 10,000으로 잡았다. 이 크기가 되기 전까지는 재귀적으로 배열을 더 작은 작업단위로 나누게 된다. 태스크 크기가 10,000 이하가 되면 computeSequentially()를 실행해서 실제 합계 작업을 처리한다.

또한 subTask1은 fork()를 호출하여 비동기로 실행하도록 다른 스레드에 던진다. 그럼 결과가 나오기 전에 다음 코드행을 실행한다. 다음으로 subTask2를 동기로 호출하면 subTask2의 완료를 기다리게 된다. 이후 subTask1의 join()을 호출하여 완료가 되었는지 확인하며 완료가 되지 않았으면 기다리게 된다. 이후 계산이 완료되었으므로 두 결과를 합하여 반환한다.

public static void main(String[] args) throws Exception {
	ForkJoinSum task = new ForkJoinSum(LongStream.rangeClosed(1, 10000000).toArray());
	long sum = new ForkJoinPool().invoke(task);
	System.out.println(sum); //50,000,005,000,000
}

직접 작성한 태스크를 만들고 ForkJoinPool에 작업을 요청한다(invoke). invoke의 반환 타입은 Task의 반환 타입과 같다.

이 예시는 포크조인의 사용법을 위해 작성된 코드이므로 실제 작업은 for등을 통한 외부 반복을 사용한 직접 합계 작업보다 느릴 수 있다. Long 타입에 의한 박싱/언박싱 작업의 오버헤드 및 멀티코어에서 각 코어의 계산 완료 후 발생하는 코어간 데이터 이동이 병렬 처리를 통해 얻을 수 있는 이점보다 크다면 말이다.

또한 컴파일러의 최적화 작업은 병렬 처리보다는 순차 처리에 더 적합하므로 이러한 사항도 고려 대상이다.


2. 분할 크기는 얼마가 적당할까?


위 예제에서는 태스크의 배열 크기를 10,000 이하로 제한했다. 포크 조인을 수행할 때 적당한 태스크 크기를 정하는 공식 같은 건 없다. 기준 태스크 크기를 변경해가며 각각 테스트를 해보기 전에는 마땅한 방법이 없는 것이 사실이다.

만약 쿼드코어(4개) 컴퓨터에서 포크조인을 수행한다고 생각해보자. 그렇다면 10,000,000개의 배열을 4개(각 2,500,000 길이 배열)보다 더 많이 쪼개는게 의미가 있을까?

의미가 있다. 만약 듀얼코어에서 작업을 두 개로 쪼갰는데, 하나의 실행 시간이 10초이고 나머지 하나가 1초라면 전체 시간이 10초가 걸린다. 위 예제야 단순 배열을 분할하므로 반반씩 정확히 나누는게 가능하지만 실제 작업들에서는 단순하지가 않아 각 작업간의 시간 차이가 발생할 가능성이 항상 존재한다. 따라서 코어가 2개라고 해서 2개로 나누면 나머지 하나의 코어는 idle 상태가 되어 놀게 된다.

하지만 듀얼 코어에서 작업을 4개로 쪼갰다면 한 코어가 작업을 끝내고 다른 작업을 찾아 실행하게 된다. 이런 이유 때문에 작업을 작게 쪼갤수록 각 코어에 할당되는 작업의 총량이 균형 있게 배분된다.

포크 조인에서 작은 태스크 하나를 끝내면 다른 큐(Queue)에서 다른 태스크를 가져와서 실행하는데, 이러한 것을 작업 훔치기(work stealing) 이라고 한다.

결론적으로 작업 훔치기라는 메커니즘에 의해 적절한 크기로 분할된 많은 태스크를 포킹하는 것이 바람직하다.

좋은 웹페이지 즐겨찾기