[Java 병발] JAVA 병발 프로그래밍 실전. - 독서노트10.

Executor에서 일괄 처리 작업을 제출하고 싶으면, 결과를 얻으려면 타임아웃이 0인 get을 계속 호출해야 합니다.다행히도 더 좋은 방법이 있다. 서비스 완성 (completion 서비스)
CompletionServie는 Executor 및 BlockingQueue 기능을 통합합니다.Callable 작업을 실행하기 위해 제출한 다음, 대기열에 있는 take와poll 방법을 사용하면 결과가 완전하게 사용할 때 이 결과를 얻을 수 있습니다.ExecutorCompletionService는CompletionService 인터페이스를 실현하는 클래스로 계산 작업을 Executor에 의뢰합니다.
CompletionService를 사용하면 두 가지 측면에서 페이지 렌더링의 성능을 향상시킬 수 있습니다.우리는 그림을 다운로드할 때마다 독립된 작업을 만들고 온라인 채널에서 그들을 실행하며 순서의 다운로드 과정을 병행으로 전환할 수 있다.또한 Completion 서비스에서 결과를 얻을 수 있으며, 이미지 다운로드가 완료되면 바로 나타납니다.
4
public class Renderer{

  private final ExecutorService executor;

  Rnderer(ExecutorService executor){

    this.executor=executor;

  }

  void renderPage(CharSequence source){

    final List info=scanForImageInfo(source);

    CompletionService completion=

    new ExecutorCompletionService(executor);

    for(final ImageInfo imageInfo:info){

      completionService.submit(new Callable(){

        public ImageData call(){

          return imageInfo.downloadImage();

        }

      });

    }

    renderText(source);

    try{

      for(int t=0,n=info.size();t f=completionService.take();

        ImageData imageData=f.get();

        renderImage(imageData);

      }

    }catch(InterruptedException e){

      Thread.currentThread().interrupt();

    }catch(ExecutionException e){

      throw launderThrowable(e.getCause());

    }

  }

}
다중
ExecutorCompletionService
단일
Executor
따라서 현명한 방법은 하나를 만드는 것이다
ExecutorCompletionService
그는 특정한 계산 서비스에 대해 개인적인 것이고, 그 다음에 공공적인 것을 공유한다
Executor
.이런 식으로 하면
CompletionService
일괄 처리 계산의 핸들과
Future
맡은 단일 계산의 문맥은 어느 정도 같다.기록 아래 제출
CompletionService
작업 개수를 계산해서 완성된 결과를 얼마나 얻었는지 계산해 보세요. 공유된 것을 사용해도
Executor
, 임무를 일괄 처리하는 모든 결과가 언제 모두 얻었는지 알 수 있다.
만약 지정된 시간에 완성할 수 없는 활동이 있다면 그것은 효력을 상실한 것이다. 이때 이 활동을 포기해야 한다.Futrue.get의 시간 제한 버전은 이 조건에 부합됩니다. 시간이 초과되면 TimeoutException을 던집니다.
두 번째 문제는 임무가 시간을 초과할 때 그들을 멈출 수 있어야 한다는 것이다.
Page renderPageWithAd() throws InterruptedException{

  long endNanos=System.nanoTime()+TIME_BUDGET;

  Future f=exec.submit(new FetchAdTask());

  Page page=renderPageBody();

  Ad ad;

  try{

    long timeLeft=endNanos-System.nanoTime();

    ad=f.get(timeLeft,NANOSECONDS);

  }catch(ExecutionException e){

    ad=DEFAULT_AD;

  }catch(TimeoutException e){

    ad=DEFAULT_AD;

    f.cancel(true);

  }

  page.setAd(ad);

  return page;

}

cancel
중간 매개변수
true
작업 라인이 실행될 때 중단될 수 있음을 의미합니다.
예정 시간 내에 여행 견적을 청구하다
private class QuoteTask implements Callable{

  private final TravelCompany company;

  private final TravelInfo travelInfo;

  public TravelQuote call()throws Exception{

    return company.solicitQuote(travelInfo);

  }

}

public List getRankedTravelQuotes(

  TravelInfo travelInfo,Set companies,

  Comparator ranking,long time,TimeUnit unit){

  List tasks=new ArrayList();

  for(TravelCompany company:companies){

    tasks.add(new QuoteTask(company,travelInfo));

  }

  List> futures = exec.invokeAll(task,time,unit);

  List quotes = new ArrayList(tasks.size));

  Iterator taskIter=tasks.iterator();

  for(Future f:futures){

    QuoteTask task=taskIter.next();

    try{

      quotes.add(f.get());

    }catch(ExecutionException e){

      quotes.add(task.getFailureQuote(e.getCause()));

    }catch(CancellationException e){

      quotes.add(task.getTimeoutQuote(e));

    }

  }

  Collections.sore(quotes,ranking);

  return quotes;

}

작업과 스레드의 정지에 대해 자바는 스레드를 안전하게 멈추게 하는 메커니즘을 제공하지 않았다. 자바는 스레드가 현재 작업을 중단하도록 하는 협동 메커니즘을 제공했다.
public class PrimeGenerator implments Runnable{

  private final List primes=new ArrayList();

  private volatile boolean cancelled;

  public void run(){

    BigInteger p=BigInteger.ONE;

    while(!cancelled){

      p=p.nextProbablePrime();

      synchronized(this){

        primes.add(p);

      }

    }

  }

  public void cancel(){

    cancelled=true;

  }

  public synchronized List get(){

    return new ArrayList(primes);

  }

}

List aSecondOfPrimes()throws InterruptedException{

  PrimeGenerator generator=new PrimeGenerator();

  new Thread(generator).start();

  try{

    TimeUnit.SECONDS.sleep(1);

  }finally{

    generator.cancel();

  }

  return generator.get();

}

신뢰할 수 없는 취소는 생산자를 막힌 조작에 처하게 한다
class BrokenPrimeProducer extends Thread{

  private final BlockingQueue queue;

  private volatile boolean cancelled=false;

  BrokenPrimeProducer(BlockingQueue queue){

    this.queue=queue;

  }

  public void run(){

    try{

      BigInteger p=BigInteger.ONE;

      while(!cancelled){

        queue.put(p=p.nextProbablePrime());

      }catch(InterruptedException consumed){

      }

    }

  }

  public void cancel(){

    cancelled=true;

  }

}

void consumePrimes()throws InterruptedException{

  BlockingQueue primes=...

  BrokenPrimeProducer producer=new BrokenPrimeProducer(primes);

  producer.start();

  try{

    while(needMorePrimes()){

      consume(primes.take());

    }

  }finally{

    producer.cancel();

  }

}

위의 예는 잘못된 시범이다. 만약에 생산자의 속도가 소비자를 초과하면 대열이 채워진다.
put
방법이 막히면
put
방법이 막힌 후 소비자가 생산자의 임무를 취소하려고 시도하면
cancel
방법 설정
cancelled
표지, 그러나 이때의 생산자는 영원히 이 표지를 검사하지 않을 것이다. 왜냐하면 그는 이미
put
방법이 막혔다.
우리는 인터럽트 정책을 제정해야 한다. 인터럽트 정책은 인터럽트 요청에 어떻게 대응할 것인지를 결정한다. 요청을 발견할 때 무엇을 할 것인지, 터미널에 어떤 작업 단원이 원자 조작이고, 얼마나 빠른 시간에 인터럽트에 응답할 것인지를 결정한다.
Thread에서 정적인interrupted는 병렬 라인의 단말기 상태를 제거하기 때문에 조심스럽게 사용해야 한다.
public Task getNextTask(BlockingQueue queue){

  boolean interrupted=false;

  try{

    while(true){

      try{

        return queue.take();

      }catch(InterruptedException e){

        interrupted=true;

      }

    }

  }finally{

    if(interrupted){

      Thread.currentThread().interrupt();

    }

  }

}

위의 예는 취소할 수 없는 작업이 종료되기 전에 중단된 것을 보여 줍니다.
private static final ScheduledExecutorService cancelExec=...

  public static void timedRun(Runnable r,long timeout,TimeUnit unit){

    final Thread taskThread=Thread.currentThread();

    cancelExec.schedule(new Runnable(){

    public void run(){

      taskThread.interrupt();

    }

  });

  r.run();

}

위의 예는 잘못된 시범이다. 왜냐하면
timedRun
임의의 라인에 호출될 수 있지만, 우리는 라인을 호출하는 중단 정책을 이해할 수 없습니다. 만약 작업이 시한 전에 완성된다면, 라인을 중단하면 알 수 없는 문제가 발생할 수 있습니다.또한 퀘스트가 터미널에 맞지 않으면,
timedRun
임무가 끝날 때까지 돌아오지 않을 것이다. 이것은 기대한 제한 시간을 초과한 지 오래될 것이다.
public static void timeRun(final Runnable r,long timeout,TimeUnit unit){

  class RethrowableTask implements Runnable{

    private volatile Throwable t;

    public void run(){

      try{

        r.run();

      }catch(Throwable t){

        this.t=t;

      }

    }

  }

}

RethrowableTask task=new RethrowableTask();

final Thread taskThread=new Thread(task);

taskThread.start();

cancelExec.schedule(new Runnable(){

  public void run(){

    taskThread.interrupt();

  }

},timeout,unit);

taskThread.join(unit.toMillis(timeout));

task.rethrow();

위의 예는 새로 만든 라인에서 시한부 실행
join
방법
join
상관없이
join
성공적으로 완료했는지,
Java
메모리 가시성은 메모리 모델에 상응하지만
join
그 자체가 그의 성공 여부를 나타내는 어떤 상태도 되돌아오지 않을 것이다.

좋은 웹페이지 즐겨찾기