java.util.concurrent.synchronousQueue 소스 코드 간략 분석

71126 단어 Java
머리말
SynchronousQueue 는 비교적 특수 한 차단 대기 열 구현 클래스 입 니 다.실제 적 으로 요소 에 일련의 저장 공간 을 유지 하지 않 습 니 다.스 레 드 를 유지 합 니 다.요 소 를 추가 하 는 방법 을 호출 하려 면 다른 스 레 드 가 있어 야 합 니 다.그렇지 않 으 면 요 소 를 추가 할 수 없습니다.생산-소비자 모델 에서 이렇게 하 는 장점 은 생산자 에서 소비자 사이 의 지연 을 낮 출 수 있 고 링크 드 블록 링 큐 나 Array BlockingQueue 에서 생산 자 는 반드시 요 소 를 그 안에 저 장 된 요소 의 구조 에 삽입 한 후에 야 소비자 에 게 전달 할 수 있다 는 것 이다.이 클래스 는 자바 병렬 패키지 에서 CacheThreadPool 의 작업 대기 열 로 사 용 됩 니 다.
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                      new SynchronousQueue<Runnable>());
}

쉽게 말 하면 SynchronousQueue 는 다음 과 같은 몇 가지 특징 이 있다.1.SynchronousQueue 는 전통 적 인 집합 류 와 달리 내부 에 고정된 저장 요소 의 공간 이 없다.모든 요 소 를 추가 하 는 작업 은 반드시 다른 스 레 드 로 추출 작업 을 해 야 합 니 다.그렇지 않 으 면 요 소 를 추가 하 는 작업 이 성공 하지 못 하고 반대로 도 마찬가지 입 니 다.2.내부 에 고정된 저장 요소 의 공간 이 없 기 때문에 SynchronousQueue 는 isEmpty,size,remainingCapacity,clear,contains,remove,toArray 등 방법 을 지원 하지 않 고 요소 의 교체 작업 도 지원 하지 않 습 니 다.3.ReentrantLock 과 유사 하 며 공평 과 불공 정 모드 를 지원 합 니 다.
SynchronousQueue 의 자전 작업 은 비교적 복잡 합 니 다.사실 저 는 개인 적 으로 도 잘 모 르 고 대체적인 사상 만 알 기 때문에 이 블 로 그 는 참고 만 할 수 있 습 니 다.
2.소스 코드 분석
SynchronousQueue 는 두 개의 Public 구조 방법 을 제공 합 니 다.그 방법 은 서명 과 역할 은 다음 과 같 습 니 다.
방법 서명
해명 하 다.
public SynchronousQueue()
불공 정 모드 의 차단 대기 열 을 만 듭 니 다.
public SynchronousQueue(boolean)
인자 가 true 일 때 공정 모드 의 차단 대기 열 을 만 듭 니 다.false 일 때 효과 가 무 참 구조 기와 같 습 니 다.
public SynchronousQueue() {
	this(false);
}
public SynchronousQueue(boolean fair) {
	transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

SynchronousQueue 는 두 개의 내부 클래스 TransferQueue 와 TransferStack 을 통 해 소비자 의 공평 성 을 실현 한다.TransferQueue 의 밑바닥 은 대열 로 이 루어 지고 공평 전략 을 실현 하 는 데 사용 된다.TransferStack 의 밑바닥 은 창고 로 이 루어 지고 불공평 한 전략 을 실현 하 는 데 사용 된다.그들 은 모두 내부 추상 클래스 Transferer 를 계승 했다.공정 한 전략 에서 가장 오래 기다 리 는 소비자 스 레 드 가 요 소 를 우선 받 거나 가장 오래 기다 리 는 생산자 스 레 드 가 요 소 를 우선 전달 할 수 있 도록 보장 할 수 있다.
abstract static class Transferer<E> {
	abstract E transfer(E e, boolean timed, long nanos);
}

Transferer 클래스 의 일반적인 매개 변수 E 는 대기 열 요소 유형 으로 하나의 추상 적 인 방법 만 제공 합 니 다.transfer 방법 은 SynchronousQueue 류 에서 모든 가입 팀 작업 은 이 방법 을 바탕 으로 이 루어 집 니 다.그 매개 변수 e 는'삽입'요소 참조 입 니 다.즉,소비자 에 게 요 소 를 전달 하 는 것 입 니 다(e 가 null 일 때'추출'요 소 를 표시 합 니 다.즉,생산자 에서 요 소 를 꺼 내 려 고 시도 합 니 다).timed 는 이 작업 에 최대 차단 시간 이 있 는 지 를 표시 하 는 데 사 용 됩 니 다.nanos 는 최대 차단 시간 을 표시 합 니 다.
다음은 SynchronousQueue 의 주요 구성원 변수 와 정적 상수 입 니 다.
public class SynchronousQueue<E> extends AbstractQueue<E>
    	implements BlockingQueue<E>, java.io.Serializable {
    //  CPU   
    static final int NCPUS = Runtime.getRuntime().availableProcessors();
    //                 ,  CPU 0,   32
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
    //                 
    static final int maxUntimedSpins = maxTimedSpins * 16;
    static final long spinForTimeoutThreshold = 1000L;
    //       
    private transient volatile Transferer<E> transferer;
}

생산자 로 서 소비자 스 레 드 에 요 소 를 전달 하고 싶다 면 put,offer 방법 을 사용 할 수 있 습 니 다.
public void put(E e) throws InterruptedException {
	if (e == null) throw new NullPointerException();
	//           ,               
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

put 방법 은 요 소 를 추가 하려 고 시도 합 니 다.성공 하지 않 으 면 Interrupted Exception offer 방법 은 두 가지 재 부팅 방법 이 있 습 니 다.boolean offer(E)와 boolean offer(E,long,TimeUnit)전 자 는 차단 되 지 않 은 방식 으로 소비자 스 레 드 에 요 소 를 전달 하고 실패 하면 false 로 돌아 갑 니 다.후 자 는 가장 긴 차단 시간 을 지정 할 수 있다.
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;
}

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    //            ,  true
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    //        ,  false,      
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

소비자 로 서 생산자 스 레 드 에서 요 소 를 얻 으 려 면 poll,take 방법 take 방법 으로 현재 스 레 드 에서 요 소 를 가 져 오 거나 스 레 드 가 중 단 될 때 까지 차단 할 수 있 습 니 다.
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
   	//             ,   
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

poll 방법 역시 두 가지 과부하 방법 이 있 는데 그 역할 은 상기 offer 와 유사 하여 더 이상 설명 하지 않 습 니 다.
public E poll() {
    return transferer.transfer(null, true, 0);
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

사실 알 수 있 듯 이 이런 입단,출전 조작 은 모두 이 transfer 방법 을 중심 으로 이 루어 진 것 이다.그 핵심 실현 원 리 는 모두 Transferer 류 를 중심 으로 하 는 것 이다.다음은 내부 류 TransferQueue,TransferStack 을 차례대로 분석 하 자.
1、TransferQueue
TransferQueue 는 SynchronousQueue 의 공정 한 정책 구현 클래스 로 내부 에서 스 레 드 대기 열 을 유지 합 니 다.
static final class TransferQueue<E> extends Transferer<E> {
	//     
	transient volatile QNode head;
	//     
	transient volatile QNode tail;
	//                    
	transient volatile QNode cleanMe;
	
	TransferQueue() {
		//                
        QNode h = new QNode(null, false);
        head = h;
        tail = h;
    }
	//      ...
}

TransferQueue 에 대해 그 결점 은 모두 실현 클래스 TransferQueue 의 내부 클래스 QNode 입 니 다.
static final class QNode {
	//    
    volatile QNode next;
    //         , item==this ,  waiter  interrupt
    volatile Object item;
    //  transfer     
    volatile Thread waiter;
    //               
    final boolean isData;

	QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }
}

QNode 는 데이터 노드 와 요청 노드 두 가지 유형 으로 나 뉜 다.데이터 노드 는 생산자 스 레 드 로 구성 되 고 요청 노드 는 소비자 스 레 드 로 구성 된다.이러한 유형의 역할 과 구성원 변 수 를 초보 적 으로 파악 한 후에 우 리 는 고 개 를 돌려 TransferQueue 의 transfer 방법 을 분석 하여 실현 한다.
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
	//    ,          
	QNode s = null;
    boolean isData = (e != null);
	//        
    for (;;) {
    	//            
    	QNode t = tail;
        QNode h = head;
        //       null,       
        if (t == null || h == null)
        	continue;
		//            (          ,             )
        if (h == t || t.isData == isData) {
        	//           
            QNode tn = t.next;
            //                   
            if (t != tail)
                continue;
            //         null,         
            if (tn != null) {
            	//  CAS       ,      
                advanceTail(t, tn);
                continue;
            }
            //            0 ,   null
            if (timed && nanos <= 0)
                return null;
            //     s,       
            if (s == null)
                s = new QNode(e, isData);
            //  CAS t      null    s,      ,           
            if (!t.casNext(null, s))
                continue;
			//     ,     tail         
            advanceTail(t, s);
            //         
            Object x = awaitFulfill(s, e, timed, nanos);
            //       s,      ,  clean      
            if (x == s) {
                clean(t, s);
                return null;
            }
			//  s    
            if (!s.isOffList()) {
            	//  CAS       (       )       
                advanceHead(t, s);
                if (x != null)
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;
        } else { 
        	//           
            QNode m = h.next;
            //                   ,        
            if (t != tail || m == null || h != head)
                continue;
			//                
            Object x = m.item;
            //          
            if (isData == (x != null) || x == m || !m.casItem(x, e)) { 
            	//                ,      
                advanceHead(h, m);
                continue;
            }
			//                ,   m      
            advanceHead(h, m); 
            LockSupport.unpark(m.waiter);
            //    
            return (x != null) ? (E)x : e;
        }
    }
 }

transfer 방법 은 잠 금 이 없 는 스 레 드 안전 방법 으로 스 레 드 의 자전 작업 을 통 해 스 레 드 안전성 을 확보 합 니 다.이 방법 은 요약 하면 주요 임 무 는 다음 과 같다.1.이때 대기 열 이 비어 있 거나(하나의 머리 노드 만 존재 한다)대기 열 요소 가 데이터 노드 이 고 현재 스 레 드 도 데이터 노드 를 추가 하려 고 시도 한다(또는 대기 열 요 소 는 요청 노드 이 고 현재 스 레 드 도 데 이 터 를 가 져 오 려 고 시도 한다.여 기 는 패턴 이 같다).그러면 현재 스 레 드 대상 을..데이터(있 으 면)와 노드 형식 을 대기 열 끝 에 삽입 하고 현재 스 레 드 가 깨 어 나 거나 interrupt 되 기 를 기다 리 는 것 을 막 습 니 다.이 방법 은 대량의 if 판단 과 CAS 작업 은 잠 금 없 는 조건 에서 의 스 레 드 안전 을 확보 하기 위 한 것 이다.2.그렇지 않 으 면 대기 열 이 비어 있 지 않 고 패턴 이 다르다 는 것 을 의미한다.이 방법 은 대기 행렬 의 머리 가 기다 리 는 스 레 드 를 깨 우 고 팀 에서 나 오 려 고 시도 할 것 이다.
이러한 CAS 작업 은 많은 자바 병렬 공구 꾸러미 의 클래스 와 마찬가지 로 sun.misc.Unsafe 류 의 CAS 작업 을 통 해 이 루어 집 니 다.
다음은 스 레 드 를 차단 하 는 방법 을 분석 하 겠 습 니 다:await Fulfill
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
	//     
	final long deadline = timed ? System.nanoTime() + nanos : 0L;
	//      
	Thread w = Thread.currentThread();
	//             
    int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
    	//         ,     s      e    
    	if (w.isInterrupted())
        	s.tryCancel(e);
        //  s     
        Object x = s.item;
        //  s       e,       ,    ,    x
        if (x != e)
            return x;
        //         
        if (timed) {
            nanos = deadline - System.nanoTime();
            //         ,     s     e    ,      
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        //                 
        if (spins > 0)
            --spins;
        //    s      null,          
        else if (s.waiter == null)
            s.waiter = w;
       //      ,    
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

awaitFulfill 방법 은 4 개의 매개 변 수 를 입력 해 야 합 니 다.노드 참조 s,데이터 참조 e(일반적인 상황 에서 s 의 데이터 노드 는 e),시간 초과 와 차단 시간 이 있 는 지 여부 입 니 다.자전 횟수 spins 에 대해 서 는 시간 초과 나 스 레 드 에 interrupt 이 있 는 지 없 는 지 를 계속 계산 하 는 것 이 목적 입 니 다.다 핵 상황 에서 시간 초과 가 지 정 된 경우 자전 횟수 는 32 회,지정 되 지 않 은 경우 512 회)다.자전 작업 이 완료 되면 이 방법 은 LockSupport 의 Park 방법 으로 현재 스 레 드 를 막 고 기 다 립 니 다.
이 방법 으로 호출 된 스 레 드 가 interrupt 또는 시간 초과 되면 transfer 방법 은 clean 방법 으로 이 쓸모없는 노드 를 제거 합 니 다.
//pred s       
void clean(QNode pred, QNode s) {
	//      
	s.waiter = null;
    while (pred.next == s) {
    	QNode h = head;
        QNode hn = h.next;
        //              
        if (hn != null && hn.isCancelled()) {
        	//       hn,      
        	advanceHead(h, hn);
            continue;
        }
        QNode t = tail;
        //        ,    
        if (t == h)
        	return;
       	//            ,      
        QNode tn = t.next;
        if (t != tail)
            continue;
        //                  ,       ,      
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }
        //    s     
        if (s != t) {
        	//    s      
        	QNode sn = s.next;
        	//       
            if (sn == s || pred.casNext(s, sn))
            	return;
        }
        //            
        QNode dp = cleanMe;
        //            null
        if (dp != null) {
        	QNode d = dp.next;
            QNode dn;
            if (d == null || d == dp || !d.isCancelled() ||
            	(d != t &&  (dn = d.next) != null &&  dn != d && dp.casNext(d, dn)))
            	//      
            	casCleanMe(dp, null);
          	//        pred,
            if (dp == pred)
                return;
        //   cleanMe  pred
        } else if (casCleanMe(null, pred))
            return;
    }
}

2、TransferStack
TransferStack 은 SynchronousQueue 의 불공 정 정책 구현 클래스 로 내부 에서 스 레 드 스 택 을 유지 합 니 다.
static final class TransferStack<E> extends Transferer<E> {
	//      
	static final int REQUEST = 0;
	//      
	static final int DATA = 1;
	//        
	static final int FULFILLING = 2;
	//   
	volatile SNode head;
}

TransferStack 클래스 는 하나의 인 스 턴 스 변수 만 있 습 니 다.head,유형 은 SNode 이 고 SNode 는 TransferStack 의 내부 클래스 입 니 다.인 스 턴 스 는 하나의 노드 를 대표 합 니 다.
static final class SNode {
	//      
	volatile SNode next;
	//      
	volatile SNode match;
	//       
	volatile Thread waiter;
	//       (            null)
	Object item;
	//        
	int mode;
	
	SNode(Object item) {
    	this.item = item;
    }
}

그 중에서 mode 의 첫 번 째 는 이 노드 가 데이터 노드 인지 요청 노드 인지(0 은 요청 노드 이 고 1 은 데이터 노드),두 번 째 는 노드 의 상태(1 은 이 노드 가 데 이 터 를 인수 인계 하고 있 음 을 나타 낸다)를 나타 낸다.다음은 TransferStack 의 transfer 방법 실현:
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
	SNode s = null;
	//  e   null            
	int mode = (e == null) ? REQUEST : DATA;
	//      
	for (;;) {
        SNode h = head;
        //           
        if (h == null || h.mode == mode) {
        	//              0
            if (timed && nanos <= 0) {
            	//       null         ,                   (        null)
                if (h != null && h.isCancelled())
                    casHead(h, h.next);
                //      null
                else
                    return null;
            //        ,  CAS       ,           
            } else if (casHead(h, s = snode(s, e, h, mode))) {
            	//  awaitFulfill        
                SNode m = awaitFulfill(s, timed, nanos);
                //       s,      ,  null
                if (m == s) {
                    clean(s);
                    return null;
                }
                //      ,            s,          
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);
                //           ,      m    ,    s    
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        //            
        } else if (!isFulfilling(h.mode)) {
        	//           ,    CAS               
            if (h.isCancelled())
                casHead(h, h.next);
            //        ,  CAS          (     ),        
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) {
                	//             
                	SNode m = s.next;
                	//        null,        null,        ,    
                    if (m == null) {
                        casHead(s, null); 
                        s = null;
                        break;
                    }
                    SNode mn = m.next;
                    //    
                    if (m.tryMatch(s)) {
                    	//         , s m  
                        casHead(s, mn);
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    //     ,             
                    } else
                        s.casNext(m, mn);
                }
            }
        //          ,               (    mode    2  1)
        } else {
            SNode m = h.next;
            //            null,       
            if (m == null) 
                casHead(h, null); 
            else {
            	//         
                SNode mn = m.next;
                // m h  ,            , s m  
                if (m.tryMatch(h))
                    casHead(h, mn);
                //         
                else
                    h.casNext(m, mn);
            }
        }
    }
}

요약 하면 transfer 방법 은 자전 작업 에서 세 가지 상황 이 있 습 니 다.1.스 택 이 비어 있 거나 모드 가 같 으 면(Transfer Queue 와 유사)새로운 결산 점 을 만 들 고 스 택 에 눌 러 서 현재 스 레 드 가 요소 가 연결 되 거나 interrupt 되 기 를 기다 리 도록 합 니 다.2.스 택 이 비어 있 지 않 고 패턴 이 다 르 면 현재 결산 점 의 mode 변 수 를 일치 하 는 표 시 를 추가 하여 스 택 에 눌 러 서 원래 스 택 의 정상 요소 와 일치 합 니 다.일치 하 는 데 성공 하면 스 택 에서 이 두 개의 결산 점 을 팝 업 합 니 다.3.만약 에 결점 이 일치 하고 있다 면 이 결점 을 일치 시 키 고 일치 한 후에 두 노드 를 팝 업 하여 다음 순환 을 계속 시작 합 니 다.
try Match 방법 은 SNode 의 비정 상 방법 입 니 다.
boolean tryMatch(SNode s) {
	//      match null,    CAS match   s,         
	if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
		Thread w = waiter;
		//  waiter    null,          waiter    null
        if (w != null) {
        	waiter = null;
            LockSupport.unpark(w);
        }
        //    
        return true;
    }
    //    match    s
    return match == s;
}

try Match 방법의 주요 임 무 는 이 노드 에서 기다 리 는 스 레 드 를 깨 우 는 것 입 니 다.CAS 알고리즘 을 통 해 하나의 스 레 드 만 match 인용 을 s 로 설정 하여 잠 금 조건 이 없 는 스 레 드 안전성 을 확보 하 는 것 입 니 다.

좋은 웹페이지 즐겨찾기