Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

线程池源码分析 #27

Open
diaosichengxuyuan opened this issue Feb 18, 2019 · 0 comments
Open

线程池源码分析 #27

diaosichengxuyuan opened this issue Feb 18, 2019 · 0 comments

Comments

@diaosichengxuyuan
Copy link
Owner

diaosichengxuyuan commented Feb 18, 2019

1.线程池原理图

default

2.创建

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        //核心线程数
        this.corePoolSize = corePoolSize;
        //最大线程数
        this.maximumPoolSize = maximumPoolSize;
        //任务队列
        this.workQueue = workQueue;
        //最大线程数-核心线程数=非核心线程数,此处是非核心线程空闲时
        //存活时间
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        //线程工厂
        this.threadFactory = threadFactory;
        //拒绝策略,DiscardPolicy/AbortPolicy等
        this.handler = handler;
    }

3.execute

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //如果工作线程数小于核心线程数,创建核心线程并执行任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程池正在运行,将任务添加到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果线程池不再运行,执行删除任务,然后执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果工作线程的数量为0,启动非核心线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //启动非核心线程并执行任务,如果失败,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
    
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加工作线程数量
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建一个Worker对象,先看下Worker的分析
            w = new Worker(firstTask);
            //取出Worker中的线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    //Worker对象继承自AQS,可以方便执行一些锁的逻辑
    //同时Worker实现了Runnable接口,可以将自身作为一个Thread任务执行
    //同时Worker持有了一个Thread,可以执行启动,而firstTask就是我们
    //提交到线程池的任务
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        final Thread thread;
        Runnable firstTask;
    
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
        //线程start后会回调到这个方法
        public void run() {
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }    
    }
    
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try {
            //看下这个地方的逻辑,一个Worker进来时,如果firstTask不
            //为null,则先执行firstTask;同时w.firstTask被置成
            //null,这样下次循环就会调用getTask()从阻塞队列中获取任
            //务,如果获取不到,就一直循环
            while (task != null || (task = getTask()) != null) {
                //线程执行任务时是加锁的
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //前置拦截器
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //执行runnable
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //后置拦截器
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

4.shutdown

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //设置状态是SHUTDOWN,如果线程池状态一旦设为SHUTDOWN,则在线程池中会出
            //现两种现象:
            //1.你不能再往线程池中添加任何任务,否则会抛
            //RejectedExecutionException异常(详细请看ThreadPoolExecutor的
            //addIfUnderCorePoolSize方法)。
            //2.工作线程Worker获得池队列中的任务时(详细看Worker中的getTask()方
            //法)的处理逻辑也发生了变化:如果线程池为RUNNING状态,并且池队列中没任
            //务时,它会一直等待,直到你提交任务到池队列中,然后取出任务,返回。但
            //是,一旦你执行了shutDown()方法,线程池状态为SHUTDOWN状态,它将不再等
            //待了,直接返回null。如果返回null,则工作线程没有要执行的任务,直接退
            //出(详细看Worker中run()方法)。
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
    //遍历线程池所有线程,进行中断,过程中加锁,防止被重复shutdown
    //只有w.tryLock()成功的任务,才会中断,上面分析过,线程池执行
    //任务的线程是加锁锁定的,所以此时执行任务的线程不会被中断
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

5.shutdownNow()

  public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //设置状态为STOP
            advanceRunState(STOP);
            //将所有线程(无论是否执行任务)立即中断
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
    void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

作者原创,转载请注明出处,违法必究!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant