Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

java锁ReentrantReadWriteLock源码分析 #24

Open
diaosichengxuyuan opened this issue Feb 18, 2019 · 0 comments
Open

java锁ReentrantReadWriteLock源码分析 #24

diaosichengxuyuan opened this issue Feb 18, 2019 · 0 comments

Comments

@diaosichengxuyuan
Copy link
Owner

diaosichengxuyuan commented Feb 18, 2019

ReentrantReadWriteLock是可重入的读写锁,支持公平和非公平模式,ReadLock是共享锁,WriteLock是独占锁。

ReadLock.lock()和unlock()

 public static class ReadLock implements Lock, java.io.Serializable {
        public void lock() {
            //共享的获取状态1
            sync.acquireShared(1);
        }
        
        public void unlock() {
            //共享的释放状态1
            sync.releaseShared(1);
        }
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final void acquireShared(int arg) {
        //如果尝试共享获取状态1不成功,加入同步队列
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    public final boolean releaseShared(int arg) {
        //尝试释放成功,唤醒下个节点
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    private void doAcquireShared(int arg) {
        //加入同步队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            //自旋,如果前驱是首节点,就尝试获取状态;如果获取获取状态
            //失败或者前驱不是首节点,LockSupport.park(Thread),
            //阻塞当前线程
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    //唤醒同步队列中的下一个线程
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;           
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; 
            }
            if (h == head) 
                break;
        }
    }
    
}
abstract static class Sync extends AbstractQueuedSynchronizer {

        //获取读锁数量,高16位为读锁
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        //获取写锁数量,低16位为写锁
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            //如果当前线程是第一个读线程
            if (firstReader == current) {
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            }
            //否则,需要从ThreadLocal中取出当前线程中读锁个数,
            //然后减1
            else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            //死循环方式通过CAS释放状态
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }


        protected final int tryAcquireShared(int unused) {
            //当前线程
            Thread current = Thread.currentThread();
            //状态值,包括读锁和写锁
            int c = getState();
            //如果写锁数量不为0,并且独占线程不是当前线程,表示有其他
            //线程获取了写锁,直接返回-1,当前线程无法获取读锁
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            //如果写锁数量为0或者当前线程自己获取到的写锁,则当前线程
            //可以继续获取读锁
            int r = sharedCount(c);
            //readerShouldBlock方法,对于公平锁,只要队列中有线程在        
            //等待,那么将会返回true,也就意味着读线程需要阻塞;对于非
            //公平锁,如果当前有线程获取了写锁,则返回true。
            //如果读线程不需要阻塞,且读锁个数小于最大值,且CAS设置状
            //态成功
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                //如果读锁个数为0,设置当前线程为第一个读线程
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                }
                //如果读锁不为0,但是当前线程是第一个读线程,直接增加
                //firstReaderHoldCount值
                else if (firstReader == current) {
                    firstReaderHoldCount++;
                }
                //否则,需要从ThreadLocal中取出当前线程的读锁个数
                //这个地方在ThreadLocal中保存了每个线程持有的读锁数
                //量,因为读锁是可以共享的,每个线程都可以获取读锁并且
                //可以重入,如果不保存的话,就不知道这个线程是否获取了
                //读锁,也不知道它重入了几次
                else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            //如果读线程需要阻塞,或读锁个数大于最大值,或CAS设置状
            //态失败,进行死循环式设置。这个地方与排它锁不同,排它锁
            //需要进入同步队列,因为排它锁是其他线程长久占有锁,需要 
            //等待唤醒,而这里可能稍微等待即可获取锁,如果需要进入同步
            //队列,会直接返回-1
            return fullTryAcquireShared(current);
        }


        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                //一旦有别的线程获得了写锁,返回-1,失败
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                }
                //如果读线程需要阻塞
                else if (readerShouldBlock()) {
                    if (firstReader == current) {
                    }
                    //说明有别的读线程占有了锁
                    else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                //如果读锁达到了最大值,抛出异常
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //如果CAS设置状态成功
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
    }

WriteLock.lock()和unlock()

 public static class WriteLock implements Lock, java.io.Serializable {

    public void lock() {
            //独占式获取状态1
            sync.acquire(1);
        }
        
    public void unlock() {
            //独占式释放状态1
            sync.release(1);
        }
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    public final void acquire(int arg) {
        //如果尝试获取状态1不成功,加入同步队列,然后自旋
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    
    public final boolean release(int arg) {
        //尝试释放状态1成功,唤醒后继节点
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
}
abstract static class Sync extends AbstractQueuedSynchronizer {

        //获取读锁数量,高16位为读锁
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        //获取写锁数量,低16位为写锁
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }


        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //状态变更
            int nextc = getState() - releases;
            //如果写锁数量变为0,则其他线程可获取锁;如果不为0,表示
            //当前线程重入了几次,未释放完,其他线程不可获取锁
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            //获取锁数量
            int c = getState();
            //获取写锁数量
            int w = exclusiveCount(c);
            //如果锁数量不为0
            if (c != 0) {
                //如果写锁为0--表示读锁不为0,那么任何线程(包括当前)
                //线程,都不可以获取写锁,返回-1
                //如果写锁不为0且独占线程不是当前线程--表示其他线程
                //获取了写锁,返回-1
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //如果写锁数量超过最大值,报错
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //如果锁数量不为0且写锁数量不为0且独占线程是当前线程
                //,则表示是当前线程重入,不用CAS方式设置状态
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

        final int getReadHoldCount() {
            if (getReadLockCount() == 0)
                return 0;

            Thread current = Thread.currentThread();
            if (firstReader == current)
                return firstReaderHoldCount;

            HoldCounter rh = cachedHoldCounter;
            if (rh != null && rh.tid == getThreadId(current))
                return rh.count;

            int count = readHolds.get().count;
            if (count == 0) readHolds.remove();
            return count;
        }
    }

作者原创,转载请注明出处,违法必究!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant