Java并发编程(四) (共享模型之管程)_ wait和notify原理、同步模式之保护性暂停、异步模式之生产者_消费者、park和unpark原理、线程状态转换·

一、 wait和notify (重点)·

1、小故事·

image.png image.png

2、wait、notify介绍 (必须要获取到锁对象, 才能调用这些方法)·

image.png

  • 当线程0获得到了锁, 成为Monitor的Owner, 但是此时它发现自己想要执行synchroized代码块的条件不满足; 此时它就调用obj.wait方法, 进入到Monitor中的WaitSet集合, 此时线程0的状态就变为WAITING
  • 处于BLOCKED和WAITING状态的线程都为阻塞状态,CPU都不会分给他们时间片。但是有所区别:
    • BLOCKED状态的线程是在竞争锁对象时,发现Monitor的Owner已经是别的线程了 ,此时就会进入EntryList中,并处于BLOCKED状态
    • WAITING状态的线程是获得了对象的锁,但是自身的原因无法执行synchroized的临界区资源需要进入阻塞状态时,锁对象调用了wait方法而进入了WaitSet中,处于WAITING状态
  • 处于BLOCKED状态的线程会在锁被释放的时候被唤醒
  • 处于WAITING状态的线程只有被锁对象调用了notify方法(obj.notify/obj.notifyAll),才会被唤醒。然后它会进入到EntryList, 重新竞争锁 (此时就将锁升级为重量级锁)

3、API介绍·

下面的三个方法都是Object中的方法; 通过锁对象来调用

  • wait(): 让获得对象锁的线程到waitSet中一直等待
  • wait(long n) : 当该等待线程没有被notify, 等待时间到了之后, 也会自动唤醒
  • notify(): 让获得对象锁的线程, 使用锁对象调用notify去waitSet的等待线程中挑一个唤醒
  • notifyAll() : 让获得对象锁的线程, 使用锁对象调用notifyAll去唤醒waitSet中所有的等待线程

它们都是线程之间进行协作的手段, 都属于Object对象的方法, 必须获得此对象的锁, 才能调用这些方法 注:只有当对象被锁以后(成为Owner),才能调用wait和notify方法

1
2
3
4
5
6
7
8
9
10
public class Test1 {
final static Object LOCK = new Object();
public static void main(String[] args) throws InterruptedException {
//只有在对象被锁住后才能调用wait方法
synchronized (LOCK) {
LOCK.wait();
}
}
}

  • 演示wait和notify方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Description:
*
* @author guizy
* @date 2020/12/20 09:12
*/
@Slf4j(topic = "guizy.WaitNotifyTest")
public class WaitNotifyTest {
static final Object obj = new Object();

public static void main(String[] args) throws Exception {

new Thread(() -> {
synchronized (obj) {
log.debug("执行...");
try {
// 只有获得锁对象之后, 才能调用wait/notify
obj.wait(); // 此时t1线程进入WaitSet等待
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码...");
}
}, "t1").start();

new Thread(() -> {
synchronized (obj) {
log.debug("执行...");
try {
obj.wait(); // 此时t2线程进入WaitSet等待
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码...");
}
}, "t2").start();

// 让主线程等两秒在执行,为了`唤醒`,不睡的话,那两个线程还没进入waitSet,主线程就开始唤醒了
Thread.sleep(1000);
log.debug("唤醒waitSet中的线程!");
// 只有获得锁对象之后, 才能调用wait/notify
synchronized (obj) {
// obj.notify(); // 唤醒waitset中的一个线程
obj.notifyAll(); // 唤醒waitset中的全部等待线程
}
}
}

13:01:36.176 guizy.WaitNotifyTest [t1] - 执行… 13:01:36.178 guizy.WaitNotifyTest [t2] - 执行… 13:01:37.175 guizy.WaitNotifyTest [main] - 唤醒waitSet中的线程! 13:01:37.175 guizy.WaitNotifyTest [t2] - 其它代码… 13:01:37.175 guizy.WaitNotifyTest [t1] - 其它代码…

4、Sleep(long n) 和 Wait(long n)的区别 (重点)·

不同点

  • Sleep是Thread类的静态方法,Wait是Object的方法,Object又是所有类的父类,所以所有类都有Wait方法。
  • Sleep在阻塞的时候不会释放锁,而Wait在阻塞的时候会释放锁 (不释放锁的话, 其他线程就无法唤醒该线程了)
  • Sleep方法不需要与synchronized一起使用,而Wait方法需要与synchronized一起使用(wait/notify等方法, 必须要使用对象锁来调用)

相同点

  • 阻塞状态都为TIMED_WAITING (限时等待) 这里指的是带时间限制的wait
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Description: 测试sleep不释放锁
*
* @author guizy
* @date 2020/12/20 09:33
*/
@Slf4j(topic = "guizy.SleepTest")
public class SleepTest {

public static final Object lock = new Object();

public static void main(String[] args) {
new Thread(() -> {
synchronized (lock) {
log.debug("获得锁了");
try {
// Thread.sleep(5000); // 主线程需要等5s才能获得到锁.所以所在sleep期间, 是不会释放锁的
lock.wait(5000); // 调用wait方法会立刻释放锁, 不然主线程就拿不到lock锁了, 当等待5s后程序才结束
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();

// 主线程睡一秒
Sleeper.sleep(1);
synchronized (lock) {
log.debug("获得锁了");
}
}

sleep打印结果 : 表明在sleep期间, 锁是不会被释放的 image.png wait打印结果 : 当调用wait方法后, 锁就会被立刻释放 image.png

5、wait/notify的正确使用·

Step 1 : 逐渐向下优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Slf4j(topic = "guizy.WaitNotifyTest")
public class WaitNotifyTest {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;

public static void main(String[] args) {
//思考下面的解决方案好不好,为什么?
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
if (!hasCigarette) {
log.debug("没烟,先歇会!");
Sleeper.sleep(2); // 会阻塞2s, 不会释放锁
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小南").start();

for (int i = 0; i < 5; i++) {
new Thread(() -> {
synchronized (room) {
log.debug("可以开始干活了");
}
}, "其它人").start();
}

Sleeper.sleep(1);
new Thread(() -> {
// 此时没有加锁, 所以会优先于其他人先执行
// 这里能不能加 synchronized (room)?
//synchronized (room) { // 如果加锁的话, 送烟人也需要等待小南睡2s的时间,此时即使送到了,小南线程也将锁释放了..
hasCigarette = true;
log.debug("烟到了噢!");
//}
}, "送烟的").start();
}
}

  • 不给送烟线程加synchronized输出情况

10:16:32.311 guizy.WaitNotifyTest [小南] - 有烟没?[false] 10:16:32.318 guizy.WaitNotifyTest [小南] - 没烟,先歇会! 10:16:33.318 guizy.WaitNotifyTest [送烟的] - 烟到了噢! 10:16:34.320 guizy.WaitNotifyTest [小南] - 有烟没?[true] 10:16:34.320 guizy.WaitNotifyTest [小南] - 可以开始干活了 10:16:34.320 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:34.320 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:34.320 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:34.321 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:34.321 guizy.WaitNotifyTest [其它人] - 可以开始干活了

  • 给送烟线程加synchronized输出情况

10:16:57.565 guizy.WaitNotifyTest [小南] - 有烟没?[false] 10:16:57.570 guizy.WaitNotifyTest [小南] - 没烟,先歇会! 10:16:59.574 guizy.WaitNotifyTest [小南] - 有烟没?[false] 10:16:59.574 guizy.WaitNotifyTest [送烟的] - 烟到了噢! 10:16:59.575 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:59.575 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:59.575 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:59.575 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:59.576 guizy.WaitNotifyTest [其它人] - 可以开始干活了

image.png Step2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j(topic = "guizy.WaitNotifyTest")
public class WaitNotifyTest {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;

public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
if (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait(); // 此时进入到waitset等待集合, 同时会释放锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小南").start();

for (int i = 0; i < 5; i++) {
new Thread(() -> {
// 小南进入等待状态了, 其他线程就可以获得锁了
synchronized (room) {
log.debug("可以开始干活了");
}
}, "其它人").start();
}

Sleeper.sleep(1);
new Thread(() -> {
synchronized (room) {
hasCigarette = true;
log.debug("烟到了噢!");
room.notify();
}
}, "送烟的").start();
}
}

11:00:51.840 guizy.WaitNotifyTest [小南] - 有烟没?[false] 11:00:51.847 guizy.WaitNotifyTest [小南] - 没烟,先歇会! 11:00:51.847 guizy.WaitNotifyTest [其它人] - 可以开始干活了 11:00:51.847 guizy.WaitNotifyTest [其它人] - 可以开始干活了 11:00:51.847 guizy.WaitNotifyTest [其它人] - 可以开始干活了 11:00:51.847 guizy.WaitNotifyTest [其它人] - 可以开始干活了 11:00:51.847 guizy.WaitNotifyTest [其它人] - 可以开始干活了 11:00:52.847 guizy.WaitNotifyTest [送烟的] - 烟到了噢! 11:00:52.847 guizy.WaitNotifyTest [小南] - 有烟没?[true] 11:00:52.848 guizy.WaitNotifyTest [小南] - 可以开始干活了

如果此时除了小南在等待唤醒, 还有一个线程也在等待唤醒呢? 此时的notify方法会唤醒谁呢? image.png Step3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Slf4j(topic = "guizy.WaitNotifyTest")
public class WaitNotifyTest {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;

public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
if (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait(); // 此时进入到waitset等待集合, 同时会释放锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小南").start();

new Thread(() -> {
synchronized (room) {
log.debug("外卖送到没?[{}]", hasTakeout);
if (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小女").start();

Sleeper.sleep(1);
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外卖到了噢!");
room.notify();
}
}, "送外卖的").start();
}
}

问题: 当外卖送到了, 却唤醒了小南, 此时就出现了问题 image.png Step4:

1
2
3
4
5
6
7
8
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外卖到了噢!");
room.notifyAll();
}
}, "送外卖的").start();

11:14:53.670 guizy.WaitNotifyTest [小南] - 有烟没?[false] 11:14:53.676 guizy.WaitNotifyTest [小南] - 没烟,先歇会! 11:14:53.676 guizy.WaitNotifyTest [小女] - 外卖送到没?[false] 11:14:53.676 guizy.WaitNotifyTest [小女] - 没外卖,先歇会! 11:14:54.674 guizy.WaitNotifyTest [送外卖的] - 外卖到了噢! 11:14:54.674 guizy.WaitNotifyTest [小女] - 外卖送到没?[true] 11:14:54.674 guizy.WaitNotifyTest [小女] - 可以开始干活了 11:14:54.675 guizy.WaitNotifyTest [小南] - 有烟没?[false]

还是唤醒了小南, 小南还是回去看看送来的是外卖还是烟. 很麻烦, 怎么解决? image.png Step5:

  • 使用while循环来解决虚假唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Slf4j(topic = "guizy.WaitNotifyTest")
public class Main {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;

public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
while (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait(); // 此时进入到waitset等待集合, 同时会释放锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小南").start();

new Thread(() -> {
synchronized (room) {
log.debug("外卖送到没?[{}]", hasTakeout);
while (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小女").start();

Sleeper.sleep(1);
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外卖到了噢!");
room.notifyAll();
}
}, "送外卖的").start();
}
}

11:19:25.275 guizy.WaitNotifyTest [小南] - 有烟没?[false] 11:19:25.282 guizy.WaitNotifyTest [小南] - 没烟,先歇会! 11:19:25.282 guizy.WaitNotifyTest [小女] - 外卖送到没?[false] 11:19:25.283 guizy.WaitNotifyTest [小女] - 没外卖,先歇会! 11:19:26.287 guizy.WaitNotifyTest [送外卖的] - 外卖到了噢! 11:19:26.287 guizy.WaitNotifyTest [小女] - 外卖送到没?[true] 11:19:26.287 guizy.WaitNotifyTest [小女] - 可以开始干活了 11:19:26.288 guizy.WaitNotifyTest [小南] - 没烟,先歇会!

因为改为while如果唤醒之后, 就在while循环中执行了, 不会跑到while外面去执行"有烟没…", 此时小南就不需要每次notify, 就去看是不是送来的烟, 如果是烟, while就为false了.

二、同步模式之保护性暂停 (join、Future的实现)·

  • 即Guarded Suspension,用在一个线程等待另一个线程的执行结果
    • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
    • 如果有结果不断从一个线程到另一个线程 那么可以使用消息队列(见生产者/消费者)
    • JDK 中,join 的实现、Future 的实现,采用的就是此模式
    • 因为要等待另一方的结果,因此归类到同步模式

image.png

  • 一方等待另一方的执行结果举例 :
  • 举例, 线程1等待线程2下载的结果,并获取该结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* Description: 多线程同步模式 - 一个线程需要等待另一个线程的执行结果
*
* @author guizy1
* @date 2020/12/21 14:51
*/
@Slf4j(topic = "guizy.GuardeObjectTest")
public class GuardeObjectTest {
public static void main(String[] args) {
// 线程1等待线程2的下载结果
GuardeObject guardeObject = new GuardeObject();
new Thread(() -> {
log.debug("等待结果");
List<String> list = (List<String>) guardeObject.get();
log.debug("结果大小:{}", list.size());
}, "t1").start();

new Thread(() -> {
log.debug("执行下载");
try {
List<String> list = Downloader.download();
guardeObject.complete(list);
} catch (IOException e) {
e.printStackTrace();
}

}, "t2").start();
}
}

class GuardeObject {
// 结果
private Object response;

// 获取结果
public Object get() {
synchronized (this) {
// 防止虚假唤醒
// 没有结果
while (response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}

// 产生结果
public void complete(Object response) {
synchronized (this) {
// 给结果变量赋值
this.response = response;
this.notifyAll();
}
}
}

  • 线程t1 等待 线程t2的结果, 可以设置超时时间, 如果超过时间还没返回结果,此时就不等了.退出while循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Slf4j(topic = "guizy.GuardeObjectTest")
public class GuardeObjectTest {
public static void main(String[] args) {
// 线程1等待线程2的下载结果
GuardeObject guardeObject = new GuardeObject();
new Thread(() -> {
log.debug("begin");
Object obj = guardeObject.get(2000);
log.debug("结果是:{}", obj);
}, "t1").start();

new Thread(() -> {
log.debug("begin");
// Sleeper.sleep(1); // 在等待时间内
Sleeper.sleep(3);
guardeObject.complete(new Object());
}, "t2").start();
}
}

class GuardeObject {
// 结果
private Object response;

// 获取结果
// timeout表示等待多久. 这里假如是2s
public Object get(long timeout) {
synchronized (this) {
// 假如开始时间为 15:00:00
long begin = System.currentTimeMillis();
// 经历的时间
long passedTime = 0;
while (response == null) {
// 这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
// 经历的时间超过了最大等待时间, 退出循环
if (waitTime <= 0) {
break;
}
try {
// this.wait(timeout)的问题: 虚假唤醒在15:00:01的时候,此时response还null, 此时经历时间就为1s,
// 进入while循环的时候response还是空,此时判断1s<=timeout 2s,此时再次this.wait(2s)吗,此时已经经历了
// 1s,所以只要再等1s就可以了. 所以等待的时间应该是 超时时间(timeout) - 经历的时间(passedTime)
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 经历时间
passedTime = System.currentTimeMillis() - begin; // 15:00:02
}
return response;
}
}

// 产生结果
public void complete(Object response) {
synchronized (this) {
// 给结果变量赋值
this.response = response;
this.notifyAll();
}
}

// 在等待时间内的情况 16:20:41.627 guizy.GuardeObjectTest [t1] - begin 16:20:41.627 guizy.GuardeObjectTest [t2] - begin 16:20:42.633 guizy.GuardeObjectTest [t1] - 结果是:java.lang.Object@1e1d0168

// 超时的情况 16:21:24.663 guizy.GuardeObjectTest [t2] - begin 16:21:24.663 guizy.GuardeObjectTest [t1] - begin 16:21:26.667 guizy.GuardeObjectTest [t1] - 结果是:null

  • 关于超时的增强,在join(long millis) 的源码中得到了体现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
// join一个指定的时间
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

  • 多任务版GuardedObject图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类。
  • 不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。和生产者消费者模式的区别就是:这个产生结果的线程和使用结果的线程是一一对应的关系,但是生产者消费者模式并不是。
  • rpc框架的调用中就使用到了这种模式。

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**
* Description: 同步模式保护性暂停模式 (多任务版)
*
* @author guizy1
* @date 2020/12/21 14:51
*/
@Slf4j(topic = "guizy.GuardedObjectTest")
public class GuardedObjectTest {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new People().start();
}
Sleeper.sleep(1);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}

@Slf4j(topic = "guizy.People")
class People extends Thread {
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}

@Slf4j(topic = "guizy.Postman")
// 邮寄员类
class Postman extends Thread {
private int id;
private String mail;

public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}

@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}

// 信箱类
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();

private static int id = 1;

// 产生唯一 id
private static synchronized int generateId() {
return id++;
}

public static GuardedObject getGuardedObject(int id) {
//根据id获取到box并删除对应的key和value,避免堆内存爆了
return boxes.remove(id);
}

public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}

public static Set<Integer> getIds() {
return boxes.keySet();
}
}

// 用来传递信息的作用, 当多个类使用GuardedObject,就很不方便,此时需要一个设计一个解耦的中间类
class GuardedObject {
// 标记GuardedObject
private int id;
// 结果
private Object response;

public int getId() {
return id;
}

public GuardedObject(int id) {
this.id = id;
}

// 获取结果
// timeout表示等待多久. 这里假如是2s
public Object get(long timeout) {
synchronized (this) {
// 假如开始时间为 15:00:00
long begin = System.currentTimeMillis();
// 经历的时间
long passedTime = 0;
while (response == null) {
// 这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
// 经历的时间超过了最大等待时间, 退出循环
if (waitTime <= 0) {
break;
}
try {
// this.wait(timeout)的问题: 虚假唤醒在15:00:01的时候,此时response还null, 此时经历时间就为1s,
// 进入while循环的时候response还是空,此时判断1s<=timeout 2s,此时再次this.wait(2s)吗,此时已经经历了
// 1s,所以只要再等1s就可以了. 所以等待的时间应该是 超时时间(timeout) - 经历的时间(passedTime)
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 经历时间
passedTime = System.currentTimeMillis() - begin; // 15:00:02
}
return response;
}
}

// 产生结果
public void complete(Object response) {
synchronized (this) {
// 给结果变量赋值
this.response = response;
this.notifyAll();
}
}
}

三、异步模式之生产者/消费者 (重点)·

  • 与前面的保护性暂停中的 GuardedObject 不同,不需要产生结果和消费结果的线程一一对应 (一个生产一个消费)
  • 消费队列 可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种 阻塞队列,采用的就是这种模式

异步模式中, 生产者产生消息之后消息没有被立刻消费同步模式中, 消息在产生之后被立刻消费了。 image.png

  • 我们下面写的小例子是线程间通信的消息队列,要注意区别,像RabbitMQ等消息框架是进程间通信的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
* Description: 异步模式之生产者/消费者
*
* @author guizy1
* @date 2020/12/21 18:23
*/
@Slf4j(topic = "giuzy.ProductConsumerTest")
public class ProductConsumerTest {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);

for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id, "值" + id));
}, "生产者" + i).start();
}

new Thread(() -> {
while (true) {
Sleeper.sleep(1);
Message message = queue.take();
}
}, "消费者").start();
}

}

// 消息队列类,在线程之间通信
@Slf4j(topic = "guizy.MessageQueue")
class MessageQueue {
// 消息的队列集合
private LinkedList<Message> list = new LinkedList<>();
// 队列容量
private int capcity;

public MessageQueue(int capcity) {
this.capcity = capcity;
}

// 获取消息
public Message take() {
// 检查队列是否为空
synchronized (list) {
while (list.isEmpty()) {
try {
log.debug("队列为空, 消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列头部获取消息并返回
Message message = list.removeFirst();
log.debug("已消费消息 {}", message);
list.notifyAll();
return message;
}
}

// 存入消息
public void put(Message message) {
synchronized (list) {
// 检查对象是否已满
while (list.size() == capcity) {
try {
log.debug("队列已满, 生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将消息加入队列尾部
list.addLast(message);
log.debug("已生产消息 {}", message);
list.notifyAll();
}
}
}

final class Message {
private int id;
private Object value;

public Message(int id, Object value) {
this.id = id;
this.value = value;
}

public int getId() {
return id;
}

public Object getValue() {
return value;
}

@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}

18:52:53.440 guizy.MessageQueue [生产者1] - 已生产消息 Message{id=1, value=值1} 18:52:53.443 guizy.MessageQueue [生产者0] - 已生产消息 Message{id=0, value=值0} 18:52:53.444 guizy.MessageQueue [生产者2] - 队列已满, 生产者线程等待 18:52:54.439 guizy.MessageQueue [消费者] - 已消费消息 Message{id=1, value=值1} 18:52:54.439 guizy.MessageQueue [生产者2] - 已生产消息 Message{id=2, value=值2} 18:52:55.439 guizy.MessageQueue [消费者] - 已消费消息 Message{id=0, value=值0} 18:52:56.440 guizy.MessageQueue [消费者] - 已消费消息 Message{id=2, value=值2} 18:52:57.441 guizy.MessageQueue [消费者] - 队列为空, 消费者线程等待

四、 park & unpack (重要)·

1、基本使用·

  • park/unpark都是LockSupport类中的的方法
  • 先调用unpark后,再调用park, 此时park不会暂停线程
1
2
3
4
5
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(thread);

特点:

  • wait,notify,notifyAll必须配合Object Monitor 一起使用,而unpark不必
  • park & unpark是以线程为单位来阻塞唤醒线程,而notify是能随机唤醒一个等待的线程,notifyAll是唤醒所有等待线程,就不那么精确
  • park & unpark可以先unpark,而wait & notify不能先notify

2、 park、 unpark 原理·

每个线程都有自己的一个 Parker 对象,由三部分组成 _counter, _cond和 _mutex

  • 打个比喻线程就像一个旅人,Parker 就像他随身携带的背包,条件变量 _ cond就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
  • 调用 park 就是要看需不需要停下来歇息
    • 如果备用干粮耗尽,那么钻进帐篷歇息
    • 如果备用干粮充足,那么不需停留,继续前进
  • 调用 unpark,就好比令干粮充足
    • 如果这时线程还在帐篷,就唤醒让他继续前进
    • 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
    • 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮

先调用park再调用upark的过程·

先调用park的情况·

  • 当前线程调用 Unsafe.park() 方法

  • 检查 _counter, 本情况为0, 这时, 获得_mutex 互斥锁 (mutex对象有个等待队列 _cond)

  • 线程进入 _cond 条件变量阻塞

  • 设置_counter = 0 (没干粮了)

  • image.png

  • 调用unpark

    • 调用Unsafe.unpark(Thread_0)方法,设置_counter 为 1
    • 唤醒 _cond 条件变量中的 Thread_0
    • Thread_0 恢复运行
    • 设置 _counter 为 0

image.png

先调用upark再调用park的过程·

  • 调用 Unsafe.unpark(Thread_0)方法,设置 _counter 为 1
  • 当前线程调用 Unsafe.park() 方法
  • 检查 _counter,本情况为 1,这时线程 无需阻塞,继续运行
  • 设置 _counter 为 0

image.png

五、 线程状态转换 (重点)·

image.png image.png 假设有线程 Thread t

  • 1、NEW <–> RUNNABLE
    • t.start()方法时, NEW --> RUNNABLE
  • 2、RUNNABLE <–> WAITING
    • 线程用synchronized(obj)获取了对象锁后
      • 调用 obj.wait()方法时,t 线程进入waitSet中, 从RUNNABLE --> WAITING
      • 调用 obj.notify(),obj.notifyAll(),t.interrupt() 时, 唤醒的线程都到entrySet阻塞队列成为BLOCKED状态, 在阻塞队列,和其他线程再进行 竞争锁
        • 竞争锁成功,t 线程从 WAITING --> RUNNABLE
        • 竞争锁失败,t 线程从 WAITING --> BLOCKED
  • 3、RUNNABLE <–> WAITING
    • 当前线程调用 t.join() 方法时,当前线程从 RUNNABLE --> WAITING ,注意是当前线程在t线程对象在waitSet上等待
    • t 线程运行结束,或调用了当前线程的 interrupt() 时当前线程从 WAITING --> RUNNABLE
  • 4、RUNNABLE <–> WAITING
    • 当前线程调用 LockSupport.park() 方法会让当前线程从RUNNABLE --> WAITING
    • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING --> RUNNABLE

Runnable 和 Timed-Waiting的相互转换

  • 5、RUNNABLE <–> TIMED_WAITING (带超时时间的wait)
    • t 线程用synchronized(obj) 获取了对象锁后
      • 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE --> TIMED_WAITING
      • t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时; 唤醒的线程都到entrySet阻塞队列成为BLOCKED状态, 在阻塞队列,和其他线程再进行 竞争锁
        • 竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE
        • 竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED
  • 6、RUNNABLE <–> TIMED_WAITING
    • 当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE --> TIMED_WAITING 注意是当前线程在t 线程对象的waitSet等待
    • 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING --> RUNNABLE
  • 7、RUNNABLE <–> TIMED_WAITING
    • 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING
    • 当前线程等待时间超过了 n 毫秒或调用了线程的 interrupt() ,当前线程从 TIMED_WAITING --> RUNNABLE
  • 8、RUNNABLE <–> TIMED_WAITING
    • 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线程从 RUNNABLE --> TIMED_WAITING
    • 调用LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING–> RUNNABLE
  • 9、RUNNABLE <–> BLOCKED
    • t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED, 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争 成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED
  • 10、 RUNNABLE <–> TERMINATED
    • 当前线程所有代码运行完毕,进入 TERMINATED