전략 모델 시 뮬 레이 션 을 통 해 자바 스 레 드 탱크 의 논리 와 스 레 드 탱크 의 실현 원 리 를 실현 하고 자신 이 반드시 손 을 써 야 안의 오묘 함 을 알 수 있다.

며칠 전에 고정된 크기 의 연결 탱크 를 썼 습 니 다.오늘 은 학습 을 통 해 스 레 드 탱크 의 실현 논 리 를 정리 하고 이 코드 를 보고 자바 스 레 드 탱크 의 기본 사상 을 완전히 hold 할 수 있 습 니 다.고급 프로그래머 와 한 걸음 더 가 까 워 졌 습 니 다.여러분 의 참고 와 교 류 를 환영 합 니 다. 
package com.smallfan.connectionpool;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @PACKAGE_NAME: com.smallfan.connectionpool
 * @NAME: TestThreadPool
 * @USER: dell
 * @DATE: 2020/5/29
 * @PROJECT_NAME: aboutthread
 */
@Slf4j
public class TestThreadPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1, 1000,
                TimeUnit.MILLISECONDS, 1, ((queue, task) -> {
            //          
//            queue.takeQueue();
            //    
//            Object o = queue.takeQueueForTime(500, TimeUnit.MILLISECONDS);
            //  
//            log.info("   ,   {}",task);
            //    
//            throw new RuntimeException("    "+task);
            //       
            task.run();
        }));
        for (int i = 0; i < 5; i++) {
            int j = i;
            threadPool.execute(() ->
                    {
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        log.info("   " + j);
                    }
            );
        }
    }
}

@Slf4j
class ThreadPool {
    //    
    private BlockingQueue taskQueue;
    //    
    private HashSet workers = new HashSet();
    //   
    private int threadSize;
    //    
    private long timeout;
    //    
    private TimeUnit timeUnit;
    //    
    private RejectPolicy policy;

    public ThreadPool(int threadSize, long timeout, TimeUnit timeUnit, int capacity, RejectPolicy policy) {
        this.threadSize = threadSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        taskQueue = new BlockingQueue<>(capacity);
        this.policy = policy;
    }

    public void execute(Runnable task) {
        synchronized (workers) {//          
            //       threadSize     
            //         
            if (workers.size() < threadSize) {
                log.info("  worker{}", task);
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {

                //taskQueue.putQueue(task);
                /**
                 *     
                 * 1      
                 * 2      
                 * 3  
                 * 4     
                 * 5    
                 *              
                 */
                taskQueue.tryPut(policy, task);
            }

        }
    }

    @FunctionalInterface
    interface RejectPolicy {
        void reject(BlockingQueue queue, T task);
    }

    class Worker extends Thread {
        private Runnable runnable;

        public Worker(Runnable runnable) {
            this.runnable = runnable;
        }

        @Override
        public void run() {
            /**
             *     
             * 1. runnable    
             * 2.            
             */
//            while (runnable != null || (runnable = taskQueue.takeQueue()) != null) {
            while (runnable != null || (runnable = taskQueue.takeQueueForTime(timeout, timeUnit)) != null) {
                try {
                    log.info("  worker{}", runnable);
                    runnable.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    runnable = null;//     
                }
            }
            synchronized (workers) {
                log.info("  worker{}", this);
                workers.remove(this);
            }
        }
    }
}

//      
@Slf4j
class BlockingQueue {

    //1.      
    private int capacity;
    //2.      ,    
    private Deque deque = new ArrayDeque();
    //3.   
    private ReentrantLock lock = new ReentrantLock();
    //4.       
    private Condition emptyWaitSet = lock.newCondition();
    //5.       
    private Condition fullWaitSet = lock.newCondition();

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    //      
    public T takeQueue() {
        lock.lock();
        try {
            while (deque.isEmpty()) {//    
                try {
                    emptyWaitSet.await();//        
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //     
            T t = deque.removeFirst();
            fullWaitSet.signal();//     
            return t;
        } finally {
            lock.unlock();//   ,    
        }
    }

    //      
    public T takeQueueForTime(long timeout, TimeUnit unit) {
        lock.lock();
        long nanos = unit.toNanos(timeout);//      
        try {
            while (deque.isEmpty()) {//    
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);//                   
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //     
            T t = deque.removeFirst();
            fullWaitSet.signal();//     
            return t;
        } finally {
            lock.unlock();//   ,    
        }
    }

    //      
    public void putQueue(T task) {
        lock.lock();
        try {
            while (deque.size() == capacity) {//    
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            deque.addLast(task);
            emptyWaitSet.signal();
            log.info("     {}", task);
        } finally {
            lock.unlock();
        }
    }

    /**
     *     ,           
     *
     * @param task
     * @param timeout
     * @param timeUnit
     * @return
     */
    public boolean putQueueForTimeOut(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        long nanos = timeUnit.toNanos(timeout);
        try {
            while (deque.size() == capacity) {//    
                try {
                    if (nanos <= 0) {//    
                        return false;
                    }
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            deque.addLast(task);
            emptyWaitSet.signal();
            return true;//    
        } finally {
            lock.unlock();
        }
    }

    //    
    public int getCapacity() {
        lock.lock();
        try {
            return deque.size();
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(ThreadPool.RejectPolicy policy, T task) {
        lock.lock();
        try {
            if (deque.size() == capacity) {//                
                policy.reject(this, task);
            } else {//  
                deque.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}

좋은 웹페이지 즐겨찾기