java.util.concurrent.synchronousQueue 소스 코드 간략 분석
71126 단어 Java
SynchronousQueue 는 비교적 특수 한 차단 대기 열 구현 클래스 입 니 다.실제 적 으로 요소 에 일련의 저장 공간 을 유지 하지 않 습 니 다.스 레 드 를 유지 합 니 다.요 소 를 추가 하 는 방법 을 호출 하려 면 다른 스 레 드 가 있어 야 합 니 다.그렇지 않 으 면 요 소 를 추가 할 수 없습니다.생산-소비자 모델 에서 이렇게 하 는 장점 은 생산자 에서 소비자 사이 의 지연 을 낮 출 수 있 고 링크 드 블록 링 큐 나 Array BlockingQueue 에서 생산 자 는 반드시 요 소 를 그 안에 저 장 된 요소 의 구조 에 삽입 한 후에 야 소비자 에 게 전달 할 수 있다 는 것 이다.이 클래스 는 자바 병렬 패키지 에서 CacheThreadPool 의 작업 대기 열 로 사 용 됩 니 다.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
쉽게 말 하면 SynchronousQueue 는 다음 과 같은 몇 가지 특징 이 있다.1.SynchronousQueue 는 전통 적 인 집합 류 와 달리 내부 에 고정된 저장 요소 의 공간 이 없다.모든 요 소 를 추가 하 는 작업 은 반드시 다른 스 레 드 로 추출 작업 을 해 야 합 니 다.그렇지 않 으 면 요 소 를 추가 하 는 작업 이 성공 하지 못 하고 반대로 도 마찬가지 입 니 다.2.내부 에 고정된 저장 요소 의 공간 이 없 기 때문에 SynchronousQueue 는 isEmpty,size,remainingCapacity,clear,contains,remove,toArray 등 방법 을 지원 하지 않 고 요소 의 교체 작업 도 지원 하지 않 습 니 다.3.ReentrantLock 과 유사 하 며 공평 과 불공 정 모드 를 지원 합 니 다.
SynchronousQueue 의 자전 작업 은 비교적 복잡 합 니 다.사실 저 는 개인 적 으로 도 잘 모 르 고 대체적인 사상 만 알 기 때문에 이 블 로 그 는 참고 만 할 수 있 습 니 다.
2.소스 코드 분석
SynchronousQueue 는 두 개의 Public 구조 방법 을 제공 합 니 다.그 방법 은 서명 과 역할 은 다음 과 같 습 니 다.
방법 서명
해명 하 다.
public SynchronousQueue()
불공 정 모드 의 차단 대기 열 을 만 듭 니 다.
public SynchronousQueue(boolean)
인자 가 true 일 때 공정 모드 의 차단 대기 열 을 만 듭 니 다.false 일 때 효과 가 무 참 구조 기와 같 습 니 다.
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue 는 두 개의 내부 클래스 TransferQueue 와 TransferStack 을 통 해 소비자 의 공평 성 을 실현 한다.TransferQueue 의 밑바닥 은 대열 로 이 루어 지고 공평 전략 을 실현 하 는 데 사용 된다.TransferStack 의 밑바닥 은 창고 로 이 루어 지고 불공평 한 전략 을 실현 하 는 데 사용 된다.그들 은 모두 내부 추상 클래스 Transferer 를 계승 했다.공정 한 전략 에서 가장 오래 기다 리 는 소비자 스 레 드 가 요 소 를 우선 받 거나 가장 오래 기다 리 는 생산자 스 레 드 가 요 소 를 우선 전달 할 수 있 도록 보장 할 수 있다.
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
Transferer 클래스 의 일반적인 매개 변수 E 는 대기 열 요소 유형 으로 하나의 추상 적 인 방법 만 제공 합 니 다.transfer 방법 은 SynchronousQueue 류 에서 모든 가입 팀 작업 은 이 방법 을 바탕 으로 이 루어 집 니 다.그 매개 변수 e 는'삽입'요소 참조 입 니 다.즉,소비자 에 게 요 소 를 전달 하 는 것 입 니 다(e 가 null 일 때'추출'요 소 를 표시 합 니 다.즉,생산자 에서 요 소 를 꺼 내 려 고 시도 합 니 다).timed 는 이 작업 에 최대 차단 시간 이 있 는 지 를 표시 하 는 데 사 용 됩 니 다.nanos 는 최대 차단 시간 을 표시 합 니 다.
다음은 SynchronousQueue 의 주요 구성원 변수 와 정적 상수 입 니 다.
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// CPU
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// , CPU 0, 32
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
//
static final int maxUntimedSpins = maxTimedSpins * 16;
static final long spinForTimeoutThreshold = 1000L;
//
private transient volatile Transferer<E> transferer;
}
생산자 로 서 소비자 스 레 드 에 요 소 를 전달 하고 싶다 면 put,offer 방법 을 사용 할 수 있 습 니 다.
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// ,
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
put 방법 은 요 소 를 추가 하려 고 시도 합 니 다.성공 하지 않 으 면 Interrupted Exception offer 방법 은 두 가지 재 부팅 방법 이 있 습 니 다.boolean offer(E)와 boolean offer(E,long,TimeUnit)전 자 는 차단 되 지 않 은 방식 으로 소비자 스 레 드 에 요 소 를 전달 하고 실패 하면 false 로 돌아 갑 니 다.후 자 는 가장 긴 차단 시간 을 지정 할 수 있다.
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
// , true
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
// , false,
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
소비자 로 서 생산자 스 레 드 에서 요 소 를 얻 으 려 면 poll,take 방법 take 방법 으로 현재 스 레 드 에서 요 소 를 가 져 오 거나 스 레 드 가 중 단 될 때 까지 차단 할 수 있 습 니 다.
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
// ,
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
poll 방법 역시 두 가지 과부하 방법 이 있 는데 그 역할 은 상기 offer 와 유사 하여 더 이상 설명 하지 않 습 니 다.
public E poll() {
return transferer.transfer(null, true, 0);
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
사실 알 수 있 듯 이 이런 입단,출전 조작 은 모두 이 transfer 방법 을 중심 으로 이 루어 진 것 이다.그 핵심 실현 원 리 는 모두 Transferer 류 를 중심 으로 하 는 것 이다.다음은 내부 류 TransferQueue,TransferStack 을 차례대로 분석 하 자.
1、TransferQueue
TransferQueue 는 SynchronousQueue 의 공정 한 정책 구현 클래스 로 내부 에서 스 레 드 대기 열 을 유지 합 니 다.
static final class TransferQueue<E> extends Transferer<E> {
//
transient volatile QNode head;
//
transient volatile QNode tail;
//
transient volatile QNode cleanMe;
TransferQueue() {
//
QNode h = new QNode(null, false);
head = h;
tail = h;
}
// ...
}
TransferQueue 에 대해 그 결점 은 모두 실현 클래스 TransferQueue 의 내부 클래스 QNode 입 니 다.
static final class QNode {
//
volatile QNode next;
// , item==this , waiter interrupt
volatile Object item;
// transfer
volatile Thread waiter;
//
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
}
QNode 는 데이터 노드 와 요청 노드 두 가지 유형 으로 나 뉜 다.데이터 노드 는 생산자 스 레 드 로 구성 되 고 요청 노드 는 소비자 스 레 드 로 구성 된다.이러한 유형의 역할 과 구성원 변 수 를 초보 적 으로 파악 한 후에 우 리 는 고 개 를 돌려 TransferQueue 의 transfer 방법 을 분석 하여 실현 한다.
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
// ,
QNode s = null;
boolean isData = (e != null);
//
for (;;) {
//
QNode t = tail;
QNode h = head;
// null,
if (t == null || h == null)
continue;
// ( , )
if (h == t || t.isData == isData) {
//
QNode tn = t.next;
//
if (t != tail)
continue;
// null,
if (tn != null) {
// CAS ,
advanceTail(t, tn);
continue;
}
// 0 , null
if (timed && nanos <= 0)
return null;
// s,
if (s == null)
s = new QNode(e, isData);
// CAS t null s, ,
if (!t.casNext(null, s))
continue;
// , tail
advanceTail(t, s);
//
Object x = awaitFulfill(s, e, timed, nanos);
// s, , clean
if (x == s) {
clean(t, s);
return null;
}
// s
if (!s.isOffList()) {
// CAS ( )
advanceHead(t, s);
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else {
//
QNode m = h.next;
// ,
if (t != tail || m == null || h != head)
continue;
//
Object x = m.item;
//
if (isData == (x != null) || x == m || !m.casItem(x, e)) {
// ,
advanceHead(h, m);
continue;
}
// , m
advanceHead(h, m);
LockSupport.unpark(m.waiter);
//
return (x != null) ? (E)x : e;
}
}
}
transfer 방법 은 잠 금 이 없 는 스 레 드 안전 방법 으로 스 레 드 의 자전 작업 을 통 해 스 레 드 안전성 을 확보 합 니 다.이 방법 은 요약 하면 주요 임 무 는 다음 과 같다.1.이때 대기 열 이 비어 있 거나(하나의 머리 노드 만 존재 한다)대기 열 요소 가 데이터 노드 이 고 현재 스 레 드 도 데이터 노드 를 추가 하려 고 시도 한다(또는 대기 열 요 소 는 요청 노드 이 고 현재 스 레 드 도 데 이 터 를 가 져 오 려 고 시도 한다.여 기 는 패턴 이 같다).그러면 현재 스 레 드 대상 을..데이터(있 으 면)와 노드 형식 을 대기 열 끝 에 삽입 하고 현재 스 레 드 가 깨 어 나 거나 interrupt 되 기 를 기다 리 는 것 을 막 습 니 다.이 방법 은 대량의 if 판단 과 CAS 작업 은 잠 금 없 는 조건 에서 의 스 레 드 안전 을 확보 하기 위 한 것 이다.2.그렇지 않 으 면 대기 열 이 비어 있 지 않 고 패턴 이 다르다 는 것 을 의미한다.이 방법 은 대기 행렬 의 머리 가 기다 리 는 스 레 드 를 깨 우 고 팀 에서 나 오 려 고 시도 할 것 이다.
이러한 CAS 작업 은 많은 자바 병렬 공구 꾸러미 의 클래스 와 마찬가지 로 sun.misc.Unsafe 류 의 CAS 작업 을 통 해 이 루어 집 니 다.
다음은 스 레 드 를 차단 하 는 방법 을 분석 하 겠 습 니 다:await Fulfill
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
//
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//
Thread w = Thread.currentThread();
//
int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// , s e
if (w.isInterrupted())
s.tryCancel(e);
// s
Object x = s.item;
// s e, , , x
if (x != e)
return x;
//
if (timed) {
nanos = deadline - System.nanoTime();
// , s e ,
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
//
if (spins > 0)
--spins;
// s null,
else if (s.waiter == null)
s.waiter = w;
// ,
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
awaitFulfill 방법 은 4 개의 매개 변 수 를 입력 해 야 합 니 다.노드 참조 s,데이터 참조 e(일반적인 상황 에서 s 의 데이터 노드 는 e),시간 초과 와 차단 시간 이 있 는 지 여부 입 니 다.자전 횟수 spins 에 대해 서 는 시간 초과 나 스 레 드 에 interrupt 이 있 는 지 없 는 지 를 계속 계산 하 는 것 이 목적 입 니 다.다 핵 상황 에서 시간 초과 가 지 정 된 경우 자전 횟수 는 32 회,지정 되 지 않 은 경우 512 회)다.자전 작업 이 완료 되면 이 방법 은 LockSupport 의 Park 방법 으로 현재 스 레 드 를 막 고 기 다 립 니 다.
이 방법 으로 호출 된 스 레 드 가 interrupt 또는 시간 초과 되면 transfer 방법 은 clean 방법 으로 이 쓸모없는 노드 를 제거 합 니 다.
//pred s
void clean(QNode pred, QNode s) {
//
s.waiter = null;
while (pred.next == s) {
QNode h = head;
QNode hn = h.next;
//
if (hn != null && hn.isCancelled()) {
// hn,
advanceHead(h, hn);
continue;
}
QNode t = tail;
// ,
if (t == h)
return;
// ,
QNode tn = t.next;
if (t != tail)
continue;
// , ,
if (tn != null) {
advanceTail(t, tn);
continue;
}
// s
if (s != t) {
// s
QNode sn = s.next;
//
if (sn == s || pred.casNext(s, sn))
return;
}
//
QNode dp = cleanMe;
// null
if (dp != null) {
QNode d = dp.next;
QNode dn;
if (d == null || d == dp || !d.isCancelled() ||
(d != t && (dn = d.next) != null && dn != d && dp.casNext(d, dn)))
//
casCleanMe(dp, null);
// pred,
if (dp == pred)
return;
// cleanMe pred
} else if (casCleanMe(null, pred))
return;
}
}
2、TransferStack
TransferStack 은 SynchronousQueue 의 불공 정 정책 구현 클래스 로 내부 에서 스 레 드 스 택 을 유지 합 니 다.
static final class TransferStack<E> extends Transferer<E> {
//
static final int REQUEST = 0;
//
static final int DATA = 1;
//
static final int FULFILLING = 2;
//
volatile SNode head;
}
TransferStack 클래스 는 하나의 인 스 턴 스 변수 만 있 습 니 다.head,유형 은 SNode 이 고 SNode 는 TransferStack 의 내부 클래스 입 니 다.인 스 턴 스 는 하나의 노드 를 대표 합 니 다.
static final class SNode {
//
volatile SNode next;
//
volatile SNode match;
//
volatile Thread waiter;
// ( null)
Object item;
//
int mode;
SNode(Object item) {
this.item = item;
}
}
그 중에서 mode 의 첫 번 째 는 이 노드 가 데이터 노드 인지 요청 노드 인지(0 은 요청 노드 이 고 1 은 데이터 노드),두 번 째 는 노드 의 상태(1 은 이 노드 가 데 이 터 를 인수 인계 하고 있 음 을 나타 낸다)를 나타 낸다.다음은 TransferStack 의 transfer 방법 실현:
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
// e null
int mode = (e == null) ? REQUEST : DATA;
//
for (;;) {
SNode h = head;
//
if (h == null || h.mode == mode) {
// 0
if (timed && nanos <= 0) {
// null , ( null)
if (h != null && h.isCancelled())
casHead(h, h.next);
// null
else
return null;
// , CAS ,
} else if (casHead(h, s = snode(s, e, h, mode))) {
// awaitFulfill
SNode m = awaitFulfill(s, timed, nanos);
// s, , null
if (m == s) {
clean(s);
return null;
}
// , s,
if ((h = head) != null && h.next == s)
casHead(h, s.next);
// , m , s
return (E) ((mode == REQUEST) ? m.item : s.item);
}
//
} else if (!isFulfilling(h.mode)) {
// , CAS
if (h.isCancelled())
casHead(h, h.next);
// , CAS ( ),
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) {
//
SNode m = s.next;
// null, null, ,
if (m == null) {
casHead(s, null);
s = null;
break;
}
SNode mn = m.next;
//
if (m.tryMatch(s)) {
// , s m
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
// ,
} else
s.casNext(m, mn);
}
}
// , ( mode 2 1)
} else {
SNode m = h.next;
// null,
if (m == null)
casHead(h, null);
else {
//
SNode mn = m.next;
// m h , , s m
if (m.tryMatch(h))
casHead(h, mn);
//
else
h.casNext(m, mn);
}
}
}
}
요약 하면 transfer 방법 은 자전 작업 에서 세 가지 상황 이 있 습 니 다.1.스 택 이 비어 있 거나 모드 가 같 으 면(Transfer Queue 와 유사)새로운 결산 점 을 만 들 고 스 택 에 눌 러 서 현재 스 레 드 가 요소 가 연결 되 거나 interrupt 되 기 를 기다 리 도록 합 니 다.2.스 택 이 비어 있 지 않 고 패턴 이 다 르 면 현재 결산 점 의 mode 변 수 를 일치 하 는 표 시 를 추가 하여 스 택 에 눌 러 서 원래 스 택 의 정상 요소 와 일치 합 니 다.일치 하 는 데 성공 하면 스 택 에서 이 두 개의 결산 점 을 팝 업 합 니 다.3.만약 에 결점 이 일치 하고 있다 면 이 결점 을 일치 시 키 고 일치 한 후에 두 노드 를 팝 업 하여 다음 순환 을 계속 시작 합 니 다.
try Match 방법 은 SNode 의 비정 상 방법 입 니 다.
boolean tryMatch(SNode s) {
// match null, CAS match s,
if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
// waiter null, waiter null
if (w != null) {
waiter = null;
LockSupport.unpark(w);
}
//
return true;
}
// match s
return match == s;
}
try Match 방법의 주요 임 무 는 이 노드 에서 기다 리 는 스 레 드 를 깨 우 는 것 입 니 다.CAS 알고리즘 을 통 해 하나의 스 레 드 만 match 인용 을 s 로 설정 하여 잠 금 조건 이 없 는 스 레 드 안전성 을 확보 하 는 것 입 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
JPA + QueryDSL 계층형 댓글, 대댓글 구현(2)이번엔 전편에 이어서 계층형 댓글, 대댓글을 다시 리팩토링해볼 예정이다. 이전 게시글에서는 계층형 댓글, 대댓글을 구현은 되었지만 N+1 문제가 있었다. 이번에는 그 N+1 문제를 해결해 볼 것이다. 위의 로직은 이...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.