자바 지연 작업 프로 세 스 처리 방법

1.지연 대기 열 이용
시간 지연 대기 열,첫 번 째 는 그 가 대기 열 이기 때문에 열 에 대한 기능 을 가지 고 있 습 니 다.두 번 째 는 시간 지연 입 니 다.이것 이 바로 시간 지연 대 열 입 니 다.기능 은 이 시간 지연 대 열 에 작업 을 두 는 것 입 니 다.시간 지연 이 되 어야 만 이 시간 대 열 에서 작업 을 가 져 올 수 있 습 니 다.그렇지 않 으 면 가 져 올 수 없습니다.
응용 장면 이 비교적 많 습 니 다.예 를 들 어 1 분 지연 문자 발송,1 분 지연 재 실행 등 입 니 다.다음은 지연 대기 열 데모 를 본 다음 에 지연 대기 열 이 프로젝트 에서 사용 하 는 것 을 보 겠 습 니 다.
간단 한 지연 대기 열 은 세 부분 이 있어 야 합 니 다.첫 번 째 는 Delayed 인터페이스의 메시지 체,두 번 째 소비 메 시 지 를 실현 한 소비자,세 번 째 메 시 지 를 저장 하 는 지연 대기 열 입 니 다.다음은 지연 대기 열 demo 를 살 펴 보 겠 습 니 다.
메시지

package com.delqueue; 
 
import java.util.concurrent.Delayed; 
import java.util.concurrent.TimeUnit; 
 
/** 
 *         Delayed           compareTo   getDelay      getDelay  ,            …… */ 
public class Message implements Delayed { 
  private int id; 
  private String body; //      
  private long excuteTime;//     ,                     。 
 
  public int getId() { 
    return id; 
  } 
 
  public String getBody() { 
    return body; 
  } 
 
  public long getExcuteTime() { 
    return excuteTime; 
  } 
 
  public Message(int id, String body, long delayTime) { 
    this.id = id; 
    this.body = body; 
    this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime(); 
  } 
 
  //             1 0 -1     
  @Override 
  public int compareTo(Delayed delayed) { 
    Message msg = (Message) delayed; 
    return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1 
        : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0); 
  } 
 
  //                                       
  @Override 
  public long getDelay(TimeUnit unit) { 
    return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS); 
  } 
}
2.소식 소비자

package com.delqueue; 
 
import java.util.concurrent.DelayQueue; 
 
public class Consumer implements Runnable { 
  //      ,               
  private DelayQueue<Message> queue; 
 
  public Consumer(DelayQueue<Message> queue) { 
    this.queue = queue; 
  } 
 
  @Override 
  public void run() { 
    while (true) { 
      try { 
        Message take = queue.take(); 
        System.out.println("    id:" + take.getId() + "    :" + take.getBody()); 
      } catch (InterruptedException e) { 
        e.printStackTrace(); 
      } 
    } 
  } 
}
3.지연 대기 열

package com.delqueue; 
 
import java.util.concurrent.DelayQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
 
public class DelayQueueTest { 
   public static void main(String[] args) {  
      //         
      DelayQueue<Message> queue = new DelayQueue<Message>();  
      //       ,m1   3s  
      Message m1 = new Message(1, "world", 3000);  
      //       ,m2   10s  
      Message m2 = new Message(2, "hello", 10000);  
      //             
      queue.offer(m2);  
      queue.offer(m1);  
      //                     ,             
      ExecutorService exec = Executors.newFixedThreadPool(1); 
      exec.execute(new Consumer(queue)); 
      exec.shutdown(); 
    }  
}
지연 대기 열 에 메시지 체 를 넣 고 소비자 스 레 드 를 시작 하여 지연 대기 열 에 있 는 메 시 지 를 소비 합 니 다.지연 대기 열 에 있 는 메시지 가 지연 시간 이 되면 메 시 지 를 꺼 낼 수 있 습 니 다.그렇지 않 으 면 메 시 지 를 꺼 낼 수 없고 소비 할 수 없습니다.
이것 이 바로 대기 열 데모 지연 입 니 다.실제 환경 에서 의 사용 을 말씀 드 리 겠 습 니 다.
필드 설명 사용:
택시 소프트웨어 에서 주문 서 를 파견 하 는 절 차 를 진행 합 니 다.주문 이 있 을 때 이 주문 서 를 기 사 를 선별 한 다음 에 주문 서 를 운전 자 에 게 연결 시 킵 니 다.그러나 가끔 은 운 이 좋 지 않 습 니 다.주문 이 들 어 온 후에 처음으로 적당 한 기 사 를 선별 하지 못 했 습 니 다.그러나 우 리 는 이 주문 서 를 끝 낼 수 없습니다.이 주문 의 정 보 를 지연 대기 열 에 넣 고 2 초 후에 한 번 더 진행 합 니 다.사실 이 2 초 는 지연 이기 때문에 여기 서 우 리 는 지연 대기 열 을 사용 하여 실현 할 수 있다.
다음은 간단 한 흐름 도 를 살 펴 보 자.

다음은 구체 적 인 코드 실현 을 살 펴 보 겠 습 니 다.
프로젝트 에는 다음 과 같은 몇 가지 유형 이 있다.첫째,임무 류 둘째,임무 류 에 따라 조립 한 메시지 체 류 셋째,지연 대기 열 관리 류
작업 클래스 는 운전 자,묶 기,push 메 시 지 를 선별 하 는 작업 클래스 입 니 다.

package com.test.delayqueue; 
/** 
 *              
 * @author whd 
 * @date 2017 9 25    12:49:32 
 */ 
public class DelayOrderWorker implements Runnable { 
 
  @Override 
  public void run() { 
    // TODO Auto-generated method stub 
    //         
    System.out.println(Thread.currentThread().getName()+" do something ……"); 
  } 
}
메시지 체 류 는 지연 대기 열 에서 Delayed 인 터 페 이 스 를 실현 하 는 메시지 류 가 적지 않 습 니 다.인 터 페 이 스 를 실현 할 때 getDelay(TimeUnit unit)방법 이 있 습 니 다.이 방법 은 만 료 여 부 를 판단 하 는 것 입 니 다.
여기 서 정의 하 는 것 은 일반적인 클래스 이기 때문에 우리 위의 작업 클래스 를 task 로 사용 할 수 있 습 니 다.그러면 작업 클래스 를 메시지 체 로 나 눌 수 있 습 니 다.

package com.test.delayqueue; 
 
import java.util.concurrent.Delayed; 
import java.util.concurrent.TimeUnit; 
 
/** 
 *                    
 * 
 * @author whd 
 * @date 2017 9 25    12:48:30 
 * @param <T> 
 */ 
public class DelayOrderTask<T extends Runnable> implements Delayed { 
  private final long time; 
  private final T task; //    ,            
 
  /** 
   * @param timeout 
   *          ( ) 
   * @param task 
   *         
   */ 
  public DelayOrderTask(long timeout, T task) { 
    this.time = System.nanoTime() + timeout; 
    this.task = task; 
  } 
 
  @Override 
  public int compareTo(Delayed o) { 
    // TODO Auto-generated method stub 
    DelayOrderTask other = (DelayOrderTask) o; 
    long diff = time - other.time; 
    if (diff > 0) { 
      return 1; 
    } else if (diff < 0) { 
      return -1; 
    } else { 
      return 0; 
    } 
  } 
 
  @Override 
  public long getDelay(TimeUnit unit) { 
    // TODO Auto-generated method stub 
    return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); 
  } 
 
  @Override 
  public int hashCode() { 
    return task.hashCode(); 
  } 
 
  public T getTask() { 
    return task; 
  } 
}
지연 대기 열 관리 클래스 입 니 다.이 클래스 는 주로 작업 클래스 를 메시지 로 밀봉 하고 지연 대기 열 에 추가 하 며,폴 링 지연 대기 열 에서 그 메시지 체 를 꺼 내 서 작업 클래스 를 스 레 드 풀 에 가 져 와 작업 을 수행 하 는 것 입 니 다.

package com.test.delayqueue; 
 
import java.util.Map; 
import java.util.concurrent.DelayQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicLong; 
 
/** 
 *        ,      、     
 * 
 * @author whd 
 * @date 2017 9 25    12:44:59 
 */ 
public class DelayOrderQueueManager { 
  private final static int DEFAULT_THREAD_NUM = 5; 
  private static int thread_num = DEFAULT_THREAD_NUM; 
  //         
  private ExecutorService executor; 
  //      
  private Thread daemonThread; 
  //      
  private DelayQueue<DelayOrderTask<?>> delayQueue; 
  private static final AtomicLong atomic = new AtomicLong(0); 
  private final long n = 1; 
  private static DelayOrderQueueManager instance = new DelayOrderQueueManager(); 
 
  private DelayOrderQueueManager() { 
    executor = Executors.newFixedThreadPool(thread_num); 
    delayQueue = new DelayQueue<>(); 
    init(); 
  } 
 
  public static DelayOrderQueueManager getInstance() { 
    return instance; 
  } 
 
  /** 
   *     
   */ 
  public void init() { 
    daemonThread = new Thread(() -> { 
      execute(); 
    }); 
    daemonThread.setName("DelayQueueMonitor"); 
    daemonThread.start(); 
  } 
 
  private void execute() { 
    while (true) { 
      Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); 
      System.out.println("        :" + map.size()); 
      int taskNum = delayQueue.size(); 
      System.out.println("        :" + taskNum); 
      try { 
        //            
        DelayOrderTask<?> delayOrderTask = delayQueue.take(); 
        if (delayOrderTask != null) { 
          Runnable task = delayOrderTask.getTask(); 
          if (null == task) { 
            continue; 
          } 
          //         task 
          executor.execute(task); 
        } 
      } catch (Exception e) { 
        e.printStackTrace(); 
      } 
    } 
  } 
 
  /** 
   *      
   * 
   * @param task 
   * @param time 
   *           
   * @param unit 
   *           
   */ 
  public void put(Runnable task, long time, TimeUnit unit) { 
    //        
    long timeout = TimeUnit.NANOSECONDS.convert(time, unit); 
    //         Delayed       
    DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task); 
    //             
    delayQueue.put(delayOrder); 
  } 
 
  /** 
   *      
   * 
   * @param task 
   * @return 
   */ 
  public boolean removeTask(DelayOrderTask task) { 
 
    return delayQueue.remove(task); 
  } 
}
테스트 클래스

package com.delqueue; 
 
import java.util.concurrent.TimeUnit; 
 
import com.test.delayqueue.DelayOrderQueueManager; 
import com.test.delayqueue.DelayOrderWorker; 
 
public class Test { 
  public static void main(String[] args) { 
    DelayOrderWorker work1 = new DelayOrderWorker();//   1 
    DelayOrderWorker work2 = new DelayOrderWorker();//   2 
    DelayOrderWorker work3 = new DelayOrderWorker();//   3 
    //        ,                         
    DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance(); 
    manager.put(work1, 3000, TimeUnit.MILLISECONDS); 
    manager.put(work2, 6000, TimeUnit.MILLISECONDS); 
    manager.put(work3, 9000, TimeUnit.MILLISECONDS); 
  } 
 
}
OK 이것 이 바로 프로젝트 의 구체 적 인 사용 상황 입 니 다.물론 구체 적 인 내용 은 무시 되 었 습 니 다.전체 프레임 워 크 는 이 렇 습 니 다.그리고 여기 서 자바 의 지연 대기 열 을 사용 합 니 다.그러나 이런 방식 은 문제 가 있 습 니 다.다운 머 신 이 있 으 면 작업 을 잃 어 버 릴 수 있 기 때문에 mq,redis 를 사용 하여 실현 하 는 것 도 고려 할 수 있 습 니 다.
2,mq 구현 지연 메시지
rabbitmq 3.5.7 이상 버 전에 서 지연 대기 열 기능 을 실현 하 는 플러그 인(rabbitmq-delayed-message-exchange)을 제공 합 니 다.동시에 플러그 인 은 Erlang/OPT 18.0 이상 에 의존 합 니 다.
플러그 인 원본 주소:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
플러그 인 다운로드 주소:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
설치:
플러그 인 설치 디 렉 터 리 에 들 어가 기
{rabbitmq-server}/plugins/(현재 존재 하 는 플러그 인 을 볼 수 있 습 니 다)
플러그 인 다운로드

rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
(다운로드 한 파일 이름 이 불규칙 하면 다음 과 같이 수 동 으로 이름 을 바 꿉 니 다.
rabbitmq_delayed_message_exchange-0.0.1.ez)
플러그 인 사용

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
플러그 인 닫 기

rabbitmq-plugins disable rabbitmq_delayed_message_exchange
플러그 인 사용
x-delayed-message 형식의 exchange 를 통 해 delayed-messaging 기능 을 사용 합 니 다.
x-delayed-message 는 플러그 인 이 제공 하 는 형식 입 니 다.rabbitmq 자체 가 아 닙 니 다.메 시 지 를 보 낼 때 header 에'x-delay'인 자 를 추가 하여 메시지 의 지연 시간 을 제어 합 니 다.
maven 프로젝트 의 pom.xml 파일 에 직접 추가 합 니 다.
다음은 application.properties 파일 에 redis 설정 을 추가 합 니 다.
아주 간단 합 니 다.코드 는 다음 과 같 습 니 다.
메시지 전송 실현

x-delay
여기 서 제 가 설정 한 지연 시간 은 3 초 입 니 다.
소식 소비자
main 방법 에서 Spring Boot 프로그램 을 직접 실행 하면 Spring Boot 는 Message Receiver 류 를 자동 으로 해석 합 니 다.
다음은 메 시 지 를 보 내 는 인 터 페 이 스 를 Junit 로 실행 하면 됩 니 다.
실행 이 끝나 면 다음 과 같은 정 보 를 볼 수 있 습 니 다.

      :2018-05-03 12:44:53
3   ,Spring Boot      :
      :2018-05-03 12:44:56
      :hello i am delay msg
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

좋은 웹페이지 즐겨찾기