Java并发编程(十一) : JUC 之 ReentrantReadWriteLock、StampedLock、Semaphore(信号量)、CountdownLatch、CyclicBarrier·
ReentrantReadWriteLock·
当读操作远远高于写操作时,这时候使用 读写锁 让 读-读
可以并发,提高性能。 类似于数据库中的 select ... from ... lock in share mode
提供一个 数据容器类 内部分别使用读锁保护数据的 read()
方法,写锁保护数据的 write()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| class DataContainer {
private Object data; private ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); private ReentrantReadWriteLock.ReadLock r = rw.readLock(); private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() { log.debug("获取读锁..."); r.lock(); try { log.debug("读取"); sleep(1); return data; } finally { log.debug("释放读锁..."); r.unlock(); } }
public void write() { log.debug("获取写锁..."); w.lock(); try { log.debug("写入"); sleep(1); } finally { log.debug("释放写锁..."); w.unlock(); } } }
|
测试 读锁-读锁
可以并发
1 2 3 4 5 6 7
| DataContainer dataContainer = new DataContainer(); new Thread(() -> { dataContainer.read(); }, "t1").start(); new Thread(() -> { dataContainer.read(); }, "t2").start();
|
输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响
1 2 3 4 5 6
| 14:05:14.341 c.DataContainer [t2] - 获取读锁... 14:05:14.341 c.DataContainer [t1] - 获取读锁... 14:05:14.345 c.DataContainer [t1] - 读取 14:05:14.345 c.DataContainer [t2] - 读取 14:05:15.365 c.DataContainer [t2] - 释放读锁... 14:05:15.386 c.DataContainer [t1] - 释放读锁...
|
测试 读锁-写锁 相互阻塞
1 2 3 4 5 6 7 8
| DataContainer dataContainer = new DataContainer(); new Thread(() -> { dataContainer.read(); }, "t1").start(); Thread.sleep(100); new Thread(() -> { dataContainer.write(); }, "t2").start();
|
输出结果
1 2 3 4 5 6
| 14:04:21.838 c.DataContainer [t1] - 获取读锁... 14:04:21.838 c.DataContainer [t2] - 获取写锁... 14:04:21.841 c.DataContainer [t2] - 写入 14:04:22.843 c.DataContainer [t2] - 释放写锁... 14:04:22.843 c.DataContainer [t1] - 读取 14:04:23.843 c.DataContainer [t1] - 释放读锁...
|
写锁-写锁
也是相互阻塞的,这里就不测试了
注意事项·
- 读锁不支持条件变量
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
1 2 3 4 5 6 7 8 9 10 11 12
| r.lock(); try { w.lock(); try { } finally{ w.unlock(); } } finally{ r.unlock(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true; } rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
|
应用(缓存)·
缓存更新策略·
更新时,是先清缓存还是先更新数据库
- 先清缓存
- 先更新数据库
补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询
这种情况的出现几率非常小,见 facebook 论文
读写锁实现一致性缓存·
使用读写锁实现一个简单的按需加载缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| class GenericCachedDao<T> { HashMap<SqlPair, T> map = new HashMap<>(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); GenericDao genericDao = new GenericDao(); public int update(String sql, Object... params) { SqlPair key = new SqlPair(sql, params); lock.writeLock().lock(); try { int rows = genericDao.update(sql, params); map.clear(); return rows; } finally { lock.writeLock().unlock(); } } public T queryOne(Class<T> beanClass, String sql, Object... params) { SqlPair key = new SqlPair(sql, params); lock.readLock().lock(); try { T value = map.get(key); if (value != null) { return value; } } finally { lock.readLock().unlock(); } lock.writeLock().lock(); try { T value = map.get(key); if (value == null) { value = genericDao.queryOne(beanClass, sql, params); map.put(key, value); } return value; } finally { lock.writeLock().unlock(); } } class SqlPair { private String sql; private Object[] params; public SqlPair(String sql, Object[] params) { this.sql = sql; this.params = params; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } SqlPair sqlPair = (SqlPair) o; return sql.equals(sqlPair.sql) && Arrays.equals(params, sqlPair.params); } @Override public int hashCode() { int result = Objects.hash(sql); result = 31 * result + Arrays.hashCode(params); return result; } } }
|
注意
- 以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑
- 适合读多写少,如果写操作比较频繁,以上实现性能低
- 没有考虑缓存容量
- 没有考虑缓存过期
- 只适合单机
- 并发性还是低,目前只会用一把锁
- 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)
- 乐观锁实现:用 CAS 去更新
读写锁原理·
图解流程·
读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个
t1 w.lock,t2 r.lock·
- t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位
- t2 执行 r.lock,这时进入读锁的
sync.acquireShared(1)
流程,首先会进入 tryAcquireShared
流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败
tryAcquireShared 返回值表示
- 这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态
- t2 会看看自己的节点是不是老二,如果是,还会再次调用
tryAcquireShared(1)
来尝试获取锁 - 如果没有成功,在
doAcquireShared
内 for (;;)
循环一次,把前驱节点的 waitStatus
改为 -1,再 for (;;)
循环一次尝试 tryAcquireShared(1)
如果还不成功 (总共会尝试3次,才会阻塞),那么在 parkAndCheckInterrupt()
处 park
t3 r.lock,t4 w.lock·
这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子
t1 w.unlock·
这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子
接下来执行唤醒流程 sync.unparkSuccessor
,即让老二恢复运行,这时 t2 在 doAcquireShared
内parkAndCheckInterrupt()
处恢复运行这回再来一次 for (;;)
执行 tryAcquireShared
成功则让读锁计数加一
这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
事情还没完,在 setHeadAndPropagate
方法内还会检查下一个节点是否是 shared,如果是则调用doReleaseShared()
将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared
内parkAndCheckInterrupt()
处恢复运行
这回再来一次 for (;;)
执行 tryAcquireShared
成功则让读锁计数加一
这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点
t2 r.unlock,t3 r.unlock·
t2 进入 sync.releaseShared(1)
中,调用 tryReleaseShared(1)
让计数减一,但由于计数还不为零
t3 进入 sync.releaseShared(1)
中,调用 tryReleaseShared(1)
让计数减一,这回计数为零了,进入doReleaseShared()
将头节点从 -1 改为 0 并唤醒老二,即
之后 t4 在 acquireQueued 中 parkAndCheckInterrupt
处恢复运行,再次 for (;;)
这次自己是老二,并且没有其他竞争,tryAcquire(1)
成功,修改头结点,流程结束
源码分析·
写锁上锁流程·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| static final class NonfairSync extends Sync { public void lock() { sync.acquire(1); } public final void acquire(int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { if ( w == 0 || current != getExclusiveOwnerThread() ) { return false; } if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } if ( writerShouldBlock() || !compareAndSetState(c, c + acquires) ) { return false; } setExclusiveOwnerThread(current); return true; } final boolean writerShouldBlock() { return false; } }
|
写锁释放流程·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| static final class NonfairSync extends Sync { public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) { setExclusiveOwnerThread(null); } setState(nextc); return free; } }
|
读锁上锁流程·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
| static final class NonfairSync extends Sync { public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) { doAcquireShared(arg); } } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if ( exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current ) { return -1; } int r = sharedCount(c); if ( !readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT) ) { return 1; } return fullTryAcquireShared(current); } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { return 1; } } } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; if (interrupted) selfInterrupt(); failed = false; return; } } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { interrupted = true; } } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) { doReleaseShared(); } } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } } }
|
读锁释放流程·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| static final class NonfairSync extends Sync { public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) { return nextc == 0; } } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } } }
|
StampedLock·
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用加解读锁
1 2
| long stamp = lock.readLock(); lock.unlockRead(stamp);
|
加解写锁
1 2
| long stamp = lock.writeLock(); lock.unlockWrite(stamp);
|
乐观读,StampedLock
支持 tryOptimisticRead()
方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
1 2 3 4 5
| long stamp = lock.tryOptimisticRead();
if(!lock.validate(stamp)){
}
|
提供一个 数据容器类 内部分别使用读锁保护数据的 read()
方法,写锁保护数据的 write()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| class DataContainerStamped { private int data; private final StampedLock lock = new StampedLock(); public DataContainerStamped(int data) { this.data = data; } public int read(int readTime) { long stamp = lock.tryOptimisticRead(); log.debug("optimistic read locking...{}", stamp); sleep(readTime); if (lock.validate(stamp)) { log.debug("read finish...{}, data:{}", stamp, data); return data; } log.debug("updating to read lock... {}", stamp); try { stamp = lock.readLock(); log.debug("read lock {}", stamp); sleep(readTime); log.debug("read finish...{}, data:{}", stamp, data); return data; } finally { log.debug("read unlock {}", stamp); lock.unlockRead(stamp); } } public void write(int newData) { long stamp = lock.writeLock(); log.debug("write lock {}", stamp); try { sleep(2); this.data = newData; } finally { log.debug("write unlock {}", stamp); lock.unlockWrite(stamp); } } }
|
测试 读-读
可以优化
1 2 3 4 5 6 7 8 9 10
| public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); sleep(0.5); new Thread(() -> { dataContainer.read(0); }, "t2").start(); }
|
输出结果,可以看到实际没有加读锁
1 2 3 4
| 15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1
|
测试 读-写
时优化读补加读锁
1 2 3 4 5 6 7 8 9 10
| public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); sleep(0.5); new Thread(() -> { dataContainer.write(100); }, "t2").start(); }
|
输出结果
1 2 3 4 5 6 7
| 15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 15:57:00.717 c.DataContainerStamped [t2] - write lock 384 15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 15:57:02.719 c.DataContainerStamped [t1] - read lock 513 15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 15:57:03.719 c.DataContainerStamped [t1] - read unlock 513
|
注意
- StampedLock 不支持条件变量
- StampedLock 不支持可重入
Semaphore·
[ˈsɛməˌfɔr] 信号量,用来限制能同时访问共享资源的线程上限。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 10; i++) { new Thread(() -> { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..."); sleep(1); log.debug("end..."); } finally { semaphore.release(); } }).start(); } }
|
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 07:35:15.485 c.TestSemaphore [Thread-2] - running... 07:35:15.485 c.TestSemaphore [Thread-1] - running... 07:35:15.485 c.TestSemaphore [Thread-0] - running... 07:35:16.490 c.TestSemaphore [Thread-2] - end... 07:35:16.490 c.TestSemaphore [Thread-0] - end... 07:35:16.490 c.TestSemaphore [Thread-1] - end... 07:35:16.490 c.TestSemaphore [Thread-3] - running... 07:35:16.490 c.TestSemaphore [Thread-5] - running... 07:35:16.490 c.TestSemaphore [Thread-4] - running... 07:35:17.490 c.TestSemaphore [Thread-5] - end... 07:35:17.490 c.TestSemaphore [Thread-4] - end... 07:35:17.490 c.TestSemaphore [Thread-3] - end... 07:35:17.490 c.TestSemaphore [Thread-6] - running... 07:35:17.490 c.TestSemaphore [Thread-7] - running... 07:35:17.490 c.TestSemaphore [Thread-9] - running... 07:35:18.491 c.TestSemaphore [Thread-6] - end... 07:35:18.491 c.TestSemaphore [Thread-7] - end... 07:35:18.491 c.TestSemaphore [Thread-9] - end... 07:35:18.491 c.TestSemaphore [Thread-8] - running... 07:35:19.492 c.TestSemaphore [Thread-8] - end...
|
Semaphore 应用·
- 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)
- 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好,注意下面的实现中线程数和数据库连接数是相等的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Slf4j(topic = "c.Pool") class Pool { private final int poolSize; private Connection[] connections; private AtomicIntegerArray states; private Semaphore semaphore; public Pool(int poolSize) { this.poolSize = poolSize; this.semaphore = new Semaphore(poolSize); this.connections = new Connection[poolSize]; this.states = new AtomicIntegerArray(new int[poolSize]); for (int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("连接" + (i+1)); } } public Connection borrow() { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < poolSize; i++) { if(states.get(i) == 0) { if (states.compareAndSet(i, 0, 1)) { log.debug("borrow {}", connections[i]); return connections[i]; } } } return null; } public void free(Connection conn) { for (int i = 0; i < poolSize; i++) { if (connections[i] == conn) { states.set(i, 0); log.debug("free {}", conn); semaphore.release(); break; } } } }
|
Semaphore 原理·
加锁解锁流程·
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
这时 Thread-4 释放了 permits,状态如下
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
源码分析·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if ( remaining < 0 || compareAndSetState(available, remaining) ) { return remaining; } } } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } }
|
为什么要有 PROPAGATE·
早期有 bug
1 2 3 4 5 6 7 8 9
| public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
|
1 2 3 4 5 6 7 8 9 10
| private void setHeadAndPropagate(Node node, int propagate) { setHead(node); if (propagate > 0 && node.waitStatus != 0) { Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); } }
|
- 假设存在某次循环中队列里排队的结点情况为
head(-1)->t1(-1)->t2(-1)
- 假设存在将要信号量释放的 T3 和 T4,释放顺序为先 T3 后 T4
正常流程·
产生 bug 的情况·
修复前版本执行流程
- T3 调用 releaseShared(1),直接调用了 unparkSuccessor(head),head 的等待状态从 -1 变为 0
- T1 由于 T3 释放信号量被唤醒,调用 tryAcquireShared,假设返回值为 0(获取锁成功,但没有剩余资源量)
- T4 调用 releaseShared(1),此时 head.waitStatus 为 0(此时读到的 head 和 1 中为同一个head),不满足条件,因此不调用 unparkSuccessor(head)
- T1 获取信号量成功,调用 setHeadAndPropagate 时,因为不满足 propagate > 0(2 的返回值也就是propagate(剩余资源量) == 0),从而不会唤醒后继结点, T2 线程得不到唤醒
bug 修复后·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) { doReleaseShared(); } } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
|
- T3 调用 releaseShared(),直接调用了 unparkSuccessor(head),head 的等待状态从 -1 变为 0
- T1 由于 T3 释放信号量被唤醒,调用 tryAcquireShared,假设返回值为 0(获取锁成功,但没有剩余资源量)
- T4 调用 releaseShared(),此时 head.waitStatus 为 0(此时读到的 head 和 1 中为同一个 head),调用doReleaseShared() 将等待状态置为
PROPAGATE(-3)
- T1 获取信号量成功,调用 setHeadAndPropagate 时,读到 h.waitStatus < 0,从而调用doReleaseShared() 唤醒 T2
CountdownLatch·
用来进行线程同步协作,等待所有线程完成倒计时。其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); new Thread(() -> { log.debug("begin..."); sleep(1); latch.countDown(); log.debug("end...{}", latch.getCount()); }).start(); new Thread(() -> { log.debug("begin..."); sleep(2); latch.countDown(); log.debug("end...{}", latch.getCount()); }).start(); new Thread(() -> { log.debug("begin..."); sleep(1.5); latch.countDown(); log.debug("end...{}", latch.getCount()); }).start(); log.debug("waiting..."); latch.await(); log.debug("wait end..."); }
|
输出
1 2 3 4 5 6 7 8
| 18:44:00.778 c.TestCountDownLatch [main] - waiting... 18:44:00.778 c.TestCountDownLatch [Thread-2] - begin... 18:44:00.778 c.TestCountDownLatch [Thread-0] - begin... 18:44:00.778 c.TestCountDownLatch [Thread-1] - begin... 18:44:01.782 c.TestCountDownLatch [Thread-0] - end...2 18:44:02.283 c.TestCountDownLatch [Thread-2] - end...1 18:44:02.782 c.TestCountDownLatch [Thread-1] - end...0 18:44:02.782 c.TestCountDownLatch [main] - wait end...
|
可以配合线程池使用,改进如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); ExecutorService service = Executors.newFixedThreadPool(4); service.submit(() -> { log.debug("begin..."); sleep(1); latch.countDown(); log.debug("end...{}", latch.getCount()); }); service.submit(() -> { log.debug("begin..."); sleep(1.5); latch.countDown(); log.debug("end...{}", latch.getCount()); }); service.submit(() -> { log.debug("begin..."); sleep(2); latch.countDown(); log.debug("end...{}", latch.getCount()); }); service.submit(()->{ try { log.debug("waiting..."); latch.await(); log.debug("wait end..."); } catch (InterruptedException e) { e.printStackTrace(); } }); }
|
输出
1 2 3 4 5 6 7 8
| 18:52:25.831 c.TestCountDownLatch [pool-1-thread-3] - begin... 18:52:25.831 c.TestCountDownLatch [pool-1-thread-1] - begin... 18:52:25.831 c.TestCountDownLatch [pool-1-thread-2] - begin... 18:52:25.831 c.TestCountDownLatch [pool-1-thread-4] - waiting... 18:52:26.835 c.TestCountDownLatch [pool-1-thread-1] - end...2 18:52:27.335 c.TestCountDownLatch [pool-1-thread-2] - end...1 18:52:27.835 c.TestCountDownLatch [pool-1-thread-3] - end...0 18:52:27.835 c.TestCountDownLatch [pool-1-thread-4] - wait end...
|
应用之同步等待多线程准备完毕·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| AtomicInteger num = new AtomicInteger(0); ExecutorService service = Executors.newFixedThreadPool(10, (r) -> { return new Thread(r, "t" + num.getAndIncrement()); }); CountDownLatch latch = new CountDownLatch(10); String[] all = new String[10]; Random r = new Random(); for (int j = 0; j < 10; j++) { int x = j; service.submit(() -> { for (int i = 0; i <= 100; i++) { try { Thread.sleep(r.nextInt(100)); } catch (InterruptedException e) { } all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")"; System.out.print("\r" + Arrays.toString(all)); } latch.countDown(); }); } latch.await(); System.out.println("\n游戏开始..."); service.shutdown();
|
中间输出
1
| [t0(52%), t1(47%), t2(51%), t3(40%), t4(49%), t5(44%), t6(49%), t7(52%), t8(46%), t9(46%)]
|
最后输出
1 2 3
| [t0, t1, t2, t3, t4, t5, t6, t7, t8, t9] 游戏开始...
|
应用之同步等待多个远程调用结束·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @RestController public class TestCountDownlatchController { @GetMapping("/order/{id}") public Map<String, Object> order(@PathVariable int id) { HashMap<String, Object> map = new HashMap<>(); map.put("id", id); map.put("total", "2300.00"); sleep(2000); return map; } @GetMapping("/product/{id}") public Map<String, Object> product(@PathVariable int id) { HashMap<String, Object> map = new HashMap<>(); if (id == 1) { map.put("name", "小爱音箱"); map.put("price", 300); } else if (id == 2) { map.put("name", "小米手机"); map.put("price", 2000); } map.put("id", id); sleep(1000); return map; } @GetMapping("/logistics/{id}") public Map<String, Object> logistics(@PathVariable int id) { HashMap<String, Object> map = new HashMap<>(); map.put("id", id); map.put("name", "中通快递"); sleep(2500); return map; } private void sleep(int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
rest 远程调用 (有返回值使用Future比较合适)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| RestTemplate restTemplate = new RestTemplate(); log.debug("begin"); ExecutorService service = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(4); Future<Map<String,Object>> f1 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1); return r; }); Future<Map<String, Object>> f2 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1); return r; }); Future<Map<String, Object>> f3 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2); return r; }); Future<Map<String, Object>> f4 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1); return r; }); System.out.println(f1.get()); System.out.println(f2.get()); System.out.println(f3.get()); System.out.println(f4.get()); log.debug("执行完毕"); service.shutdown();
|
执行结果
1 2 3 4 5 6
| 19:51:39.711 c.TestCountDownLatch [main] - begin {total=2300.00, id=1} {price=300, name=小爱音箱, id=1} {price=2000, name=小米手机, id=2} {name=中通快递, id=1} 19:51:42.407 c.TestCountDownLatch [main] - 执行完毕
|
CyclicBarrier·
[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| CyclicBarrier cb = new CyclicBarrier(2); new Thread(()->{ System.out.println("线程1开始.."+new Date()); try { cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程1继续向下运行..."+new Date()); }).start(); new Thread(()->{ System.out.println("线程2开始.."+new Date()); try { Thread.sleep(2000); } catch (InterruptedException e) { } try { cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程2继续向下运行..."+new Date()); }).start();
|
当计数变为0后,下次再调用又会重新从设置的2开始
注意:
- CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比喻为『人满发车』