자바 에서 흔히 볼 수 있 는 차단 대기 열 총화
차단 대기 열과 일반 대기 열 은 주로 차단 두 글자 로 구 분 됩 니 다.
4.567917.차단 추가:대기 열 이 가득 찼 을 때 요소 스 레 드 를 추가 하면 차단 되 고 대기 열 이 불만 이 있 을 때 까지 스 레 드 를 깨 워 추가 작업 을 수행 합 니 다4.567917.차단 삭제:대기 열 요소 가 비어 있 을 때 요소 스 레 드 를 삭제 하면 대기 열 이 비어 있 지 않 을 때 까지 삭제 작업 을 수행 합 니 다흔히 볼 수 있 는 차단 대기 열 은 링크 드 BlockingQueue 와 Array BlockingQueue 가 있 습 니 다.그 중에서 모두 BlockingQueue 인 터 페 이 스 를 실현 합 니 다.이 인 터 페 이 스 는 차단 대기 열 이 실현 해 야 할 핵심 방법 을 정의 합 니 다.
public interface BlockingQueue<E> extends Queue<E> {
// , true, IllegalStateException
boolean add(E e);
// , true, false
boolean offer(E e);
//
void put(E e) throws InterruptedException;
// ,
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//
E take() throws InterruptedException;
// ,
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//
int remainingCapacity();
// , true, false
boolean remove(Object o);
//
public boolean contains(Object o);
//
int drainTo(Collection<? super E> c);
//
int drainTo(Collection<? super E> c, int maxElements);
}
위의 방법 외 에 도 Queue 인 터 페 이 스 를 계승 하 는 세 가지 방법 이 자주 사용 된다.
// , , NoSuchElementException
E element();
// , , null
E peek();
// , nul
E poll();
구체 적 인 작용 에 따라 방법 은 다음 과 같은 세 가지 로 나 눌 수 있다.Array BlockingQueue 는 배열 을 바탕 으로 이 루어 지고 대열 의 선진 적 인 특성 을 만족 시 킵 니 다.다음은 코드 를 통 해 초보 적 으로 인식 하 겠 습 니 다.
public class ArrayBlockingQueueTest {
ArrayBlockingQueue<TestProduct> queue = new ArrayBlockingQueue<TestProduct>(1);
public static void main(String[] args) {
ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
new Thread(test.new Product()).start();
new Thread(test.new Customer()).start();
}
class Product implements Runnable {
@Override
public void run() {
while (true) {
try {
queue.put(new TestProduct());
System.out.println(" ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Customer implements Runnable {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
queue.take();
System.out.println(" ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class TestProduct {
}
}
상기 코드 는 비교적 간단 하 다.용량 이 1 인 차단 대기 열 에서 생산자 와 소비 자 는 용량 제한 으로 인해 순서대로 운행 을 막는다.Array BlockingQueue 는 ReentrantLock 자물쇠 와 Condition 대기 열 을 기반 으로 이 루어 지기 때문에 공평 과 불공평 한 두 가지 모델 이 존재 합 니 다.공평 한 장면 에서 모든 막 힌 스 레 드 는 차단 순서에 따라 집행 된다.불공평 한 장면 에서 대열 의 스 레 드 와 마침 대열 에 들 어 갈 준비 가 되 어 있 는 스 레 드 경쟁 은 누가 빼 앗 으 면 누구의 것 이다.기본적으로 불공평 한 자 물 쇠 를 사용 합 니 다.효율 이 높 기 때 문 입 니 다.
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
코드 를 통 해 알 수 있 듯 이 Array BlockingQueue 는 하나의 ReentrantLock 자물쇠 와 두 개의 Condition 대기 열 을 통 해 이 루어 집 니 다.그 속성 은 다음 과 같 습 니 다.
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
//
final Object[] items;
//
int takeIndex;
//
int putIndex;
//
int count;
//
final ReentrantLock lock;
// , take()
private final Condition notEmpty;
// , put()
private final Condition notFull;
//
transient Itrs itrs = null;
}
코드 를 통 해 알 수 있 듯 이 Array BlockingQueue 는 같은 자 물 쇠 를 사용 하고 요 소 를 제거 하 며 요 소 를 추가 하 는 것 은 배열 아래 에 표 시 된 방식 으로 기록 하고 표 로 나 누 어 대기 열 머리 와 대기 열 끝 을 표시 합 니 다.두 대기 열 을 통 해 take()와 put()방법 을 각각 차단 합 니 다.다음은 원본 코드 를 직접 보 겠 습 니 다.
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
//
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
//
items[putIndex] = x;
//
if (++putIndex == items.length)
putIndex = 0;
count++;
// take
notEmpty.signal();
}
코드 에서 알 수 있 듯 이 add()방법 은 offer()방법 을 바탕 으로 이 루어 집 니 다.offer()방법 은 추가 에 실 패 했 습 니 다.false 로 돌아 간 후 add()방법 은 이상 을 던 집 니 다.offer()방법 은 자 물 쇠 를 추가 하여 라인 의 안전 을 확보 합 니 다.대기 열 이 채 워 지지 않 았 을 때 입 대 를 실행 합 니 다.입 대 는 조작 배열 을 통 해 이 루어 지고 순환 을 통 해 배열 공간 을 재 활용 합 니 다.요소 추가 성공 후 대기 열 이 비어 있 지 않 습 니 다.signal()방법 으로 요 소 를 제거 하 는 차단 스 레 드 를 깨 웁 니 다.마지막 으로 put()방법 을 봅 니 다.
public void put(E e) throws InterruptedException {
//
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
코드 를 통 해 알 수 있 듯 이 대기 열 이 가득 찼 을 때 현재 스 레 드 는 대기 열 에 걸 려 있 고 대기 열 이 불만 이 있 을 때 깨 워 서 추가 작업 을 수행 합 니 다.다음은 삭제 작업 을 살 펴 보 겠 습 니 다.
public boolean remove(Object o) {
// NULL
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
//
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
// , ( )
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//
if (removeIndex == takeIndex) {
items[takeIndex] = null;
//
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
// , ,
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
//
items[i] = items[next];
i = next;
} else {
// put null
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
//
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
reove()는 poll(),take()와 달리 지정 한 요 소 를 삭제 할 수 있 습 니 다.삭 제 된 요 소 는 색인 이 가리 키 는 것 을 제거 하 는 것 이 아니 라 코드 를 통 해 알 수 있 듯 이 삭 제 될 요소 가 색인 이 가리 키 는 요 소 를 제거 하지 않 을 때 삭 제 된 요소 아래 표 시 를 시작 으로 요소 아래 표 시 된 모든 요 소 를 왼쪽으로 이동 합 니 다.
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// put()
notFull.signal();
return x;
}
remove()방법 에 비해 poll()방법 은 매우 간단 합 니 다.여 기 는 군말 을 하지 않 습 니 다.다음은 take()를 보 겠 습 니 다.
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
take()방법 과 put()방법 은 기본적으로 일치 하고 상대 적 으로 간단 하 다 고 할 수 있 습 니 다.마지막 으로 두 가지 조회 방법 을 살 펴 보 겠 습 니 다.
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// ,
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
element()는 peek()방법 을 바탕 으로 이 루어 집 니 다.대기 열 이 비어 있 을 때 peek()방법 은 null,element()에 이상 을 던 집 니 다.Array Blocking Queue 에 대해 서 는 여기까지 소개 해 드 리 겠 습 니 다.링크 드 BlockingQueue 는 링크 를 기반 으로 이 루어 집 니 다.속성 은 다음 과 같 습 니 다.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
// ,
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//
private final int capacity;
//
private final AtomicInteger count = new AtomicInteger();
//
transient Node<E> head;
//
private transient Node<E> last;
//
private final ReentrantLock takeLock = new ReentrantLock();
//
private final Condition notEmpty = takeLock.newCondition();
//
private final ReentrantLock putLock = new ReentrantLock();
//
private final Condition notFull = putLock.newCondition();
}
코드 를 통 해 알 수 있 듯 이 요 소 는 Node 노드 로 밀봉 되 어 단 방향 링크 에 저 장 됩 니 다.그 중에서 링크 의 기본 길 이 는 Integer.MAX 입 니 다.VALUE,따라서 사용 할 때 메모리 넘 침 에 주의해 야 합 니 다.요소 추가 속도 가 요소 삭제 속도 보다 클 때 대기 열 은 사용 하지 않 고 회수 할 수 없 는 대상 을 대량으로 기록 하여 메모리 가 넘 칩 니 다.Array BlockingQueue 와 LinkedBlockingQueue 의 주요 차이 점 은 ReentrantLock 잠 금 의 수량 과 대기 열 에 있 습 니 다.LinkedBlockingQueue 는 두 개의 잠 금 과 두 개의 대기 열 을 사용 합 니 다.즉,추가 와 삭제 작업 이 동시에 실 행 될 수 있 고 전체적인 효율 이 높 습 니 다.다음은 코드 를 직접 보 겠 습 니 다.
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
//
if (e == null) throw new NullPointerException();
//
final AtomicInteger count = this.count;
// false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
//
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// , ,
if (count.get() < capacity) {
//
enqueue(node);
// ,
c = count.getAndIncrement();
if (c + 1 < capacity)
//
notFull.signal();
}
} finally {
putLock.unlock();
}
// ,
if (c == 0)
// take
signalNotEmpty();
return c >= 0;
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
private void signalNotEmpty() {
// take take
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
여기에 우리 가 주의해 야 할 몇 가지 가 있다.1.LinkedBlockingQueue count 속성 은 병발 류 패 키 징 을 통 해 이 루어 져 야 합 니 다.두 개의 스 레 드 를 추가,삭제 하고 동시에 실행 할 수 있 기 때문에 동기 화 를 고려 해 야 합 니 다.
2.여기 서 두 번 판단 해 야 하 는 주요 원인 은 방법 이 시 작 될 때 자 물 쇠 를 채 우지 않 았 기 때 문 입 니 다.수치 가 바 뀔 수 있 기 때문에 자 물 쇠 를 얻 은 후에 두 번 판단 해 야 합 니 다.
3.Array BlockingQueue 와 달리 링크 드 BlockingQueue 는 대기 열 에 불만 이 있 을 때 추가 스 레 드 를 깨 웁 니 다.이렇게 하 는 이 유 는 링크 드 BlockingQueue 에 추가 와 삭제 작업 에 서로 다른 자 물 쇠 를 사용 하기 때 문 입 니 다.각자 자신 만 잘 관리 하고 스루풋 도 높 일 수 있 기 때 문 입 니 다.한편,Array BlockingQueue 는 유일한 자 물 쇠 를 사용 합 니 다.이렇게 하면 스 레 드 를 제거 하 는 것 이 영원히 깨 어 나 지 않 거나 스 레 드 를 추가 하 는 것 이 영원히 깨 어 나 지 않 고 스루풋 이 낮 습 니 다.
4.요 소 를 추가 하기 전에 대기 열 길 이 는 0 이 어야 스 레 드 를 제거 할 수 있 습 니 다.대기 열 길이 가 0 일 때 스 레 드 를 제거 하 는 것 이 분명 걸 려 있 을 것 입 니 다.이때 스 레 드 를 제거 하 는 것 을 깨 우 면 됩 니 다.스 레 드 를 제거 하 는 것 은 스 레 드 를 추가 하 는 것 과 유사 하기 때문에 스스로 자신 을 깨 웁 니 다.c>0 시 에는 두 가지 상황 만 있 습 니 다.제거 스 레 드 가 실행 되 고 있 습 니 다.재 귀적 으로 깨 어 날 경우 우리 가 참여 하지 않 아 도 되 고 제거 스 레 드 가 실행 되 지 않 습 니 다.이때 도 우리 가 참여 하지 않 아 도 됩 니 다.take(),poll()방법 을 호출 하면 됩 니 다.
5.put(),take()방법 으로 만 막 힌 스 레 드 를 깨 우 고 offer()방법 은 직접 되 돌아 갑 니 다(최대 대기 시간 포함 하지 않 음).장면 깨 우기 에 참여 하지 않 습 니 다.
다음은 put()차단 방법의 실현 을 살 펴 보 겠 습 니 다.
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//
while (count.get() == capacity) {
notFull.await();
}
//
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
코드 를 통 해 알 수 있 듯 이 put()방법 과 offer()방법 은 자신 이 condition 차단 을 통 해 대기 열 에 걸 리 고 나머지 는 대체적으로 같다.이로써 추가 작업 에 대한 소개 가 끝 났 습 니 다.다음은 제거 방법 을 보 겠 습 니 다.
public boolean remove(Object o) {
if (o == null) return false;
//
fullyLock();
try {
//
for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
// p ,trail
// gc
p.item = null;
//
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
코드 를 통 해 알 수 있 듯 이 reove()방법 은 작업 전 용량 이 부족 할 때 만 스 레 드 를 깨 우 고 스 레 드 를 제거 하 는 것 을 깨 우지 않 습 니 다.또한 요소 의 위 치 를 삭제 할 지 확실 하지 않 기 때문에 데이터 안전 을 위해 두 개의 자 물 쇠 를 추가 해 야 합 니 다.
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
//
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// ,
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;
//
Node<E> first = h.next;
// ( gc)
h.next = h;
//
head = first;
//
E x = first.item;
//
first.item = null;
return x;
}
주의해 야 할 점 은 매번 팀 을 나 갈 때마다 head 노드 를 바 꾸 고 head 노드 자체 가 데 이 터 를 저장 하지 않 습 니 다.head.next 는 다음 에 팀 을 나 가 야 할 요 소 를 기록 합 니 다.매번 팀 을 나 갈 때마다 head.next 는 새로운 head 노드 로 돌아 가 null 로 병 치 됩 니 다.poll()방법 은 위 에서 언급 한 offer()방법 과 기본 적 인 거울 과 같 습 니 다.여기 서 저 는 더 이상 군말 하지 않 겠 습 니 다.
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
take()방법 은 poll()방법 과 유사 하 며,차단 논 리 를 추가 한 것 과 구별 된다.이로써 넘 치 는 요소 방법 에 대한 소개 가 끝 났 습 니 다.마지막 으로 우 리 는 조회 방법 소스 코드 를 보 겠 습 니 다.
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
코드 를 통 해 알 수 있 듯 이 기본 헤드 와 last 헤드 노드 는 모두 null 이 고 가입 할 때 next 부터 조작 합 니 다.즉,헤드 노드 는 데 이 터 를 저장 하지 않 습 니 다.마지막 으로 가장 오래 기다 리 는 offer()방법 을 살 펴 보 겠 습 니 다.
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
//
long nanos = unit.toNanos(timeout);
int c = -1;
//
final ReentrantLock putLock = this.putLock;
//
final AtomicInteger count = this.count;
//
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 0
if (nanos <= 0)
return false;
// ,
nanos = notFull.awaitNanos(nanos);
}
//
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
// AQS Node
Node node = addConditionWaiter();
//
int savedState = fullyRelease(node);
//
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
//
while (!isOnSyncQueue(node)) {
// , ,
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//
if (nanosTimeout >= spinForTimeoutThreshold)
//
LockSupport.parkNanos(this, nanosTimeout);
//
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//
nanosTimeout = deadline - System.nanoTime();
}
// ,
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// Condition
unlinkCancelledWaiters();
//
if (interruptMode != 0)
// , ,
reportInterruptAfterWait(interruptMode);
// , , 0, 0
return deadline - System.nanoTime();
}
코드 를 통 해 알 수 있 듯 이 최대 대기 시간 을 포함 하 는 offer(),poll()방법 은 순환 을 통 해 시간 초과 여 부 를 판단 하 는 방식 으로 대기 열 에 걸 고 최대 대기 시간 이 깨 어 나 지 않 거나 실행 되 지 않 은 채 되 돌아 갑 니 다.Array BlockingQueue 와 LinkedBlockingQueue 의 비교:
4.567917.크기 가 다 르 고 하 나 는 경계 가 있 으 며 하 나 는 경계 가 없다.Array BlockingQueue 는 초기 크기 를 지정 해 야 합 니 다.LinkedBlockingQueue 가 경계 가 없 을 때 메모리 가 넘 칠 수 있 습 니 다
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
JPA + QueryDSL 계층형 댓글, 대댓글 구현(2)이번엔 전편에 이어서 계층형 댓글, 대댓글을 다시 리팩토링해볼 예정이다. 이전 게시글에서는 계층형 댓글, 대댓글을 구현은 되었지만 N+1 문제가 있었다. 이번에는 그 N+1 문제를 해결해 볼 것이다. 위의 로직은 이...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.