Java并发编程(四) (共享模型之管程)_ wait和notify原理、同步模式之保护性暂停、异步模式之生产者_消费者、park和unpark原理、线程状态转换·
一、 wait和notify (重点)·
1、小故事·
2、wait、notify介绍 (必须要获取到锁对象, 才能调用这些方法)·
- 当线程0获得到了锁, 成为Monitor的Owner, 但是此时它发现自己想要执行synchroized代码块的条件不满足; 此时它就调用obj.wait方法, 进入到Monitor中的WaitSet集合, 此时线程0的状态就变为WAITING
- 处于BLOCKED和WAITING状态的线程都为阻塞状态,CPU都不会分给他们时间片。但是有所区别:
- BLOCKED状态的线程是在竞争锁对象时,发现Monitor的Owner已经是别的线程了 ,此时就会进入EntryList中,并处于BLOCKED状态
- WAITING状态的线程是获得了对象的锁,但是自身的原因无法执行synchroized的临界区资源需要进入阻塞状态时,锁对象调用了wait方法而进入了WaitSet中,处于WAITING状态
- 处于BLOCKED状态的线程会在锁被释放的时候被唤醒
- 处于WAITING状态的线程只有被锁对象调用了notify方法(obj.notify/obj.notifyAll),才会被唤醒。然后它会进入到EntryList, 重新竞争锁 (此时就将锁升级为重量级锁)
3、API介绍·
下面的三个方法都是Object中的方法; 通过锁对象来调用
- wait(): 让获得对象锁的线程到waitSet中一直等待
- wait(long n) : 当该等待线程没有被notify, 等待时间到了之后, 也会自动唤醒
- notify(): 让获得对象锁的线程, 使用锁对象调用notify去waitSet的等待线程中挑一个唤醒
- notifyAll() : 让获得对象锁的线程, 使用锁对象调用notifyAll去唤醒waitSet中所有的等待线程
它们都是线程之间进行协作的手段, 都属于Object对象的方法, 必须获得此对象的锁, 才能调用这些方法 注:只有当对象被锁以后(成为Owner),才能调用wait和notify方法
1 2 3 4 5 6 7 8 9 10
| public class Test1 { final static Object LOCK = new Object(); public static void main(String[] args) throws InterruptedException { synchronized (LOCK) { LOCK.wait(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
|
@Slf4j(topic = "guizy.WaitNotifyTest") public class WaitNotifyTest { static final Object obj = new Object();
public static void main(String[] args) throws Exception {
new Thread(() -> { synchronized (obj) { log.debug("执行..."); try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码..."); } }, "t1").start();
new Thread(() -> { synchronized (obj) { log.debug("执行..."); try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码..."); } }, "t2").start();
Thread.sleep(1000); log.debug("唤醒waitSet中的线程!"); synchronized (obj) { obj.notifyAll(); } } }
|
13:01:36.176 guizy.WaitNotifyTest [t1] - 执行… 13:01:36.178 guizy.WaitNotifyTest [t2] - 执行… 13:01:37.175 guizy.WaitNotifyTest [main] - 唤醒waitSet中的线程! 13:01:37.175 guizy.WaitNotifyTest [t2] - 其它代码… 13:01:37.175 guizy.WaitNotifyTest [t1] - 其它代码…
4、Sleep(long n) 和 Wait(long n)的区别 (重点)·
不同点
- Sleep是Thread类的静态方法,Wait是Object的方法,Object又是所有类的父类,所以所有类都有Wait方法。
- Sleep在阻塞的时候不会释放锁,而Wait在阻塞的时候会释放锁 (不释放锁的话, 其他线程就无法唤醒该线程了)
- Sleep方法不需要与synchronized一起使用,而Wait方法需要与synchronized一起使用(wait/notify等方法, 必须要使用对象锁来调用)
相同点
- 阻塞状态都为TIMED_WAITING (限时等待) 这里指的是带时间限制的wait
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
@Slf4j(topic = "guizy.SleepTest") public class SleepTest {
public static final Object lock = new Object();
public static void main(String[] args) { new Thread(() -> { synchronized (lock) { log.debug("获得锁了"); try { lock.wait(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1").start();
Sleeper.sleep(1); synchronized (lock) { log.debug("获得锁了"); } }
|
sleep打印结果 : 表明在sleep期间, 锁是不会被释放的 wait打印结果 : 当调用wait方法后, 锁就会被立刻释放
5、wait/notify的正确使用·
Step 1 : 逐渐向下优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Slf4j(topic = "guizy.WaitNotifyTest") public class WaitNotifyTest { static final Object room = new Object(); static boolean hasCigarette = false; static boolean hasTakeout = false;
public static void main(String[] args) { new Thread(() -> { synchronized (room) { log.debug("有烟没?[{}]", hasCigarette); if (!hasCigarette) { log.debug("没烟,先歇会!"); Sleeper.sleep(2); } log.debug("有烟没?[{}]", hasCigarette); if (hasCigarette) { log.debug("可以开始干活了"); } } }, "小南").start();
for (int i = 0; i < 5; i++) { new Thread(() -> { synchronized (room) { log.debug("可以开始干活了"); } }, "其它人").start(); }
Sleeper.sleep(1); new Thread(() -> { hasCigarette = true; log.debug("烟到了噢!"); }, "送烟的").start(); } }
|
10:16:32.311 guizy.WaitNotifyTest [小南] - 有烟没?[false] 10:16:32.318 guizy.WaitNotifyTest [小南] - 没烟,先歇会! 10:16:33.318 guizy.WaitNotifyTest [送烟的] - 烟到了噢! 10:16:34.320 guizy.WaitNotifyTest [小南] - 有烟没?[true] 10:16:34.320 guizy.WaitNotifyTest [小南] - 可以开始干活了 10:16:34.320 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:34.320 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:34.320 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:34.321 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:34.321 guizy.WaitNotifyTest [其它人] - 可以开始干活了
10:16:57.565 guizy.WaitNotifyTest [小南] - 有烟没?[false] 10:16:57.570 guizy.WaitNotifyTest [小南] - 没烟,先歇会! 10:16:59.574 guizy.WaitNotifyTest [小南] - 有烟没?[false] 10:16:59.574 guizy.WaitNotifyTest [送烟的] - 烟到了噢! 10:16:59.575 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:59.575 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:59.575 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:59.575 guizy.WaitNotifyTest [其它人] - 可以开始干活了 10:16:59.576 guizy.WaitNotifyTest [其它人] - 可以开始干活了
Step2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| @Slf4j(topic = "guizy.WaitNotifyTest") public class WaitNotifyTest { static final Object room = new Object(); static boolean hasCigarette = false; static boolean hasTakeout = false;
public static void main(String[] args) { new Thread(() -> { synchronized (room) { log.debug("有烟没?[{}]", hasCigarette); if (!hasCigarette) { log.debug("没烟,先歇会!"); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("有烟没?[{}]", hasCigarette); if (hasCigarette) { log.debug("可以开始干活了"); } } }, "小南").start();
for (int i = 0; i < 5; i++) { new Thread(() -> { synchronized (room) { log.debug("可以开始干活了"); } }, "其它人").start(); }
Sleeper.sleep(1); new Thread(() -> { synchronized (room) { hasCigarette = true; log.debug("烟到了噢!"); room.notify(); } }, "送烟的").start(); } }
|
11:00:51.840 guizy.WaitNotifyTest [小南] - 有烟没?[false] 11:00:51.847 guizy.WaitNotifyTest [小南] - 没烟,先歇会! 11:00:51.847 guizy.WaitNotifyTest [其它人] - 可以开始干活了 11:00:51.847 guizy.WaitNotifyTest [其它人] - 可以开始干活了 11:00:51.847 guizy.WaitNotifyTest [其它人] - 可以开始干活了 11:00:51.847 guizy.WaitNotifyTest [其它人] - 可以开始干活了 11:00:51.847 guizy.WaitNotifyTest [其它人] - 可以开始干活了 11:00:52.847 guizy.WaitNotifyTest [送烟的] - 烟到了噢! 11:00:52.847 guizy.WaitNotifyTest [小南] - 有烟没?[true] 11:00:52.848 guizy.WaitNotifyTest [小南] - 可以开始干活了
如果此时除了小南在等待唤醒, 还有一个线程也在等待唤醒呢? 此时的notify方法会唤醒谁呢? Step3:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Slf4j(topic = "guizy.WaitNotifyTest") public class WaitNotifyTest { static final Object room = new Object(); static boolean hasCigarette = false; static boolean hasTakeout = false;
public static void main(String[] args) { new Thread(() -> { synchronized (room) { log.debug("有烟没?[{}]", hasCigarette); if (!hasCigarette) { log.debug("没烟,先歇会!"); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("有烟没?[{}]", hasCigarette); if (hasCigarette) { log.debug("可以开始干活了"); } } }, "小南").start();
new Thread(() -> { synchronized (room) { log.debug("外卖送到没?[{}]", hasTakeout); if (!hasTakeout) { log.debug("没外卖,先歇会!"); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("外卖送到没?[{}]", hasTakeout); if (hasTakeout) { log.debug("可以开始干活了"); } else { log.debug("没干成活..."); } } }, "小女").start();
Sleeper.sleep(1); new Thread(() -> { synchronized (room) { hasTakeout = true; log.debug("外卖到了噢!"); room.notify(); } }, "送外卖的").start(); } }
|
问题: 当外卖送到了, 却唤醒了小南, 此时就出现了问题 Step4:
1 2 3 4 5 6 7 8
| new Thread(() -> { synchronized (room) { hasTakeout = true; log.debug("外卖到了噢!"); room.notifyAll(); } }, "送外卖的").start();
|
11:14:53.670 guizy.WaitNotifyTest [小南] - 有烟没?[false] 11:14:53.676 guizy.WaitNotifyTest [小南] - 没烟,先歇会! 11:14:53.676 guizy.WaitNotifyTest [小女] - 外卖送到没?[false] 11:14:53.676 guizy.WaitNotifyTest [小女] - 没外卖,先歇会! 11:14:54.674 guizy.WaitNotifyTest [送外卖的] - 外卖到了噢! 11:14:54.674 guizy.WaitNotifyTest [小女] - 外卖送到没?[true] 11:14:54.674 guizy.WaitNotifyTest [小女] - 可以开始干活了 11:14:54.675 guizy.WaitNotifyTest [小南] - 有烟没?[false]
还是唤醒了小南, 小南还是回去看看送来的是外卖还是烟. 很麻烦, 怎么解决? Step5:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Slf4j(topic = "guizy.WaitNotifyTest") public class Main { static final Object room = new Object(); static boolean hasCigarette = false; static boolean hasTakeout = false;
public static void main(String[] args) { new Thread(() -> { synchronized (room) { log.debug("有烟没?[{}]", hasCigarette); while (!hasCigarette) { log.debug("没烟,先歇会!"); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("有烟没?[{}]", hasCigarette); if (hasCigarette) { log.debug("可以开始干活了"); } } }, "小南").start();
new Thread(() -> { synchronized (room) { log.debug("外卖送到没?[{}]", hasTakeout); while (!hasTakeout) { log.debug("没外卖,先歇会!"); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("外卖送到没?[{}]", hasTakeout); if (hasTakeout) { log.debug("可以开始干活了"); } else { log.debug("没干成活..."); } } }, "小女").start();
Sleeper.sleep(1); new Thread(() -> { synchronized (room) { hasTakeout = true; log.debug("外卖到了噢!"); room.notifyAll(); } }, "送外卖的").start(); } }
|
11:19:25.275 guizy.WaitNotifyTest [小南] - 有烟没?[false] 11:19:25.282 guizy.WaitNotifyTest [小南] - 没烟,先歇会! 11:19:25.282 guizy.WaitNotifyTest [小女] - 外卖送到没?[false] 11:19:25.283 guizy.WaitNotifyTest [小女] - 没外卖,先歇会! 11:19:26.287 guizy.WaitNotifyTest [送外卖的] - 外卖到了噢! 11:19:26.287 guizy.WaitNotifyTest [小女] - 外卖送到没?[true] 11:19:26.287 guizy.WaitNotifyTest [小女] - 可以开始干活了 11:19:26.288 guizy.WaitNotifyTest [小南] - 没烟,先歇会!
因为改为while如果唤醒之后, 就在while循环中执行了, 不会跑到while外面去执行"有烟没…", 此时小南就不需要每次notify, 就去看是不是送来的烟, 如果是烟, while就为false了.
二、同步模式之保护性暂停 (join、Future的实现)·
- 即Guarded Suspension,用在一个线程等待另一个线程的执行结果
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程 那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
- 一方等待另一方的执行结果举例 :
- 举例, 线程1等待线程2下载的结果,并获取该结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
@Slf4j(topic = "guizy.GuardeObjectTest") public class GuardeObjectTest { public static void main(String[] args) { GuardeObject guardeObject = new GuardeObject(); new Thread(() -> { log.debug("等待结果"); List<String> list = (List<String>) guardeObject.get(); log.debug("结果大小:{}", list.size()); }, "t1").start();
new Thread(() -> { log.debug("执行下载"); try { List<String> list = Downloader.download(); guardeObject.complete(list); } catch (IOException e) { e.printStackTrace(); }
}, "t2").start(); } }
class GuardeObject { private Object response;
public Object get() { synchronized (this) { while (response == null) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } }
public void complete(Object response) { synchronized (this) { this.response = response; this.notifyAll(); } } }
|
- 线程t1 等待 线程t2的结果, 可以设置超时时间, 如果超过时间还没返回结果,此时就不等了.退出while循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| @Slf4j(topic = "guizy.GuardeObjectTest") public class GuardeObjectTest { public static void main(String[] args) { GuardeObject guardeObject = new GuardeObject(); new Thread(() -> { log.debug("begin"); Object obj = guardeObject.get(2000); log.debug("结果是:{}", obj); }, "t1").start();
new Thread(() -> { log.debug("begin"); Sleeper.sleep(3); guardeObject.complete(new Object()); }, "t2").start(); } }
class GuardeObject { private Object response;
public Object get(long timeout) { synchronized (this) { long begin = System.currentTimeMillis(); long passedTime = 0; while (response == null) { long waitTime = timeout - passedTime; if (waitTime <= 0) { break; } try { this.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - begin; } return response; } }
public void complete(Object response) { synchronized (this) { this.response = response; this.notifyAll(); } }
|
// 在等待时间内的情况 16:20:41.627 guizy.GuardeObjectTest [t1] - begin 16:20:41.627 guizy.GuardeObjectTest [t2] - begin 16:20:42.633 guizy.GuardeObjectTest [t1] - 结果是:java.lang.Object@1e1d0168
// 超时的情况 16:21:24.663 guizy.GuardeObjectTest [t2] - begin 16:21:24.663 guizy.GuardeObjectTest [t1] - begin 16:21:26.667 guizy.GuardeObjectTest [t1] - 结果是:null
- 关于超时的增强,在join(long millis) 的源码中得到了体现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0;
if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); }
if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
|
- 多任务版GuardedObject图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类。
- 不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。和生产者消费者模式的区别就是:这个产生结果的线程和使用结果的线程是一一对应的关系,但是生产者消费者模式并不是。
- rpc框架的调用中就使用到了这种模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
@Slf4j(topic = "guizy.GuardedObjectTest") public class GuardedObjectTest { public static void main(String[] args) { for (int i = 0; i < 3; i++) { new People().start(); } Sleeper.sleep(1); for (Integer id : Mailboxes.getIds()) { new Postman(id, "内容" + id).start(); } } }
@Slf4j(topic = "guizy.People") class People extends Thread { @Override public void run() { GuardedObject guardedObject = Mailboxes.createGuardedObject(); log.debug("开始收信 id:{}", guardedObject.getId()); Object mail = guardedObject.get(5000); log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail); } }
@Slf4j(topic = "guizy.Postman")
class Postman extends Thread { private int id; private String mail;
public Postman(int id, String mail) { this.id = id; this.mail = mail; }
@Override public void run() { GuardedObject guardedObject = Mailboxes.getGuardedObject(id); log.debug("送信 id:{}, 内容:{}", id, mail); guardedObject.complete(mail); } }
class Mailboxes { private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
private static synchronized int generateId() { return id++; }
public static GuardedObject getGuardedObject(int id) { return boxes.remove(id); }
public static GuardedObject createGuardedObject() { GuardedObject go = new GuardedObject(generateId()); boxes.put(go.getId(), go); return go; }
public static Set<Integer> getIds() { return boxes.keySet(); } }
class GuardedObject { private int id; private Object response;
public int getId() { return id; }
public GuardedObject(int id) { this.id = id; }
public Object get(long timeout) { synchronized (this) { long begin = System.currentTimeMillis(); long passedTime = 0; while (response == null) { long waitTime = timeout - passedTime; if (waitTime <= 0) { break; } try { this.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - begin; } return response; } }
public void complete(Object response) { synchronized (this) { this.response = response; this.notifyAll(); } } }
|
三、异步模式之生产者/消费者 (重点)·
- 与前面的保护性暂停中的 GuardedObject 不同,不需要产生结果和消费结果的线程一一对应 (一个生产一个消费)
- 消费队列 可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种 阻塞队列,采用的就是这种模式
异步模式中, 生产者产生消息之后消息没有被立刻消费同步模式中, 消息在产生之后被立刻消费了。
- 我们下面写的小例子是线程间通信的消息队列,要注意区别,像RabbitMQ等消息框架是进程间通信的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
@Slf4j(topic = "giuzy.ProductConsumerTest") public class ProductConsumerTest { public static void main(String[] args) { MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) { int id = i; new Thread(() -> { queue.put(new Message(id, "值" + id)); }, "生产者" + i).start(); }
new Thread(() -> { while (true) { Sleeper.sleep(1); Message message = queue.take(); } }, "消费者").start(); }
}
@Slf4j(topic = "guizy.MessageQueue") class MessageQueue { private LinkedList<Message> list = new LinkedList<>(); private int capcity;
public MessageQueue(int capcity) { this.capcity = capcity; }
public Message take() { synchronized (list) { while (list.isEmpty()) { try { log.debug("队列为空, 消费者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = list.removeFirst(); log.debug("已消费消息 {}", message); list.notifyAll(); return message; } }
public void put(Message message) { synchronized (list) { while (list.size() == capcity) { try { log.debug("队列已满, 生产者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.addLast(message); log.debug("已生产消息 {}", message); list.notifyAll(); } } }
final class Message { private int id; private Object value;
public Message(int id, Object value) { this.id = id; this.value = value; }
public int getId() { return id; }
public Object getValue() { return value; }
@Override public String toString() { return "Message{" + "id=" + id + ", value=" + value + '}'; } }
|
18:52:53.440 guizy.MessageQueue [生产者1] - 已生产消息 Message{id=1, value=值1} 18:52:53.443 guizy.MessageQueue [生产者0] - 已生产消息 Message{id=0, value=值0} 18:52:53.444 guizy.MessageQueue [生产者2] - 队列已满, 生产者线程等待 18:52:54.439 guizy.MessageQueue [消费者] - 已消费消息 Message{id=1, value=值1} 18:52:54.439 guizy.MessageQueue [生产者2] - 已生产消息 Message{id=2, value=值2} 18:52:55.439 guizy.MessageQueue [消费者] - 已消费消息 Message{id=0, value=值0} 18:52:56.440 guizy.MessageQueue [消费者] - 已消费消息 Message{id=2, value=值2} 18:52:57.441 guizy.MessageQueue [消费者] - 队列为空, 消费者线程等待
四、 park & unpack (重要)·
1、基本使用·
- park/unpark都是LockSupport类中的的方法
- 先调用unpark后,再调用park, 此时park不会暂停线程
1 2 3 4 5
| LockSupport.park();
LockSupport.unpark(thread);
|
特点:
wait
,notify
,notifyAll
必须配合Object Monitor
一起使用,而unpark不必park & unpark
是以线程为单位来阻塞和唤醒线程,而notify
是能随机唤醒一个等待的线程,notifyAll
是唤醒所有等待线程,就不那么精确park & unpark
可以先unpark
,而wait & notify
不能先notify
2、 park、 unpark 原理·
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter, _cond和 _mutex
- 打个比喻线程就像一个旅人,Parker 就像他随身携带的背包,条件变量 _ cond就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
- 调用 park 就是要看需不需要停下来歇息
- 如果备用干粮耗尽,那么钻进帐篷歇息
- 如果备用干粮充足,那么不需停留,继续前进
- 调用 unpark,就好比令干粮充足
- 如果这时线程还在帐篷,就唤醒让他继续前进
- 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
- 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
先调用park再调用upark的过程·
先调用park的情况·
先调用upark再调用park的过程·
- 调用 Unsafe.unpark(Thread_0)方法,设置 _counter 为 1
- 当前线程调用 Unsafe.park() 方法
- 检查 _counter,本情况为 1,这时线程 无需阻塞,继续运行
- 设置 _counter 为 0
五、 线程状态转换 (重点)·
假设有线程 Thread t
- 1、NEW <–> RUNNABLE
- t.start()方法时, NEW --> RUNNABLE
- 2、RUNNABLE <–> WAITING
- 线程用synchronized(obj)获取了对象锁后
- 调用 obj.wait()方法时,t 线程进入waitSet中, 从RUNNABLE --> WAITING
- 调用 obj.notify(),obj.notifyAll(),t.interrupt() 时, 唤醒的线程都到entrySet阻塞队列成为BLOCKED状态, 在阻塞队列,和其他线程再进行 竞争锁
- 竞争锁成功,t 线程从 WAITING --> RUNNABLE
- 竞争锁失败,t 线程从 WAITING --> BLOCKED
- 3、RUNNABLE <–> WAITING
- 当前线程调用 t.join() 方法时,当前线程从 RUNNABLE --> WAITING ,注意是当前线程在t线程对象在waitSet上等待
- t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING --> RUNNABLE
- 4、RUNNABLE <–> WAITING
- 当前线程调用 LockSupport.park() 方法会让当前线程从RUNNABLE --> WAITING
- 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING --> RUNNABLE
Runnable 和 Timed-Waiting的相互转换
- 5、RUNNABLE <–> TIMED_WAITING (带超时时间的wait)
- t 线程用synchronized(obj) 获取了对象锁后
- 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE --> TIMED_WAITING
- t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时; 唤醒的线程都到entrySet阻塞队列成为BLOCKED状态, 在阻塞队列,和其他线程再进行 竞争锁
- 竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE
- 竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED
- 6、RUNNABLE <–> TIMED_WAITING
- 当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE --> TIMED_WAITING 注意是当前线程在t 线程对象的waitSet等待
- 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING --> RUNNABLE
- 7、RUNNABLE <–> TIMED_WAITING
- 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING
- 当前线程等待时间超过了 n 毫秒或调用了线程的 interrupt() ,当前线程从 TIMED_WAITING --> RUNNABLE
- 8、RUNNABLE <–> TIMED_WAITING
- 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线程从 RUNNABLE --> TIMED_WAITING
- 调用LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING–> RUNNABLE
- 9、RUNNABLE <–> BLOCKED
- t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED, 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争 成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED
- 10、 RUNNABLE <–> TERMINATED
- 当前线程所有代码运行完毕,进入 TERMINATED