java.util.concurrent.CyclicBarrier source code analysis #32

Open
diaosichengxuyuan opened this issue Aug 7, 2020 · 0 comments

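CyclicBarrier makes a group of threads block when they reach a barrier (also called a synchronization point) until the last thread of the group arrives; only then does the barrier open, and all the threads it was holding resume work. It is called "cyclic" because the barrier can be reused once the waiting threads have been released.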
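
As a rough illustration of the behavior described above, the following sketch pushes a few worker threads through the same barrier several times and counts how often the barrier action fires. The class name `BarrierReuseDemo` and the method `runRounds` are made up for this example; only `CyclicBarrier` and its `await()` method come from the JDK.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK source discussed in this issue.
class BarrierReuseDemo {

    // Runs `parties` worker threads through `rounds` barrier cycles and
    // returns how many times the barrier action ran.
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        // Blocks until all parties have arrived, then the
                        // same barrier is reused for the next round.
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party failed; give up on the remaining rounds.
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(runRounds(3, 2)); // prints 2
    }
}
```

With three parties and two rounds the action runs twice, once per trip, which is the reuse that gives the class its name.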

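// Imports needed to compile this excerpt outside the java.util.concurrent package.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class CyclicBarrier {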

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    private final ReentrantLock lock = new ReentrantLock();
    
    private final Condition trip = lock.newCondition();
    
    private final int parties;
    
    private final Runnable barrierCommand;
    
    private Generation generation = new Generation();

    private int count;

    private static class Generation {
        boolean broken = false;
    }

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    // Main barrier logic, shared by the timed and untimed waits
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // Acquire the lock so that only one thread at a time runs this section
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            
            // If the barrier is already broken, fail immediately
            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            
            // Decrement the count; index 0 means this is the last thread to arrive
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // Run the barrier action, if one was supplied
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // Wake all waiters and reset the barrier for the next generation
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }


            for (;;) {
                try {
                    // Threads loop here, waiting until the barrier is tripped, broken, interrupted, or timed out
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                // Waiting threads normally return here, once the generation has changed
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
}
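
The paths through `dowait` above can be observed from the outside with a small sketch like the one below. It relies on `await(long, TimeUnit)`, `isBroken()` and `reset()`, which are part of the JDK's public `CyclicBarrier` API but are not shown in the excerpt; the class `BarrierBreakDemo` and its method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the JDK source discussed in this issue.
class BarrierBreakDemo {

    // A lone thread waits on a two-party barrier with a timeout. Nobody else
    // arrives, so the timed wait expires and the barrier ends up broken.
    static boolean brokenAfterTimeout() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            barrier.await(50, TimeUnit.MILLISECONDS);
            return false; // not expected: no second party ever arrives
        } catch (TimeoutException e) {
            // Expected: dowait calls breakBarrier() before throwing.
        } catch (InterruptedException | BrokenBarrierException e) {
            return false;
        }
        boolean broken = barrier.isBroken();

        // A later arrival hits the g.broken check at the top of dowait.
        boolean rejectsNewWaiters = false;
        try {
            barrier.await();
        } catch (BrokenBarrierException e) {
            rejectsNewWaiters = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // reset() starts a fresh generation, so the barrier is usable again.
        barrier.reset();
        return broken && rejectsNewWaiters && !barrier.isBroken();
    }

    // With a single party, the caller is always the last to arrive, so
    // --count reaches 0 and dowait returns index 0 without waiting.
    static int lastArrivalIndex() {
        try {
            return new CyclicBarrier(1).await();
        } catch (InterruptedException | BrokenBarrierException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(brokenAfterTimeout()); // prints true
        System.out.println(lastArrivalIndex());   // prints 0
    }
}
```

The timeout case exercises the `timed && nanos <= 0L` branch, and the single-party case exercises the `index == 0` branch.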