DelayQueue 사용법
15191 단어 Java 동시 관리
간단한 소개
우리 실제 상황을 이야기합시다.우리는 개발 과정에서 다음과 같은 장면이 있다.
아날로그 캐시 실례 a) 빈 연결을 닫습니다.서버에 많은 클라이언트의 연결이 있어서 한동안 비어 있으면 닫아야 한다.b) 캐시.캐시에 있는 대상이 빈 시간을 초과하여 캐시에서 꺼내야 합니다.c) 작업 시간 초과 처리네트워크 프로토콜 슬라이딩 창에서 응답식 상호작용을 요청할 때 시간 초과로 응답하지 않은 요청을 처리합니다.
하나의 멍청한 방법은 백그라운드 라인을 사용하여 모든 대상을 두루 훑어보고 하나씩 검사하는 것이다.이런 둔한 방법은 간단하고 쓰기 쉽지만 대상의 수량이 너무 많으면 성능 문제가 존재할 수 있다. 검사 간격을 설정하기 어렵고 간격이 너무 많으면 정확도에 영향을 주고 많으면 효율 문제가 존재한다.시간 초과 순서대로 처리할 수도 없고.
이 장면은 DelayQueue를 사용하는 것이 가장 적합하다.
DelayQueue는 java입니다.util.concurrent에서 제공하는 재미있는 클래스입니다.교묘해, 아주 좋아!그러나 자바 doc와 자바 SE 5.0의 소스에는 Sample이 제공되지 않았다.내가 처음에Scheduled Thread Pool Executor 원본을 읽었을 때DelayQueue의 묘용을 발견했다.그 다음에 실제 작업에서session 시간 초과 관리, 네트워크 응답 통신 프로토콜의 요청 시간 초과 처리에 응용된다.
본고는DelayQueue에 대해 소개한 다음에 응용 장면을 열거할 것이다.Delayed 인터페이스의 구현과 Sample 코드를 제공합니다.
DelayQueue는 BlockingQueue로 특화된 매개 변수는 Delayed입니다.(BlockingQueue에 대해 잘 모르는 학생은 먼저 BlockingQueue에 대해 알고 본문을 보십시오)Delayed는Comparable 인터페이스를 확장했습니다. 비교 기준은 시간 지연값이고Delayed 인터페이스의 실현 클래스인 getDelay의 반환값은 고정값(final)이어야 합니다.DelayQueue 내부는PriorityQueue로 이루어집니다.
DelayQueue = BlockingQueue + PriorityQueue + Delayed
DelayQueue의 핵심 요소인 BlockingQueue,PriorityQueue,Delayed.이렇게 말하면 DelayQueue는 우선 대기열(PriorityQueue)을 사용하여 이루어진 BlockingQueue로 우선 대기열의 비교 기준치는 시간이다.
그들의 기본 정의는 다음과 같다
public interface Comparable<T> {
public int compareTo(T o);
}
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> implements BlockingQueue<E> {
private final PriorityQueue q = new PriorityQueue();
}
DelayQueue 내부의 구현은 우선 대기열을 사용합니다.DelayQueue의 offer 방법을 호출할 때 Delayed 대상을 우선 대기열 q에 추가합니다.다음과 같습니다.
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
q.offer(e);
if (first == null || e.compareTo(first) < 0)
available.signalAll();
return true;
} finally {
lock.unlock();
}
}
DelayQueue의 take 방법은 우선 대기열 q의first를 꺼내서 (peek), 시간 지연 밸브 값이 없으면 await 처리를 합니다.다음과 같습니다.
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
코드 예
다음은 Sample이며 캐시의 간단한 구현입니다.세 가지 유형의 Pair, DelayItem, Cache가 포함됩니다.다음과 같습니다.
public class Pair {
public K first;
public V second;
public Pair() {}
public Pair(K first, V second) {
this.first = first;
this.second = second;
}
}
Delayed 구현은 다음과 같습니다.
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class DelayItem<T> implements Delayed {
/** Base of nanosecond timings, to avoid wrapping */
private static final long NANO_ORIGIN = System.nanoTime();
/**
* Returns nanosecond time offset by origin
*/
final static long now() {
return System.nanoTime() - NANO_ORIGIN;
}
/**
* Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied
* entries.
*/
private static final AtomicLong sequencer = new AtomicLong(0);
/** Sequence number to break ties FIFO */
private final long sequenceNumber;
/** The time the task is enabled to execute in nanoTime units */
private final long time;
private final T item;
public DelayItem(T submit, long timeout) {
this.time = now() + timeout;
this.item = submit;
this.sequenceNumber = sequencer.getAndIncrement();
}
public T getItem() {
return this.item;
}
public long getDelay(TimeUnit unit) {
long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
return d;
}
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof DelayItem) {
DelayItem x = (DelayItem) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
}
다음은 Cache의 실현입니다.put과 get 방법을 포함하고 실행 가능한main 함수도 포함합니다.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Cache {
private static final Logger LOG = Logger.getLogger(Cache.class.getName());
private ConcurrentMap cacheObjMap = new ConcurrentHashMap();
private DelayQueue>> q = new DelayQueue>>();
private Thread daemonThread;
public Cache() {
Runnable daemonTask = new Runnable() {
public void run() {
daemonCheck();
}
};
daemonThread = new Thread(daemonTask);
daemonThread.setDaemon(true);
daemonThread.setName("Cache Daemon");
daemonThread.start();
}
private void daemonCheck() {
if (LOG.isLoggable(Level.INFO))
LOG.info("cache service started.");
for (;;) {
try {
DelayItem> delayItem = q.take();
if (delayItem != null) {
//
Pair pair = delayItem.getItem();
cacheObjMap.remove(pair.first, pair.second); // compare and remove
}
} catch (InterruptedException e) {
if (LOG.isLoggable(Level.SEVERE))
LOG.log(Level.SEVERE, e.getMessage(), e);
break;
}
}
if (LOG.isLoggable(Level.INFO))
LOG.info("cache service stopped.");
}
//
public void put(K key, V value, long time, TimeUnit unit) {
V oldValue = cacheObjMap.put(key, value);
if (oldValue != null)
q.remove(key);
long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);
q.put(new DelayItem>(new Pair(key, value), nanoTime));
}
public V get(K key) {
return cacheObjMap.get(key);
}
//
public static void main(String[] args) throws Exception {
Cache cache = new Cache();
cache.put(1, "aaaa", 3, TimeUnit.SECONDS);
Thread.sleep(1000 * 2);
{
String str = cache.get(1);
System.out.println(str);
}
Thread.sleep(1000 * 2);
{
String str = cache.get(1);
System.out.println(str);
}
}
}
Sample,main , aaa, null。
Java 멀티스레드/동시 27, DelayQueue 지연 대기열 아날로그 구현Session