절대 다수의 장면에 적합한 대량 작업 스레드 탱크

17822 단어

전언


업무 중의 한 장면에서 5천여 개의 계약을 긴급하게 처리해야 한다. 계약 처리 과정이 좀 복잡하다. 여기서 말하는 복잡함은 코드가 복잡한 것이 아니라 중간에 여러 개의 서비스 채널을 거쳐야 한다. 대외적으로 연결된 업무를 포함한다. 그래서 이 5천 개를 같은 단계로 처리하면 적어도 반나절은 걸릴 것이다.후기에 똑같은 문제(갱부의 코드와 어지러운 구조 문제)에 직면하게 될 것이라고 확정했기 때문에 계약을 처리하는 라인을 썼는데 글을 쓸수록 흥미가 높아졌다. 마지막에 쓴 후에 이 새는 정말 통용되고 거의 모든 장면에서 대량의 임무를 수행할 수 있다는 것을 발견했다.

약술


이 스레드 탱크는 대량 임무를 위해 정해진 방안이라고 할 수 있을 뿐만 아니라 거의 모든 장면에서의 대량 임무를 실현할 수 있다.크게 네 부분으로 나뉜다.
  • 구속자
  • 수행자
  • 관리자
  • 발기자
  • 구속자


    제약자는 주로 전체 라인 실행 클래스에 대한 제약이다. 그는 공공의 일치된 인터페이스를 정의하여 다른 역할의 스케줄링을 편리하게 했다. 나는 그에게 ThreadTask라는 핑계 클래스를 지어주었다. 다음은 코드이다. 그 중에서 T는 처리할 데이터 유형을 나타낸다.
    package com.cnpany.common.util.thread;
    
    import com.cnpany.common.util.thread.exception.ThreadTaskIncompleteExecutionException;
    
    /**
     *      
     * @author    
     * @time 2017 10 28    11:28:23
     * @email [email protected]
     */
    public interface ThreadTask<T> {
    
        /**
         *     
         */
        void start();
    
        /**
         *     
         */
        void stop();
    
        /**
         *      
         * @return
         */
        int getTaskCount();
    
        /**
         *      
         * @return
         */
        int getThreadCount();
    
        /**
         *     
         * @return
         *          
         */
        long doWait();
    
        /**
         *     
         * @param wautTime
         *                
         * @return
         *                
         */
        long doWait(long wautTime) throws ThreadTaskIncompleteExecutionException;
    }
    

    수행자


    집행자는 직원의 역할과 유사하다. 그는 전체 대량 임무를 수행하는 것을 책임진다. 나는 그를 ' '라고 부르는 것을 더 좋아한다. 그래서 그도 인터페이스 클래스이다. 이 클래스는 excute 방법이 하나밖에 없다. 다음은 이것 의 제약 코드이다. 그 중에서 T는 처리할 데이터 유형을 나타내고 K는 처리 과정에서 필요한 대상을 나타낸다.데이터베이스 및 기타 원격 업무에 대한 조작이 필요하기 때문에 이 로봇에게 서비스를 맡기고 로봇이 이 서비스로 뭔가를 하도록 해야 한다.
    package com.cnpany.common.util.thread;
    
    /**
     *      
     * @author    
     * @time 2017 10 28    11:25:05
     * @email [email protected]
     */
    public interface ThreadRobot<T, K> {
    
        /**
         *     
         * @param t
         */
        void excute(T t, K param);
    }
    

    관리자


    여기의 관리자는 Thread Task라는 인터페이스의 실현에 대한 것이다. 나는 그에게 Batch Thread Task라고 이름을 지었다. 즉, 이 관리자는 일괄 처리 임무의 관리 업무를 맡는다. 물론 서로 다른 업무 수요에 따라 더 많은 관리자를 만들 수 있다. 이 일괄 처리 역할에 서명하지 않으면 절대 다수의 업무의 일괄 조작을 실현할 수 있다. 다음은 구체적인 코드이다.
    package com.cnpany.common.util.thread.impl;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.concurrent.locks.Lock;
    
    import com.cnpany.common.util.thread.ThreadRobot;
    import com.cnpany.common.util.thread.ThreadTask;
    import com.cnpany.common.util.thread.exception.NotStartedException;
    import com.cnpany.common.util.thread.exception.NullTaskException;
    import com.cnpany.common.util.thread.exception.ThreadTaskIncompleteExecutionException;
    
    /**
     *       
     * @author    
     * @time 2017 10 28    11:33:05
     * @email [email protected]
     */
    public class BatchThread<T, K> implements ThreadTask<T> {
    
        /**
         *     
         */
        private LinkedList param = new LinkedList<>();
    
        /**
         *      
         */
        private int maxThread = 3;
    
        /**
         *      
         */
        private int thisThread = 0;
    
        /**
         *      
         */
        private ThreadRobot roboot;
    
        private long startTime = 0;
        private K member = null;
    
        private Lock lock = null;
    
        private boolean over = false;
    
        private boolean exit = false;
    
        /**
         *       
         * @param roboot
         *                 
         * @param maxThread
         *                 
         * @param param
         *              
         */
        public BatchThread(ThreadRobot roboot, int maxThread, List param, Lock lock, K member) {
            if(null == param || param.isEmpty()) {
                throw new NullTaskException();
            }
            if(maxThread > 0) {
                this.maxThread = maxThread;
            }
            for (T t : param) {
                this.param.addLast(t);
            }
            this.roboot = roboot;
            this.lock = lock;
            this.member = member;
        }
    
        @Override
        public void start() {
            startTime = System.currentTimeMillis();
            for(int i = 1; i <= maxThread; i++) {
                Thread t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            T item = null;
                            while((item = getList()) != null) {
                                if(exit) {
                                    break;
                                }
                                roboot.excute(item, member);
                            }
                        }finally {
                            synchronized (lock) {
                                thisThread --;
                                if(thisThread == 0) {
                                    over = true;
                                }
                            }
                        }
                    }
                });
                t.start();
                thisThread ++;
            }
        }
    
    
        @Override
        public void stop() {
            //   
        }
    
        @Override
        public int getTaskCount() {
            return param.size();
        }
    
        @Override
        public int getThreadCount() {
            // TODO Auto-generated method stub
            return thisThread;
        }
    
        @Override
        public long doWait() {
            while(!over) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                continue;
            }
    
            if(startTime == 0) {
                throw new NotStartedException();
            }
            return System.currentTimeMillis() - startTime;
        }
    
        private T getList() {
            synchronized (lock) {
                try {
                    return param.removeFirst();
                } catch (NoSuchElementException e) {
                    return null;
                }
            }
        }
    
        @Override
        public long doWait(long wautTime) throws ThreadTaskIncompleteExecutionException {
            while(!over) {
                if(System.currentTimeMillis() - startTime > wautTime) {
                    //    
                    exit = true;
                    if(!over) {
                        throw new ThreadTaskIncompleteExecutionException();
                    }
                }
    
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            if(startTime == 0) {
                throw new NotStartedException();
            }
            return System.currentTimeMillis() - startTime;
        }
    
    }
    

    발기자


    발기자는 하나의 공장류이다. 그의 역할은 창설 관리자의 역할을 통해 당신이 그에게 맡긴 임무를 수행하도록 하는 것이다.사실은 매우 간단하다. 더 이상 말하지 말고 코드를 올려라.
    package com.cnpany.common.util.thread.factory;
    
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;
    
    import com.cnpany.common.util.thread.ThreadRobot;
    import com.cnpany.common.util.thread.ThreadTask;
    import com.cnpany.common.util.thread.impl.BatchThread;
    
    /**
     *       
     * @author    
     * @time 2017 10 28    11:27:25
     * @email [email protected]
     */
    public class ThreadTaskFactory {
    
        /**
         *          
         * @param roboot
         *               
         * @param maxThread
         *             
         * @param quqe
         *              
         * @param param
         *                  
         * @return
         */
        public static ThreadTask createBatch(ThreadRobot roboot, int maxThread, List quqe, K param) {
            return new BatchThread(roboot, maxThread, quqe, new ReentrantLock(), param);
        }
    }
    

    OK!지금까지 이 스레드 탱크는 완공되었다고 해도 구체적으로 어떻게 사용할 것인가. 우리는 ThreadTaskFactory를 통해 이런 일괄 처리 임무를 만들 수 있다. 즉, 로봇(일꾼)이 해야 할 논리를 실현하면 된다. 여기서 나는 직접 이렇게 한다.
    /**
     *            
     */
    @Scheduled(cron = "0/20 * * * * ? ")
    public void doCreateIntelAfreement() {
        logger.info("[      ]-----------------------------        ");
        List accounts = commonSignService.listIntelAfreementAccounts();
    
        if(!accounts.isEmpty()) {
            //           
            ThreadTask batchSign = ThreadTaskFactory.createBatch(new ThreadRobot() {
                @Override
                public void excute(String accountId, CommonSignService service) {
                    CommonAttach attach = service.createIntelAfreementPdf(accountId);                               //    
                    String afreementId = service.InsertIntelTenderAfreement(accountId, attach.getId());     //    
                    service.signIntelAfreementPdf(afreementId);                                                                         //    
                }
            }, 20, accounts, commonSignService);
    
            logger.info("[      ]-----------------------------     ");
            batchSign.start();
            try {
                batchSign.doWait(18000);
                logger.info("[      ]-----------------------------     ");
            } catch (ThreadTaskIncompleteExecutionException e) {
                logger.info("[      ]-----------------------------     ,       ");
                e.printStackTrace();
            }
        }
    }

    좋은 웹페이지 즐겨찾기