JDK 용기 와 병발 - Queue - Priority BlockingQueue

9473 단어 JDK 용기 와 병발
개술
      우선 적 인 무 계 차단 대기 열 을 기반 으로 Priority Queue 의 스 레 드 보안 버 전 입 니 다.
데이터 구조
      배열 의 균형 이 잡 힌 두 갈래 더 미 를 바탕 으로 Priority Queue 를 바탕 으로 자물쇠 하나, 조건 하 나 를 추가 했다.
private transient Object[] queue;

//        
private final ReentrantLock lock;

//      ,  take/poll     
private final Condition notEmpty;

 //    , CAS    ,      queue
private transient volatile int allocationSpinLock;

구조 기
      Priority Queue 와 마찬가지 로 lock, notEmpty 초기 화 를 제외 하고:
public PriorityBlockingQueue() {
	this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
	this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
							 Comparator super E> comparator) {
	if (initialCapacity < 1)
		throw new IllegalArgumentException();
	this.lock = new ReentrantLock();		// lock、notEmpty   
	this.notEmpty = lock.newCondition();
	this.comparator = comparator;
	this.queue = new Object[initialCapacity];
}

public PriorityBlockingQueue(Collection extends E> c) {
	this.lock = new ReentrantLock();
	this.notEmpty = lock.newCondition();
	boolean heapify = true; // true if not known to be in heap order
	boolean screen = true;  // true if must screen for nulls
	if (c instanceof SortedSet>) {
		SortedSet extends E> ss = (SortedSet extends E>) c;
		this.comparator = (Comparator super E>) ss.comparator();
		heapify = false;
	}
	else if (c instanceof PriorityBlockingQueue>) {
		PriorityBlockingQueue extends E> pq =
			(PriorityBlockingQueue extends E>) c;
		this.comparator = (Comparator super E>) pq.comparator();
		screen = false;
		if (pq.getClass() == PriorityBlockingQueue.class) // exact match
			heapify = false;
	}
	Object[] a = c.toArray();
	int n = a.length;
	// If c.toArray incorrectly doesn't return Object[], copy it.
	if (a.getClass() != Object[].class)
		a = Arrays.copyOf(a, n, Object[].class);
	if (screen && (n == 1 || this.comparator != null)) {
		for (int i = 0; i < n; ++i)
			if (a[i] == null)
				throw new NullPointerException();
	}
	this.queue = a;
	this.size = n;
	if (heapify)
		heapify();
}

첨삭 검사
용기 조정 정책 (무제 한 확장 방지)
      절차 (Priority Queue 와 대체적으로 같 습 니 다. 자동 잠 금 방식 으로 배열 을 동적 으로 분배 하 는 것 을 제외 하고 공용 잠 금 에서 quue 를 복사 합 니 다): 1) quue 가 가득 찼 을 때 요소 의 가입 요청 이 있 으 면 용량 확장 을 합 니 다.2) 공용 잠 금 lock 을 가 져 오 는 전제 에서 lock 을 방출 하고 자 회전 잠 금 방식 으로 동적 확장 배열 을 사용 하여 take / poll 스 레 드 와 병행 하여 분 배 를 마 친 후에 lock 을 다시 가 져 올 수 있 습 니 다.3) oldCap 이 64 보다 작 으 면 용량 이 배로 증가한다.그렇지 않 으 면 50% 증가 합 니 다.4) new Cap 이 MAX 에 있 는 지 확인ARRAY_SIZE 범위 내 에서 minCap 에 overflow 가 있 거나 MAX 보다 크 면ARRAY_SIZE, OutOfmory 오류 이상 던 지기;그렇지 않 으 면 용량 이 최대 MAX 를 초과 하지 않 습 니 다.ARRAY_SIZE; 5) 새로운 용량 을 동적 으로 분배 하 는 Object [];6) 공용 자 물 쇠 를 가 져 와 오래된 queue 의 요 소 를 복사 합 니 다.
while ((n = size) >= (cap = (array = queue).length)) // while              queue
		tryGrow(array, cap);

//     queue		
//      ,     ,        take/poll    ,           
private void tryGrow(Object[] array, int oldCap) {
	lock.unlock(); 				//      
	Object[] newArray = null;
	if (allocationSpinLock == 0 &&				//   CAS    allocationSpinLock,      
		UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
								 0, 1)) {
		try {
			int newCap = oldCap + ((oldCap < 64) ?
								   (oldCap + 2) : // grow faster if small
								   (oldCap >> 1));
			if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
				int minCap = oldCap + 1;
				if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
					throw new OutOfMemoryError();
				newCap = MAX_ARRAY_SIZE;
			}
			if (newCap > oldCap && queue == array)
				newArray = new Object[newCap];
		} finally {
			allocationSpinLock = 0;
		}
	}
	if (newArray == null) //          ,    yield
		Thread.yield();
	lock.lock();			    //        
	if (newArray != null && queue == array) {      //     
		queue = newArray;
		System.arraycopy(array, 0, newArray, 0, oldCap);
	}
}

기초 방법
      Priority Queue 와 마찬가지 로 두 개의 인 자 를 추가 하 는 것 을 제외 하고 Object [] array, Comparator 슈퍼 T > cmp 를 추가 하여 동시성 을 확보 합 니 다.
//    x, k   ,         
private static  void siftUpUsingComparator(int k, T x, Object[] array,
								   Comparator super T> cmp) {
	while (k > 0) {
		int parent = (k - 1) >>> 1;
		Object e = array[parent];
		if (cmp.compare(x, (T) e) >= 0)
			break;
		array[k] = e;
		k = parent;
	}
	array[k] = x;
}

//    x, k   ,         
private static  void siftDownUsingComparator(int k, T x, Object[] array,
												int n,
												Comparator super T> cmp) {
	if (n > 0) {
		int half = n >>> 1;
		while (k < half) {
			int child = (k << 1) + 1;
			Object c = array[child];
			int right = child + 1;
			if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
				c = array[child = right];
			if (cmp.compare(x, (T) c) <= 0)
				break;
			array[k] = c;
			k = child;
		}
		array[k] = x;
	}
}

늘다
      절차 (과정 은 Priority Queue 와 마찬가지 로 동시성 잠 금 을 고려 하 는 것 을 제외 하고): 1) 공용 잠 금 lock 을 가 져 옵 니 다.2) 대기 열 이 가득 찼 는 지 확인 하고 가득 차 면 자동 잠 금 방식 으로 용량 확장 을 합 니 다.3) 팀 의 끝 에서 원 소 를 sift Up 하여 이 진 더미 의 균형 성 을 유지한다.4) take / poll 스 레 드 에 notEmpty 신 호 를 보 냅 니 다.5) 자물쇠 잠 금 해제;6) 트 루 로 돌아간다.
또한 Priority BlockingQueue 가 무한 하기 때문에 add, put 작업 은 모두 offer 에 직접 의뢰 하여 진행 합 니 다.
public boolean offer(E e) {
	if (e == null)
		throw new NullPointerException();
	final ReentrantLock lock = this.lock;
	lock.lock();
	int n, cap;
	Object[] array;
	while ((n = size) >= (cap = (array = queue).length))
		tryGrow(array, cap);
	try {
		Comparator super E> cmp = comparator;
		if (cmp == null)
			siftUpComparable(n, e, array);
		else
			siftUpUsingComparator(n, e, array, cmp);
		size = n + 1;
		notEmpty.signal();
	} finally {
		lock.unlock();
	}
	return true;
}

삭제 하 다.
절차 (과정 은 Priority Queue 와 마찬가지 로 동시성 잠 금 을 고려 하 는 것 을 제외 하고): 1) 공용 잠 금 lock 을 가 져 옵 니 다.2) 대기 열 이 비어 있 는 지 확인 하고 비어 있 으 면 null 로 돌아 갑 니 다.3) 대열 의 마지막 요 소 를 꺼 내 색인 0 부터 sift Down 을 하여 이 진 더미 의 균형 성 을 유지 합 니 다.4) 자물쇠 잠 금 해제;5) 팀 의 첫 번 째 원소 값 을 되 돌려 줍 니 다.
public E poll() {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		return dequeue();
	} finally {
		lock.unlock();
	}
}

public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	E result;
	try {
		while ( (result = dequeue()) == null)
			notEmpty.await();
	} finally {
		lock.unlock();
	}
	return result;
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
	long nanos = unit.toNanos(timeout);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	E result;
	try {
		while ( (result = dequeue()) == null && nanos > 0)
			nanos = notEmpty.awaitNanos(nanos);
	} finally {
		lock.unlock();
	}
	return result;
}

private E dequeue() {
	int n = size - 1;
	if (n < 0)
		return null;
	else {
		Object[] array = queue;
		E result = (E) array[0];
		E x = (E) array[n];
		array[n] = null;
		Comparator super E> cmp = comparator;
		if (cmp == null)
			siftDownComparable(0, x, array, n);
		else
			siftDownUsingComparator(0, x, array, n, cmp);
		size = n;
		return result;
	}
}

조사 하 다.
      절차 (과정 은 Priority Queue 와 마찬가지 로 동시성 잠 금 을 고려 하 는 것 을 제외 하고): 1) 공용 잠 금 lock 을 가 져 옵 니 다.2) 대기 열 이 비어 있 는 지 확인 하고 비어 있 으 면 null 로 돌아 갑 니 다.3) 자물쇠 잠 금 해제;4) 팀 의 첫 번 째 원소 값 을 되 돌려 줍 니 다.
public E peek() {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		return (size == 0) ? null : (E) queue[0];
	} finally {
		lock.unlock();
	}
}

교체 기
      요소 의 교체 순 서 를 보장 하지 않 고 바 텀 배열 의 사본 을 바탕 으로 이 루어 집 니 다.
public Iterator iterator() {
	return new Itr(toArray());
}

final class Itr implements Iterator {
	final Object[] array; // Array of all elements
	int cursor;           // index of next element to return
	int lastRet;          // index of last element, or -1 if no such

	Itr(Object[] array) {
		lastRet = -1;
		this.array = array;
	}

	public boolean hasNext() {
		return cursor < array.length;
	}

	public E next() {
		if (cursor >= array.length)
			throw new NoSuchElementException();
		lastRet = cursor;
		return (E)array[cursor++];
	}

	public void remove() {
		if (lastRet < 0)
			throw new IllegalStateException();
		removeEQ(array[lastRet]);
		lastRet = -1;
	}
}

특성
Priority BlockingQueue 의 우선 순위 가 같은 요소 처리
      만약 에 여러 요소 의 우선 순위 가 같 으 면 그 순 서 는 고정 되 지 않 고 2 급 비교 방법 으로 순 서 를 정할 수 있 습 니 다. 다음 과 같은 예 는 요소 의 입대 순서에 따라 2 급 비 교 를 할 수 있 습 니 다.
class FIFOEntry>
		implements Comparable> {
	static final AtomicLong seq = new AtomicLong(0);
	final long seqNum;
	final E entry;
	public FIFOEntry(E entry) {
		seqNum = seq.getAndIncrement();
		this.entry = entry;
	}
	public E getEntry() { return entry; }
	public int compareTo(FIFOEntry other) {
		int res = entry.compareTo(other.entry);
		if (res == 0 && other.entry != this.entry)
			res = (seqNum < other.seqNum ? -1 : 1);
		return res;
	}
}

왜 Priority BlockingQueue 의 조작 은 Priority Queue 에 직접 의뢰 하지 않 고 잠 금 을 추가 하여 실현 합 니까?
      allocationSpinLock 은 동적 확장 quue 에서 사용 하여 의뢰 + lock 을 실현 할 수 없습니다.
Priority BlockingQueue 는 Priority Queue 의 잠 금 스 레 드 보안 판 입 니 다.

좋은 웹페이지 즐겨찾기