자바 비동기 계산 장면 응용

최근 프로젝트 에서 업무 장면 을 만 났 습 니 다.
당기 데이터베이스 에 있 는 시 계 를 다른 데이터베이스 로 옮 기 고 이전 효율 을 만족 시 키 기 위해 동시 데이터 이전 을 해 야 합 니 다.표 마다 서로 다른 스 레 드 를 시작 하여 데 이 터 를 동시에 이전 할 수 있 습 니 다.이전 이 완료 되면 이전 작업 에 대한 상태 필드 를 동기 화 합 니 다.
가장 먼저 생각 나 는 것 은 자바 의 병발 도구 류:동기 화 장벽 Cyclic Barrier 를 사용 하 는 것 입 니 다.
Cyclic Barrier 는 순환 적 으로 사용 할 수 있 는 장벽(Barrier)을 의미한다.그것 이 해 야 할 일 은 하나의 스 레 드 가 하나의 장벽(동기 점 이 라 고도 할 수 있 음)에 도 착 했 을 때 막 히 고 마지막 스 레 드 가 장벽 에 도 착 했 을 때 만 장벽 이 문 을 열 고 모든 장벽 에 의 해 차단 되 는 스 레 드 가 계속 운행 되 는 것 이다.
Cyclic Barrier 는 다 중 스 레 드 로 데 이 터 를 계산 하고 결 과 를 계산 하 는 장면 을 합 칠 수 있 습 니 다.
1.CyclicBarrier 를 통 해 이전 작업 코드 를 실현 합 니 다.

package com.future.test;

import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VerticaTransfer extends DataTransfer<DataInfo>{
    int threadCount = 10;
    //    
    ExecutorService executor = null;
    CyclicBarrier barrier;
    //     
    protected void doBefore(DataInfo entity){
        //   
        executor = Executors.newFixedThreadPool(threadCount);
        //CyclicBarrier           ,         
        barrier = new CyclicBarrier(threadCount,new DoAfter(this,entity));  
    }
    protected void doJob(DataInfo entity){
        //    
        List<Product> ps = entity.getProducts();
        for (Product product : ps) {
            executor.execute(new VerticaTransferTask(barrier,product));
        }

    }
    @Override
    protected void doAfter(DataInfo entity) {
    }

}
/**
 *       
 * @author Administrator
 *
 */
class DoAfter implements Runnable {   
    private VerticaTransfer verticaTransfer;   
    private DataInfo entity;
    DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) {   
        this.verticaTransfer = verticaTransfer;   
        this.entity = entity;
    }   
    public void run() {
        System.out.println("    。   :" + entity.getProducts().size());
    }   
}
 
업무 처리 코드:

package com.future.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 *         
 * @author 
 *
 */
public class VerticaTransferTask implements Runnable{

    private CyclicBarrier barrier; 
    private Product product;
    VerticaTransferTask(Product product){
        this.product = product;
    }
    VerticaTransferTask(CyclicBarrier barrier,Product product){
        this.barrier = barrier;
        this.product = product;
    }


    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            System.out.println("     :" + product.getId());
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        } finally {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

테스트 입구:

package com.future.test;

import java.util.ArrayList;
import java.util.List;

public class VerticaTransferTest{
    public static void main(String[] args) {
            VerticaTransfer transfer = new VerticaTransfer(); //
            DataInfo data = new DataInfo();
            List<Product> ps = new ArrayList<Product>();
            int tmp = 0;
            for(int i = 0; i < 10;i++){
                Product p = new Product();
                p.setId(i + "");
                p.setPurchase_price(10);
                p.setSalse_price(10 + i);
                ps.add(p);
                tmp += i;
            }

            data.setProducts(ps);
            transfer.execute(data);
    }

}

상술 한 실현 절 차 를 통 해 업무 장면 을 충분히 실현 할 수 있다.
강화 업무 장면:상기 장면 을 바탕 으로 매번 이전 한 결 과 를 최종 적 으로 정리한다.이전 에 성공 한 만큼 이전 에 실패 하 다.모든 스 레 드 처리 결 과 를 모 으 는 것 이다.
이것 은 스 레 드 간 통신 문제 와 관련된다.기 존 처 리 를 바탕 으로 공공 List 변 수 를 추가 하고 VerticaTransferTask run()이전 방법 에서 이전 결 과 를 synchronized 를 List 에 놓 습 니 다.
중.
하지만 더 좋 은 실현 방식 은 없 을 까?
Future 인터페이스
설명:자바 1.5 부터 Callable 과 Future 를 제공 합 니 다.이 를 통 해 작업 수행 이 끝 난 후에 작업 수행 결 과 를 얻 을 수 있 습 니 다.
이것 은 우리 가 Future 를 통 해 모든 라인 의 실행 결 과 를 얻 을 수 있 음 을 나타 낸다.저 는 다음 과 같이 제품 이윤 을 병행 계산 하 는 방식 으로 수 요 를 간단하게 실현 하 겠 습 니 다.
2.Future 를 통 해 병행 처리 작업 코드 구현:

package com.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VerticaTransfer extends DataTransfer<DataInfo>{
    int threadCount = 10;
    //    
    ExecutorService executor = null;
    //     
    List<Future<ResultInfo>> results = new ArrayList<Future<ResultInfo>>();
    protected void doBefore(DataInfo entity){
        //   
        executor = Executors.newFixedThreadPool(threadCount);

    }
    protected void doJob(DataInfo entity){
        //    
        List<Product> ps = entity.getProducts();
        for (Product product : ps) {
            Future<ResultInfo> res = executor.submit(new VerticaTransferTask(product));
            results.add(res);
        }

    }
    @Override
    protected void doAfter(DataInfo entity) {
        double total = 0;
        List<Future<ResultInfo>> rs = this.results;
        for (Future<ResultInfo> future : rs) {
            try {
                ResultInfo info = future.get();
                total += info.getPrice();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("     :" + total);
    }

}

업무 처리 코드:

package com.test;

import java.util.concurrent.Callable;

/**
 *         
 * @author
 *
 */
public class VerticaTransferTask implements Callable<ResultInfo>{

    private Product product;
    VerticaTransferTask(Product product){
        this.product = product;
    }


    @Override
    public ResultInfo call() throws Exception {
        // TODO Auto-generated method stub
        ResultInfo res = null;
        try {
            double money = product.getSalse_price() - product.getPurchase_price();
            res = new ResultInfo();
            res.setPrice(money);
            res.setProductId(product.getId());
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        } 
        return res;
    }
}

간단 합 니 다.
그럼 둘 다 같이 사용 할 수 있 을까요? 저 는 Cyclic Barrier 처리 결과 DoAfter 류 에서 Future 결 과 를 얻어 통 계 를 얻 었 습 니 다.
이렇게 하면 수 요 를 만족 시 킬 수 있 지 않 겠 습 니까?구상 처 리 는 다음 과 같다.

public class VerticaTransfer extends DataTransfer<DataInfo>{
    int threadCount = 10;
    //    
    ExecutorService executor = null;
    CyclicBarrier barrier;
    //     
    List<Future<ResultInfo>> results = new ArrayList<Future<ResultInfo>>();
    protected void doBefore(DataInfo entity){
        //   
        executor = Executors.newFixedThreadPool(threadCount);
        //CyclicBarrier           ,         
        barrier = new CyclicBarrier(threadCount,new DoAfter(this,entity));  
    }
    protected void doJob(DataInfo entity){
        //    
        List<Product> ps = entity.getProducts();
        for (Product product : ps) {
            Future<ResultInfo> res = executor.submit(new VerticaTransferTask(product));
            results.add(res);
        }

    }
    @Override
    protected void doAfter(DataInfo entity) {

    }

}
/**
 *       
 * @author Administrator
 *
 */
class DoAfter implements Runnable {   
    private VerticaTransfer verticaTransfer;   
    private DataInfo entity;
    DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) {   
        this.verticaTransfer = verticaTransfer;   
        this.entity = entity;
    }   
    public void run() {
        double total = 0;
        List<Future<ResultInfo>> rs = verticaTransfer.results;
        for (Future<ResultInfo> future : rs) {
            try {
                ResultInfo info = future.get();
                total += info.getPrice();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("     :" + total);
    }   
} 
 
업무 처리 VerticaTransferTask:

public class VerticaTransferTask implements Callable<ResultInfo>{

    private CyclicBarrier barrier; 
    private Product product;
    VerticaTransferTask(Product product){
        this.product = product;
    }
    VerticaTransferTask(CyclicBarrier barrier,Product product){
        this.barrier = barrier;
        this.product = product;
    }


    @Override
    public ResultInfo call() {
        // TODO Auto-generated method stub
        ResultInfo res = null;
        try {
            double money = product.getSalse_price() - product.getPurchase_price();
            res = new ResultInfo();
            res.setPrice(money);
            res.setProductId(product.getId());
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        } finally {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return res;
    }
}

운행 중 자물쇠 가 발견 되 었 습 니 다.원인 은 무엇 입 니까?
Cyclic Barrier 자 료 를 찾 아 보 았 습 니 다.이 점 에 주의 하 세 요.
Cyclicbarrier 는 더 높 은 구조 함수 Cyclicbarrier(int parties,Runnable barrier-Action)를 제공 하여 스 레 드 가 장벽 에 도 착 했 을 때 barrierAction 을 우선 수행 합 니 다.
즉,barrier.awat()가 실 행 된 후에 DoAfter 클래스 의 run 을 우선 실행 합 니 다.이때 run 의 future.get()은 Vertica Transfer Task call 실행 결 과 를 기다 리 는 것 을 막 아 자원 상호 가 형성 되 었 습 니 다.
선점 하여 자물쇠 가 생 겼 다.
이렇게 해서 우 리 는 자바 에서 병행 계산 을 실현 하 는 두 가지 방식 이 있다 는 것 을 대충 알 게 되 었 습 니 다.그러면 구체 적 으로 문제 가 발생 했 을 때 어떻게 선택 하 시 겠 습 니까?
우 리 는 두 가지 개념 을 잘 알 아야 한다.
Cyclic Barrier 는 장벽 에 도착 한 후 스 레 드 가 처리 되 지 않 고 차단 되 어 기다 리 고 있 으 며,우선 처리 barrierAction 이 완료 되면 signal All 에 의 해 깨 어 나 계속 운행 된다.
CyclicBarrier 의 소스 코드:

   private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

Future 는 스 레 드 가 실 행 된 후에 야 결 과 를 얻 을 수 있 습 니 다.그렇지 않 으 면 계속 기다 리 는 것 을 막 습 니 다.
이 데모 코드 첨부

좋은 웹페이지 즐겨찾기