Java 동시 프로그래밍용 Callable, Futrue, FutureTask

기존의 다중 루틴 (Thread와 Runable) 에서 루틴 실행 방법인run () 의 반환값은void이고, 패키지가 먼저 Callable의Call 방법을 사용할 때 V모드로 되돌아옵니다.
Callable 인터페이스 소스:
@FunctionalInterface
public interface Callable {
    V call() throws Exception;
}

이 인터페이스는 Runable와 다르다. Runable에서run 방법은 값을 되돌려 주지 않고 Callable에서 전송된 모델 V를 되돌려 달라고 요구한다. 여기서 우리는 다중 라인의 실행 단원을 설계하고 일정한 정보를 되돌려 줄 수 있다.
Future 인터페이스 소스:
public interface Future {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Futrue는 라인에 다섯 가지 능력을 제공하고, 이러한 능력은 라인의 실행 상태를 판단합니다.
-- cancel이 아직 실행되지 않았으니 스레드 실행을 취소할 수 있습니다
--get 실행 반환 결과 얻기
-- isDone 판단 완료 여부
-- isCancelled는 계산을 취소하지 말지 판단한다
클래스FutrueTask는 RunableFutrue 인터페이스를 실현하고 RunableFutrue 인터페이스는 Runable와Futrue 인터페이스를 계승하여 다섯 가지Task의 상태를 제공하여 우리가 항상 라인의 상태를 얻을 수 있도록 한다.
	private volatile int state; //   volatile   
    /**
     *    FutureTask   ,         callable     ,
     *    worker thread  FutureTask  run();
     */
    private static final int NEW = 0;

    /**
     * woker thread   task        ,      ,
     *   worker thread     result.
     */
    private static final int COMPLETING = 1;

    /**
     *    result     ,FutureTask     ,      ,
     *         final state,(         )
     */
    private static final int NORMAL = 2;

    /**
     *   ,   task        ,       exception,
     *   final state
     */
    private static final int EXCEPTIONAL = 3;

    /**
     * final state,   task cancel(task       cancel   ).
     */
    private static final int CANCELLED = 4;

    /**
     *     ,task      interrupt ,       
     */
    private static final int INTERRUPTING = 5;

    /**
     * final state,          ,    ,      
     */
    private static final int INTERRUPTED = 6;

state가 NEW로 초기화됩니다.set, setException, cancel 방법에서만 state가 최종 상태로 바뀔 수 있습니다.작업이 완료되는 동안 state 값은 COMPLETING 또는 INTERRUPTING일 수 있습니다.state에는 다음과 같은 네 가지 상태 변환이 있습니다.
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
다른 구성원 변수의 의미:
    /** The underlying callable; nulled out after running */

    private Callable callable;   //   run         call(),     ,     null.

 

    /** The result to return or exception to throw from get() */

    private Object outcome; // non-volatile, protected by state reads/writes       votaile,      state     , state FutureTask     。

 

    /** The thread running the callable; CASed during run() */

    private volatile Thread runner;   //   worker thread.

 

    /** Treiber stack of waiting threads */  

    private volatile WaitNode waiters;     //Treiber stack   stack    ,        futuretask#get     。

Task의 상태 변화(Task의 라이프 사이클)
FutureTask가 생성될 때 먼저 함수를 구성하여 Task의 상태가 NEW인지 확인합니다.
    public FutureTask(Callable callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

주의: Runable를 Callable로 변환하는 방법은 Callable의call 방법으로 Runable의run 방법을 호출한 다음 전송된 V를 되돌려줍니다.
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
}

 	public static  Callable callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter(task, result);
}

    static final class RunnableAdapter implements Callable {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
}

Task 가 생성되면 Task 의 상태가 NEW 가 되고 FutureTask 의 run 메서드를 호출합니다.
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);//      EXCEPTION
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

실행 중 예외가 발생하면 setException 메서드를 호출하여 상태를 EXCEPTIONAL로 설정합니다.
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

완료되면 set(result) 메서드를 호출하여 상태를 NORMAL로 설정합니다.
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

마지막으로finishCompletion 방법을 실행하면 모든 막힌worker thread를 제거하고done () 방법을 호출하여 구성원 변수callable을null로 설정합니다.여기에는 LockSupport 클래스가 사용되어 스레드 장애를 제거합니다.
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

FutureTask의 get 방법은 우선 완성 상태인지 판단합니다. set이나 setException 방법을 실행했는지 설명하면 리포트로 되돌아갑니다. 미완성 상태라면 awaitDone를 호출해서 라인을 막습니다.
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

NORMAL은 실행에 아무런 오류가 없다는 것을 설명하기 때문에 get 방법을 통해 얻은 것은 실행이 되돌아오는 결과이고, 취소 상태라면 setException을 호출한 적이 있음을 설명하면 CancellationException 이상을 던집니다. 그렇지 않으면 ExcecutionException 이상을 던집니다.
   private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
}

awaiteDone은 간단히 FutureTask의 상태를 조회하는 것으로 볼 수 있다.awaiteDone을 실행하는 동안(get이 막히는 동안):
1. get을 실행하는 루트가 중단되면FutureTask의 모든 막힌 대기열의 루트(waiters)를 제거하고 중단 이상을 던집니다.
2. FutureTask의 상태가 완성 상태(정상적으로 완성되거나 취소)로 전환되면 완성 상태로 돌아간다.
3. FutureTask의 상태가 COMPLETING으로 변하면 set 결과가 있고 이때 라인을 일등하게 하는 것을 의미한다.
4.FutureTask의 상태가 초기 상태인 NEW라면 현재 스레드는FutureTask의 막힌 스레드에 추가됩니다.
5. get 방법이 시간 초과를 설정하지 않으면 현재 get 라인을 호출하는 것을 막습니다.시간 초과를 설정하면 시간 초과 여부를 판단하고, 시간 초과에 도달하면FutureTask의 모든 차단 행렬의 스레드를 제거하고 이때FutureTask의 상태로 돌아갑니다. 시간 초과가 없으면 남은 시간 동안 현재 스레드를 계속 차단합니다.

좋은 웹페이지 즐겨찾기