자바 지연 작업 프로 세 스 처리 방법
시간 지연 대기 열,첫 번 째 는 그 가 대기 열 이기 때문에 열 에 대한 기능 을 가지 고 있 습 니 다.두 번 째 는 시간 지연 입 니 다.이것 이 바로 시간 지연 대 열 입 니 다.기능 은 이 시간 지연 대 열 에 작업 을 두 는 것 입 니 다.시간 지연 이 되 어야 만 이 시간 대 열 에서 작업 을 가 져 올 수 있 습 니 다.그렇지 않 으 면 가 져 올 수 없습니다.
응용 장면 이 비교적 많 습 니 다.예 를 들 어 1 분 지연 문자 발송,1 분 지연 재 실행 등 입 니 다.다음은 지연 대기 열 데모 를 본 다음 에 지연 대기 열 이 프로젝트 에서 사용 하 는 것 을 보 겠 습 니 다.
간단 한 지연 대기 열 은 세 부분 이 있어 야 합 니 다.첫 번 째 는 Delayed 인터페이스의 메시지 체,두 번 째 소비 메 시 지 를 실현 한 소비자,세 번 째 메 시 지 를 저장 하 는 지연 대기 열 입 니 다.다음은 지연 대기 열 demo 를 살 펴 보 겠 습 니 다.
메시지
package com.delqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* Delayed compareTo getDelay getDelay , …… */
public class Message implements Delayed {
private int id;
private String body; //
private long excuteTime;// , 。
public int getId() {
return id;
}
public String getBody() {
return body;
}
public long getExcuteTime() {
return excuteTime;
}
public Message(int id, String body, long delayTime) {
this.id = id;
this.body = body;
this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
}
// 1 0 -1
@Override
public int compareTo(Delayed delayed) {
Message msg = (Message) delayed;
return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1
: (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
}
//
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
2.소식 소비자
package com.delqueue;
import java.util.concurrent.DelayQueue;
public class Consumer implements Runnable {
// ,
private DelayQueue<Message> queue;
public Consumer(DelayQueue<Message> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Message take = queue.take();
System.out.println(" id:" + take.getId() + " :" + take.getBody());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
3.지연 대기 열
package com.delqueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DelayQueueTest {
public static void main(String[] args) {
//
DelayQueue<Message> queue = new DelayQueue<Message>();
// ,m1 3s
Message m1 = new Message(1, "world", 3000);
// ,m2 10s
Message m2 = new Message(2, "hello", 10000);
//
queue.offer(m2);
queue.offer(m1);
// ,
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(new Consumer(queue));
exec.shutdown();
}
}
지연 대기 열 에 메시지 체 를 넣 고 소비자 스 레 드 를 시작 하여 지연 대기 열 에 있 는 메 시 지 를 소비 합 니 다.지연 대기 열 에 있 는 메시지 가 지연 시간 이 되면 메 시 지 를 꺼 낼 수 있 습 니 다.그렇지 않 으 면 메 시 지 를 꺼 낼 수 없고 소비 할 수 없습니다.이것 이 바로 대기 열 데모 지연 입 니 다.실제 환경 에서 의 사용 을 말씀 드 리 겠 습 니 다.
필드 설명 사용:
택시 소프트웨어 에서 주문 서 를 파견 하 는 절 차 를 진행 합 니 다.주문 이 있 을 때 이 주문 서 를 기 사 를 선별 한 다음 에 주문 서 를 운전 자 에 게 연결 시 킵 니 다.그러나 가끔 은 운 이 좋 지 않 습 니 다.주문 이 들 어 온 후에 처음으로 적당 한 기 사 를 선별 하지 못 했 습 니 다.그러나 우 리 는 이 주문 서 를 끝 낼 수 없습니다.이 주문 의 정 보 를 지연 대기 열 에 넣 고 2 초 후에 한 번 더 진행 합 니 다.사실 이 2 초 는 지연 이기 때문에 여기 서 우 리 는 지연 대기 열 을 사용 하여 실현 할 수 있다.
다음은 간단 한 흐름 도 를 살 펴 보 자.
다음은 구체 적 인 코드 실현 을 살 펴 보 겠 습 니 다.
프로젝트 에는 다음 과 같은 몇 가지 유형 이 있다.첫째,임무 류 둘째,임무 류 에 따라 조립 한 메시지 체 류 셋째,지연 대기 열 관리 류
작업 클래스 는 운전 자,묶 기,push 메 시 지 를 선별 하 는 작업 클래스 입 니 다.
package com.test.delayqueue;
/**
*
* @author whd
* @date 2017 9 25 12:49:32
*/
public class DelayOrderWorker implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
//
System.out.println(Thread.currentThread().getName()+" do something ……");
}
}
메시지 체 류 는 지연 대기 열 에서 Delayed 인 터 페 이 스 를 실현 하 는 메시지 류 가 적지 않 습 니 다.인 터 페 이 스 를 실현 할 때 getDelay(TimeUnit unit)방법 이 있 습 니 다.이 방법 은 만 료 여 부 를 판단 하 는 것 입 니 다.여기 서 정의 하 는 것 은 일반적인 클래스 이기 때문에 우리 위의 작업 클래스 를 task 로 사용 할 수 있 습 니 다.그러면 작업 클래스 를 메시지 체 로 나 눌 수 있 습 니 다.
package com.test.delayqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
*
*
* @author whd
* @date 2017 9 25 12:48:30
* @param <T>
*/
public class DelayOrderTask<T extends Runnable> implements Delayed {
private final long time;
private final T task; // ,
/**
* @param timeout
* ( )
* @param task
*
*/
public DelayOrderTask(long timeout, T task) {
this.time = System.nanoTime() + timeout;
this.task = task;
}
@Override
public int compareTo(Delayed o) {
// TODO Auto-generated method stub
DelayOrderTask other = (DelayOrderTask) o;
long diff = time - other.time;
if (diff > 0) {
return 1;
} else if (diff < 0) {
return -1;
} else {
return 0;
}
}
@Override
public long getDelay(TimeUnit unit) {
// TODO Auto-generated method stub
return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int hashCode() {
return task.hashCode();
}
public T getTask() {
return task;
}
}
지연 대기 열 관리 클래스 입 니 다.이 클래스 는 주로 작업 클래스 를 메시지 로 밀봉 하고 지연 대기 열 에 추가 하 며,폴 링 지연 대기 열 에서 그 메시지 체 를 꺼 내 서 작업 클래스 를 스 레 드 풀 에 가 져 와 작업 을 수행 하 는 것 입 니 다.
package com.test.delayqueue;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* , 、
*
* @author whd
* @date 2017 9 25 12:44:59
*/
public class DelayOrderQueueManager {
private final static int DEFAULT_THREAD_NUM = 5;
private static int thread_num = DEFAULT_THREAD_NUM;
//
private ExecutorService executor;
//
private Thread daemonThread;
//
private DelayQueue<DelayOrderTask<?>> delayQueue;
private static final AtomicLong atomic = new AtomicLong(0);
private final long n = 1;
private static DelayOrderQueueManager instance = new DelayOrderQueueManager();
private DelayOrderQueueManager() {
executor = Executors.newFixedThreadPool(thread_num);
delayQueue = new DelayQueue<>();
init();
}
public static DelayOrderQueueManager getInstance() {
return instance;
}
/**
*
*/
public void init() {
daemonThread = new Thread(() -> {
execute();
});
daemonThread.setName("DelayQueueMonitor");
daemonThread.start();
}
private void execute() {
while (true) {
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
System.out.println(" :" + map.size());
int taskNum = delayQueue.size();
System.out.println(" :" + taskNum);
try {
//
DelayOrderTask<?> delayOrderTask = delayQueue.take();
if (delayOrderTask != null) {
Runnable task = delayOrderTask.getTask();
if (null == task) {
continue;
}
// task
executor.execute(task);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
*
* @param task
* @param time
*
* @param unit
*
*/
public void put(Runnable task, long time, TimeUnit unit) {
//
long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
// Delayed
DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task);
//
delayQueue.put(delayOrder);
}
/**
*
*
* @param task
* @return
*/
public boolean removeTask(DelayOrderTask task) {
return delayQueue.remove(task);
}
}
테스트 클래스
package com.delqueue;
import java.util.concurrent.TimeUnit;
import com.test.delayqueue.DelayOrderQueueManager;
import com.test.delayqueue.DelayOrderWorker;
public class Test {
public static void main(String[] args) {
DelayOrderWorker work1 = new DelayOrderWorker();// 1
DelayOrderWorker work2 = new DelayOrderWorker();// 2
DelayOrderWorker work3 = new DelayOrderWorker();// 3
// ,
DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance();
manager.put(work1, 3000, TimeUnit.MILLISECONDS);
manager.put(work2, 6000, TimeUnit.MILLISECONDS);
manager.put(work3, 9000, TimeUnit.MILLISECONDS);
}
}
OK 이것 이 바로 프로젝트 의 구체 적 인 사용 상황 입 니 다.물론 구체 적 인 내용 은 무시 되 었 습 니 다.전체 프레임 워 크 는 이 렇 습 니 다.그리고 여기 서 자바 의 지연 대기 열 을 사용 합 니 다.그러나 이런 방식 은 문제 가 있 습 니 다.다운 머 신 이 있 으 면 작업 을 잃 어 버 릴 수 있 기 때문에 mq,redis 를 사용 하여 실현 하 는 것 도 고려 할 수 있 습 니 다.2,mq 구현 지연 메시지
rabbitmq 3.5.7 이상 버 전에 서 지연 대기 열 기능 을 실현 하 는 플러그 인(rabbitmq-delayed-message-exchange)을 제공 합 니 다.동시에 플러그 인 은 Erlang/OPT 18.0 이상 에 의존 합 니 다.
플러그 인 원본 주소:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
플러그 인 다운로드 주소:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
설치:
플러그 인 설치 디 렉 터 리 에 들 어가 기
{rabbitmq-server}/plugins/(현재 존재 하 는 플러그 인 을 볼 수 있 습 니 다)
플러그 인 다운로드
rabbitmq_delayed_message_exchange
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
(다운로드 한 파일 이름 이 불규칙 하면 다음 과 같이 수 동 으로 이름 을 바 꿉 니 다.rabbitmq_delayed_message_exchange-0.0.1.ez)
플러그 인 사용
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
플러그 인 닫 기
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
플러그 인 사용x-delayed-message 형식의 exchange 를 통 해 delayed-messaging 기능 을 사용 합 니 다.
x-delayed-message 는 플러그 인 이 제공 하 는 형식 입 니 다.rabbitmq 자체 가 아 닙 니 다.메 시 지 를 보 낼 때 header 에'x-delay'인 자 를 추가 하여 메시지 의 지연 시간 을 제어 합 니 다.
maven 프로젝트 의 pom.xml 파일 에 직접 추가 합 니 다.
다음은 application.properties 파일 에 redis 설정 을 추가 합 니 다.
아주 간단 합 니 다.코드 는 다음 과 같 습 니 다.
메시지 전송 실현
x-delay
여기 서 제 가 설정 한 지연 시간 은 3 초 입 니 다.소식 소비자
main 방법 에서 Spring Boot 프로그램 을 직접 실행 하면 Spring Boot 는 Message Receiver 류 를 자동 으로 해석 합 니 다.
다음은 메 시 지 를 보 내 는 인 터 페 이 스 를 Junit 로 실행 하면 됩 니 다.
실행 이 끝나 면 다음 과 같은 정 보 를 볼 수 있 습 니 다.
:2018-05-03 12:44:53
3 ,Spring Boot :
:2018-05-03 12:44:56
:hello i am delay msg
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Is Eclipse IDE dying?In 2014 the Eclipse IDE is the leading development environment for Java with a market share of approximately 65%. but ac...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.