您身边的App定制专业企业--10年开发经验为您护航

18678812288
0531-88887250

Java多线程系列--“JUC锁”08之 共享锁和ReentrantReadWriteLock

文章编辑:2138com太阳集团 时间:2016年12月20日

概要

Java的JUC(java.util.concurrent)包中的锁包括"独占锁"和"共享锁"。在“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock ”中,对Java的独占锁进行了说明。本章对Java的“共享锁”进行先容,JUC中的共享锁有CountDownLatch, CyclicBarrier, Semaphore, ReentrantReadWriteLock等;本章会以ReentrantReadWriteLock为蓝本对共享锁进行说明。内容包括:

ReadWriteLock 和 ReentrantReadWriteLock先容

ReadWriteLock 和 ReentrantReadWriteLock函数列表

参考代码(基于JDK1.7.0_40)

  获取共享锁

  释放共享锁

  公平共享锁和非公平共享锁

ReentrantReadWriteLock示例

 

 

 

ReadWriteLock 和 ReentrantReadWriteLock先容

ReadWriteLock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作。

“读取锁”用于只读操作,它是“共享锁”,能同时被多个线程获取。

“写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线程锁获取。

注意:不能同时存在读取锁和写入锁!

ReadWriteLock是一个接口。ReentrantReadWriteLock是它的实现类,ReentrantReadWriteLock包括子类ReadLock和WriteLock。

 

 

 

ReadWriteLock 和 ReentrantReadWriteLock函数列表

ReadWriteLock函数列表

 

// 返回用于读取操作的锁。

Lock readLock()

// 返回用于写入操作的锁。

Lock writeLock()

 

 

ReentrantReadWriteLock函数列表

 

复制代码

// 创建一个新的 ReentrantReadWriteLock,默认是采用“非公平策略”。

ReentrantReadWriteLock()

// 创建一个新的 ReentrantReadWriteLock,fair是“公平策略”。fair为true,意味着公平策略;否则,意味着非公平策略。

ReentrantReadWriteLock(boolean fair)

 

// 返回当前拥有写入锁的线程,如果没有这样的线程,则返回 null。

protected Thread getOwner()

// 返回一个 collection,它包含可能正在等待获取读取锁的线程。

protected Collection<Thread> getQueuedReaderThreads()

// 返回一个 collection,它包含可能正在等待获取读取或写入锁的线程。

protected Collection<Thread> getQueuedThreads()

// 返回一个 collection,它包含可能正在等待获取写入锁的线程。

protected Collection<Thread> getQueuedWriterThreads()

// 返回等待获取读取或写入锁的线程估计数目。

int getQueueLength()

// 查询当前线程在此锁上保持的重入读取锁数量。

int getReadHoldCount()

// 查询为此锁保持的读取锁数量。

int getReadLockCount()

// 返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程。

protected Collection<Thread> getWaitingThreads(Condition condition)

// 返回正等待与写入锁相关的给定条件的线程估计数目。

int getWaitQueueLength(Condition condition)

// 查询当前线程在此锁上保持的重入写入锁数量。

int getWriteHoldCount()

// 查询是否给定线程正在等待获取读取或写入锁。

boolean hasQueuedThread(Thread thread)

// 查询是否所有的线程正在等待获取读取或写入锁。

boolean hasQueuedThreads()

// 查询是否有些线程正在等待与写入锁有关的给定条件。

boolean hasWaiters(Condition condition)

// 如果此锁将公平性设置为 ture,则返回 true。

boolean isFair()

// 查询是否某个线程保持了写入锁。

boolean isWriteLocked()

// 查询当前线程是否保持了写入锁。

boolean isWriteLockedByCurrentThread()

// 返回用于读取操作的锁。

ReentrantReadWriteLock.ReadLock readLock()

// 返回用于写入操作的锁。

ReentrantReadWriteLock.WriteLock writeLock()

复制代码

 

 

参考代码(基于JDK1.7.0_40)

ReentrantReadWriteLock的完整源码

 

 View Code

 

 

AQS的完整源码

 

 View Code

 

 

其中,共享锁源码相关的代码如下:

 

复制代码

public static class ReadLock implements Lock, java.io.Serializable {

    private static final long serialVersionUID = -5992448646407690164L;

    // ReentrantReadWriteLock的AQS对象

    private final Sync sync;

 

    protected ReadLock(ReentrantReadWriteLock lock) {

        sync = lock.sync;

    }

 

    // 获取“共享锁”

    public void lock() {

        sync.acquireShared(1);

    }

 

    // 如果线程是中断状态,则抛出一场,否则尝试获取共享锁。

    public void lockInterruptibly() throws InterruptedException {

        sync.acquireSharedInterruptibly(1);

    }

 

    // 尝试获取“共享锁”

    public  boolean tryLock() {

        return sync.tryReadLock();

    }

 

    // 在指定时间内,尝试获取“共享锁”

    public boolean tryLock(long timeout, TimeUnit unit)

            throws InterruptedException {

        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

    }

 

    // 释放“共享锁”

    public  void unlock() {

        sync.releaseShared(1);

    }

 

    // 新建条件

    public Condition newCondition() {

        throw new UnsupportedOperationException();

    }

 

    public String toString() {

        int r = sync.getReadLockCount();

        return super.toString() +

            "[Read locks = " + r + "]";

    }

}

复制代码

说明:

ReadLock中的sync是一个Sync对象,Sync继承于AQS类,即Sync就是一个锁。ReentrantReadWriteLock中也有一个Sync对象,而且ReadLock中的sync和ReentrantReadWriteLock中的sync是对应关系。即ReentrantReadWriteLock和ReadLock共享同一个AQS对象,共享同一把锁。

ReentrantReadWriteLock中Sync的定义如下:

 

final Sync sync;

下面,分别从“获取共享锁”和“释放共享锁”两个方面对共享锁进行说明。

 

 

 

获取共享锁

获取共享锁的思想(即lock函数的步骤),是先通过tryAcquireShared()尝试获取共享锁。尝试成功的话,则直接返回;尝试失败的话,则通过doAcquireShared()不断的循环并尝试获取锁,若有需要,则阻塞等待。doAcquireShared()在循环中每次尝试获取锁时,都是通过tryAcquireShared()来进行尝试的。下面看看“获取共享锁”的详细流程。

 

1. lock()

 

lock()在ReadLock中,源码如下:

 

public void lock() {

    sync.acquireShared(1);

}

 

 

2. acquireShared()

 

Sync继承于AQS,acquireShared()定义在AQS中。源码如下:

 

public final void acquireShared(int arg) {

    if (tryAcquireShared(arg) < 0)

        doAcquireShared(arg);

}

说明:acquireShared()首先会通过tryAcquireShared()来尝试获取锁。

尝试成功的话,则不再做任何动作(因为已经成功获取到锁了)。

尝试失败的话,则通过doAcquireShared()来获取锁。doAcquireShared()会获取到锁了才返回。

 

 

 

3. tryAcquireShared()

 

tryAcquireShared()定义在ReentrantReadWriteLock.java的Sync中,源码如下:

 

复制代码

protected final int tryAcquireShared(int unused) {

    Thread current = Thread.currentThread();

    // 获取“锁”的状态

    int c = getState();

    // 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。

    if (exclusiveCount(c) != 0 &&

        getExclusiveOwnerThread() != current)

        return -1;

    // 获取“读取锁”的共享计数

    int r = sharedCount(c);

    // 如果“不需要阻塞等待”,并且“读取锁”的共享计数小于MAX_COUNT;

    // 则通过CAS函数更新“锁的状态”,将“读取锁”的共享计数+1。

    if (!readerShouldBlock() &&

        r < MAX_COUNT &&

        compareAndSetState(c, c + SHARED_UNIT)) {

        // 第1次获取“读取锁”。

        if (r == 0) { 

            firstReader = current;

            firstReaderHoldCount = 1;

        // 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程

        } else if (firstReader == current) { 

            firstReaderHoldCount++;

        } else {

            // HoldCounter是用来统计该线程获取“读取锁”的次数。

            HoldCounter rh = cachedHoldCounter;

            if (rh == null || rh.tid != current.getId())

                cachedHoldCounter = rh = readHolds.get();

            else if (rh.count == 0)

                readHolds.set(rh);

            // 将该线程获取“读取锁”的次数+1。

            rh.count++;

        }

        return 1;

    }

    return fullTryAcquireShared(current);

}

复制代码

说明:tryAcquireShared()的作用是尝试获取“共享锁”。

如果在尝试获取锁时,“不需要阻塞等待”并且“读取锁的共享计数小于MAX_COUNT”,则直接通过CAS函数更新“读取锁的共享计数”,以及将“当前线程获取读取锁的次数+1”。

否则,通过fullTryAcquireShared()获取读取锁。

 

 

 

4. fullTryAcquireShared()

 

fullTryAcquireShared()在ReentrantReadWriteLock中定义,源码如下:

 

复制代码

final int fullTryAcquireShared(Thread current) {

    HoldCounter rh = null;

    for (;;) {

        // 获取“锁”的状态

        int c = getState();

        // 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。

        if (exclusiveCount(c) != 0) {

            if (getExclusiveOwnerThread() != current)

                return -1;

        // 如果“需要阻塞等待”。

        // (01) 当“需要阻塞等待”的线程是第1个获取锁的线程的话,则继续往下实行。

        // (02) 当“需要阻塞等待”的线程获取锁的次数=0时,则返回-1。

        } else if (readerShouldBlock()) {

            // 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程

            if (firstReader == current) {

            } else {

                if (rh == null) {

                    rh = cachedHoldCounter;

                    if (rh == null || rh.tid != current.getId()) {

                        rh = readHolds.get();

                        if (rh.count == 0)

                            readHolds.remove();

                    }

                }

                // 如果当前线程获取锁的计数=0,则返回-1。

                if (rh.count == 0)

                    return -1;

            }

        }

        // 如果“不需要阻塞等待”,则获取“读取锁”的共享统计数;

        // 如果共享统计数超过MAX_COUNT,则抛出异常。

        if (sharedCount(c) == MAX_COUNT)

            throw new Error("Maximum lock count exceeded");

        // 将线程获取“读取锁”的次数+1。

        if (compareAndSetState(c, c + SHARED_UNIT)) {

            // 如果是第1次获取“读取锁”,则更新firstReader和firstReaderHoldCount。

            if (sharedCount(c) == 0) {

                firstReader = current;

                firstReaderHoldCount = 1;

            // 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程,

            // 则将firstReaderHoldCount+1。

            } else if (firstReader == current) {

                firstReaderHoldCount++;

            } else {

                if (rh == null)

                    rh = cachedHoldCounter;

                if (rh == null || rh.tid != current.getId())

                    rh = readHolds.get();

                else if (rh.count == 0)

                    readHolds.set(rh);

                // 更新线程的获取“读取锁”的共享计数

                rh.count++;

                cachedHoldCounter = rh; // cache for release

            }

            return 1;

        }

    }

}

复制代码

说明:fullTryAcquireShared()会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。

 

 

 

5. doAcquireShared()

 

doAcquireShared()定义在AQS函数中,源码如下:

 

复制代码

private void doAcquireShared(int arg) {

    // addWaiter(Node.SHARED)的作用是,创建“当前线程”对应的节点,并将该线程添加到CLH队列中。

    final Node node = addWaiter(Node.SHARED);

    boolean failed = true;

    try {

        boolean interrupted = false;

        for (;;) {

            // 获取“node”的前一节点

            final Node p = node.predecessor();

            // 如果“当前线程”是CLH队列的表头,则尝试获取共享锁。

            if (p == head) {

                int r = tryAcquireShared(arg);

                if (r >= 0) {

                    setHeadAndPropagate(node, r);

                    p.next = null; // help GC

                    if (interrupted)

                        selfInterrupt();

                    failed = false;

                    return;

                }

            }

            // 如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,

            // 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。

            if (shouldParkAfterFailedAcquire(p, node) &&

                parkAndCheckInterrupt())

                interrupted = true;

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

复制代码

说明:doAcquireShared()的作用是获取共享锁。

它会首先创建线程对应的CLH队列的节点,然后将该节点添加到CLH队列中。CLH队列是管理获取锁的等待线程的队列。

如果“当前线程”是CLH队列的表头,则尝试获取共享锁;否则,则需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。

doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来实行的!

 

若读者对CLH队列,shouldParkAfterFailedAcquire(), parkAndCheckInterrupt()等内容的细节感兴趣,可以参考“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock”。

 

 

 

释放共享锁

释放共享锁的思想,是先通过tryReleaseShared()尝试释放共享锁。尝试成功的话,则通过doReleaseShared()唤醒“其他等待获取共享锁的线程”,并返回true;否则的话,返回flase。

 

1. unlock()

 

public  void unlock() {

    sync.releaseShared(1);

}

说明:该函数实际上调用releaseShared(1)释放共享锁。

 

 

 

2. releaseShared()

 

releaseShared()在AQS中实现,源码如下:

 

复制代码

public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

        doReleaseShared();

        return true;

    }

    return false;

}

复制代码

说明:releaseShared()的目的是让当前线程释放它所持有的共享锁。

它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。

 

 

 

3. tryReleaseShared()

 

tryReleaseShared()定义在ReentrantReadWriteLock中,源码如下:

 

复制代码

protected final boolean tryReleaseShared(int unused) {

    // 获取当前线程,即释放共享锁的线程。

    Thread current = Thread.currentThread();

    // 如果想要释放锁的线程(current)是第1个获取锁(firstReader)的线程,

    // 并且“第1个获取锁的线程获取锁的次数”=1,则设置firstReader为null;

    // 否则,将“第1个获取锁的线程的获取次数”-1。

    if (firstReader == current) {

        // assert firstReaderHoldCount > 0;

        if (firstReaderHoldCount == 1)

            firstReader = null;

        else

            firstReaderHoldCount--;

    // 获取rh对象,并更新“当前线程获取锁的信息”。

    } else {

 

        HoldCounter rh = cachedHoldCounter;

        if (rh == null || rh.tid != current.getId())

            rh = readHolds.get();

        int count = rh.count;

        if (count <= 1) {

            readHolds.remove();

            if (count <= 0)

                throw unmatchedUnlockException();

        }

        --rh.count;

    }

    for (;;) {

        // 获取锁的状态

        int c = getState();

        // 将锁的获取次数-1。

        int nextc = c - SHARED_UNIT;

        // 通过CAS更新锁的状态。

        if (compareAndSetState(c, nextc))

            return nextc == 0;

    }

}

复制代码

说明:tryReleaseShared()的作用是尝试释放共享锁。

 

 

 

4. doReleaseShared()

 

doReleaseShared()定义在AQS中,源码如下:

 

复制代码

private void doReleaseShared() {

    for (;;) {

        // 获取CLH队列的头节点

        Node h = head;

        // 如果头节点不为null,并且头节点不等于tail节点。

        if (h != null && h != tail) {

            // 获取头节点对应的线程的状态

            int ws = h.waitStatus;

            // 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。

            if (ws == Node.SIGNAL) {

                // 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。

                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

                    continue;

                // 唤醒“头节点的下一个节点所对应的线程”。

                unparkSuccessor(h);

            }

            // 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。

            else if (ws == 0 &&

                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

                continue;                // loop on failed CAS

        }

        // 如果头节点发生变化,则继续循环。否则,退出循环。

        if (h == head)                   // loop if head changed

            break;

    }

}

复制代码

说明:doReleaseShared()会释放“共享锁”。它会从前往后的遍历CLH队列,依次“唤醒”然后“实行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的锁。

 

 

 

公平共享锁和非公平共享锁

和互斥锁ReentrantLock一样,ReadLock也分为公平锁和非公平锁。

 

公平锁和非公平锁的区别,体现在判断是否需要阻塞的函数readerShouldBlock()是不同的。

公平锁的readerShouldBlock()的源码如下:

 

final boolean readerShouldBlock() {

    return hasQueuedPredecessors();

}

 

 

在公平共享锁中,如果在当前线程的前面有其他线程在等待获取共享锁,则返回true;否则,返回false。

非公平锁的readerShouldBlock()的源码如下:

 

final boolean readerShouldBlock() {

    return apparentlyFirstQueuedIsExclusive();

}

在非公平共享锁中,它会无视当前线程的前面是否有其他线程在等待获取共享锁。只要该非公平共享锁对应的线程不为null,则返回true。

 

 

 

ReentrantReadWriteLock示例

复制代码

 1 import java.util.concurrent.locks.ReadWriteLock; 

 2 import java.util.concurrent.locks.ReentrantReadWriteLock; 

 3 

 4 public class ReadWriteLockTest1 { 

 5 

 6     public static void main(String[] args) { 

 7         // 创建账户

 8         MyCount myCount = new MyCount("4238920615242830", 10000); 

 9         // 创建用户,并指定账户

10         User user = new User("Tommy", myCount); 

11 

12         // 分别启动3个“读取账户金钱”的线程 和 3个“设置账户金钱”的线程

13         for (int i=0; i<3; i++) {

14             user.getCash();

15             user.setCash((i+1)*1000);

16         }

17     } 

18 } 

19 

20 class User {

21     private String name;            //用户名 

22     private MyCount myCount;        //所要操作的账户 

23     private ReadWriteLock myLock;   //实行操作所需的锁对象 

24 

25     User(String name, MyCount myCount) {

26         this.name = name; 

27         this.myCount = myCount; 

28         this.myLock = new ReentrantReadWriteLock();

29     }

30 

31     public void getCash() {

32         new Thread() {

33             public void run() {

34                 myLock.readLock().lock(); 

35                 try {

36                     System.out.println(Thread.currentThread().getName() +" getCash start"); 

37                     myCount.getCash();

38                     Thread.sleep(1);

39                     System.out.println(Thread.currentThread().getName() +" getCash end"); 

40                 } catch (InterruptedException e) {

41                 } finally {

42                     myLock.readLock().unlock(); 

43                 }

44             }

45         }.start();

46     }

47 

48     public void setCash(final int cash) {

49         new Thread() {

50             public void run() {

51                 myLock.writeLock().lock(); 

52                 try {

53                     System.out.println(Thread.currentThread().getName() +" setCash start"); 

54                     myCount.setCash(cash);

55                     Thread.sleep(1);

56                     System.out.println(Thread.currentThread().getName() +" setCash end"); 

57                 } catch (InterruptedException e) {

58                 } finally {

59                     myLock.writeLock().unlock(); 

60                 }

61             }

62         }.start();

63     }

64 }

65 

66 class MyCount {

67     private String id;         //账号 

68     private int    cash;       //账户余额 

69 

70     MyCount(String id, int cash) { 

71         this.id = id; 

72         this.cash = cash; 

73     } 

74 

75     public String getId() { 

76         return id; 

77     } 

78 

79     public void setId(String id) { 

80         this.id = id; 

81     } 

82 

83     public int getCash() { 

84         System.out.println(Thread.currentThread().getName() +" getCash cash="+ cash); 

85         return cash; 

86     } 

87 

88     public void setCash(int cash) { 

89         System.out.println(Thread.currentThread().getName() +" setCash cash="+ cash); 

90         this.cash = cash; 

91     } 

92 }

复制代码

运行结果:

 

复制代码

Thread-0 getCash start

Thread-2 getCash start

Thread-0 getCash cash=10000

Thread-2 getCash cash=10000

Thread-0 getCash end

Thread-2 getCash end

Thread-1 setCash start

Thread-1 setCash cash=1000

Thread-1 setCash end

Thread-3 setCash start

Thread-3 setCash cash=2000

Thread-3 setCash end

Thread-4 getCash start

Thread-4 getCash cash=2000

Thread-4 getCash end

Thread-5 setCash start

Thread-5 setCash cash=3000

Thread-5 setCash end

复制代码

结果说明:

(01) 观察Thread0和Thread-2的运行结果,大家发现,Thread-0启动并获取到“读取锁”,在它还没运行完毕的时候,Thread-2也启动了并且也成功获取到“读取锁”。

因此,“读取锁”支撑被多个线程同时获取。

(02) 观察Thread-1,Thread-3,Thread-5这三个“写入锁”的线程。只要“写入锁”被某线程获取,则该线程运行完毕了,才释放该锁。

因此,“写入锁”不支撑被多个线程同时获取。


想要了解更多详情欢迎来电咨询18678812288
登陆网址:www.jnydkj.cn。
联系人:王经理。

XML 地图 | Sitemap 地图