Java Concurrency (12): Thread-Safe Collections — the JUC thread-safe collection classes: ConcurrentHashMap, BlockingQueue, ConcurrentLinkedQueue, CopyOnWriteArrayList

## Overview of thread-safe collections
Thread-safe collections fall into three broad categories:

- legacy thread-safe collections such as `Hashtable` and `Vector`
- thread-safe collections produced by the `Collections` wrapper methods, such as:
  - `Collections.synchronizedCollection`
  - `Collections.synchronizedList`
  - `Collections.synchronizedMap`
  - `Collections.synchronizedSet`
  - `Collections.synchronizedNavigableMap`
  - `Collections.synchronizedNavigableSet`
  - `Collections.synchronizedSortedMap`
  - `Collections.synchronizedSortedSet`
- the classes under `java.util.concurrent.*`
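As a quick illustration of the second category (a minimal sketch of my own, not from the original notes): the `Collections.synchronized*` methods simply decorate an unsafe collection with a wrapper that synchronizes every method on a shared mutex, and compound actions such as iteration still need external locking:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SynchronizedWrapperDemo {
    public static void main(String[] args) {
        // every get/put on this map is synchronized on an internal mutex
        Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());
        map.put("a", 1);
        System.out.println(map.get("a")); // 1

        // iteration is a compound action: the javadoc requires locking the map
        synchronized (map) {
            for (Map.Entry<String, Integer> e : map.entrySet()) {
                System.out.println(e.getKey() + "=" + e.getValue());
            }
        }
    }
}
```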
This post focuses on the thread-safe collections under `java.util.concurrent.*`. Their names follow a pattern — each contains one of three keywords: Blocking, CopyOnWrite, Concurrent.

- Blocking: mostly lock-based, providing methods that block (e.g. until an element arrives or space frees up)
- CopyOnWrite: modifications are relatively expensive, because each one copies the underlying array
- Concurrent: many internal operations are optimized with CAS, generally giving higher throughput — at the price of weak consistency:
  - weakly consistent traversal: if the container is modified while an iterator is traversing it, the iterator keeps going, but it may observe stale content
  - weakly consistent size: the `size` result is not necessarily 100% accurate
  - weakly consistent reads

By contrast, for non-thread-safe containers, traversal under concurrent modification is fail-fast: the iteration fails immediately with a `ConcurrentModificationException` rather than continuing.
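To make the contrast concrete, here is a small sketch of my own (not from the original notes) showing fail-fast versus weakly consistent iteration; it assumes Java 9+ for `List.of`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class IterationDemo {
    public static void main(String[] args) {
        // fail-fast: modifying an ArrayList during iteration throws
        List<Integer> unsafe = new ArrayList<>(List.of(1, 2, 3));
        try {
            for (Integer i : unsafe) {
                unsafe.remove(0); // next iterator step detects the modification
            }
        } catch (java.util.ConcurrentModificationException e) {
            System.out.println("fail-fast: " + e);
        }

        // weakly consistent: the iterator keeps working on the old snapshot
        List<Integer> safe = new CopyOnWriteArrayList<>(List.of(1, 2, 3));
        Iterator<Integer> it = safe.iterator();
        safe.remove(0);
        while (it.hasNext()) {
            System.out.print(it.next() + " "); // still prints 1 2 3
        }
    }
}
```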
## ConcurrentHashMap

### Exercise: word counting

Generate the test data:
```java
static final String ALPHA = "abcedfghijklmnopqrstuvwxyz";

public static void main(String[] args) {
    int length = ALPHA.length();
    int count = 200;
    List<String> list = new ArrayList<>(length * count);
    for (int i = 0; i < length; i++) {
        char ch = ALPHA.charAt(i);
        for (int j = 0; j < count; j++) {
            list.add(String.valueOf(ch));
        }
    }
    Collections.shuffle(list);
    for (int i = 0; i < 26; i++) {
        try (PrintWriter out = new PrintWriter(
                new OutputStreamWriter(
                        new FileOutputStream("tmp/" + (i + 1) + ".txt")))) {
            String collect = list.subList(i * count, (i + 1) * count).stream()
                    .collect(Collectors.joining("\n"));
            out.print(collect);
        } catch (IOException e) {
        }
    }
}
```
Template code — it wraps up the multithreaded reading of those files:
```java
private static <V> void demo(Supplier<Map<String, V>> supplier,
                             BiConsumer<Map<String, V>, List<String>> consumer) {
    Map<String, V> counterMap = supplier.get();
    List<Thread> ts = new ArrayList<>();
    for (int i = 1; i <= 26; i++) {
        int idx = i;
        Thread thread = new Thread(() -> {
            List<String> words = readFromFile(idx);
            consumer.accept(counterMap, words);
        });
        ts.add(thread);
    }
    ts.forEach(t -> t.start());
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    System.out.println(counterMap);
}

public static List<String> readFromFile(int i) {
    ArrayList<String> words = new ArrayList<>();
    try (BufferedReader in = new BufferedReader(new InputStreamReader(
            new FileInputStream("tmp/" + i + ".txt")))) {
        while (true) {
            String word = in.readLine();
            if (word == null) {
                break;
            }
            words.add(word);
        }
        return words;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
```
Your job is to implement the two parameters:

- a map that stores the count for each word (key = word, value = count)
- an operation that performs the counting safely; it receives the map and a list of words

The correct output counts every word 200 times:
```
{a=200, b=200, c=200, d=200, e=200, f=200, g=200, h=200, i=200, j=200, k=200, l=200,
 m=200, n=200, o=200, p=200, q=200, r=200, s=200, t=200, u=200, v=200, w=200, x=200, y=200, z=200}
```
A first attempt:
```java
demo(
    // the map that holds the counts
    () -> new HashMap<String, Integer>(),
    // the counting operation
    (map, words) -> {
        for (String word : words) {
            Integer counter = map.get(word);
            int newValue = counter == null ? 1 : counter + 1;
            map.put(word, newValue);
        }
    }
);
```
Is there a problem? Yes — the `get` followed by `put` is a non-atomic check-then-act: two threads can read the same counter and both write back the same incremented value, losing updates. Improve it.
Reference solution 1:
```java
demo(
    () -> new ConcurrentHashMap<String, LongAdder>(),
    (map, words) -> {
        for (String word : words) {
            // computeIfAbsent atomically creates the LongAdder if it is missing,
            // and increment() is itself thread-safe
            map.computeIfAbsent(word, (key) -> new LongAdder()).increment();
        }
    }
);
```
Reference solution 2:
```java
demo(
    () -> new ConcurrentHashMap<String, Integer>(),
    (map, words) -> {
        for (String word : words) {
            // merge atomically combines the existing value with 1 using Integer::sum
            map.merge(word, 1, Integer::sum);
        }
    }
);
```
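Both solutions work because `ConcurrentHashMap` documents `computeIfAbsent` and `merge` as atomic per key: in solution 1, each word's `LongAdder` is created exactly once and its `increment()` is lock-free and thread-safe; in solution 2, the remapping function `Integer::sum` is applied atomically, so no increments are lost.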
## ConcurrentHashMap internals

### JDK 7 HashMap concurrent dead links

#### HashMap mechanics, demonstrated

See the video: https://www.bilibili.com/video/BV16J411h7Rd?p=277&vd_source=92b714d958a38feb7688d3e50183082d

The dead link appears during a resize.

#### Test code
Notes:

- this must run under JDK 7; later JDKs changed both the resize mechanism and the hash computation
- the test code below is carefully constructed — don't change it casually

```java
public static void main(String[] args) {
    // find the keys whose hash maps to bucket 1 at the two capacities
    System.out.println("keys in bucket 1 when capacity is 16");
    for (int i = 0; i < 64; i++) {
        if (hash(i) % 16 == 1) {
            System.out.println(i);
        }
    }
    System.out.println("keys in bucket 1 when capacity is 32");
    for (int i = 0; i < 64; i++) {
        if (hash(i) % 32 == 1) {
            System.out.println(i);
        }
    }
    final HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
    // 12 entries: the next put (threshold = 16 * 0.75 = 12) triggers a resize
    map.put(2, null);
    map.put(3, null);
    map.put(4, null);
    map.put(5, null);
    map.put(6, null);
    map.put(7, null);
    map.put(8, null);
    map.put(9, null);
    map.put(10, null);
    map.put(16, null);
    map.put(35, null);
    map.put(1, null);
    System.out.println("size before resize [main]: " + map.size());
    new Thread() {
        @Override
        public void run() {
            // both threads put a 13th entry, so both enter the resize code
            map.put(50, null);
            System.out.println("size after resize [Thread-0]: " + map.size());
        }
    }.start();
    new Thread() {
        @Override
        public void run() {
            map.put(50, null);
            System.out.println("size after resize [Thread-1]: " + map.size());
        }
    }.start();
}

// a copy of JDK 7 HashMap's hash method
final static int hash(Object k) {
    int h = 0;
    if (0 != h && k instanceof String) {
        return sun.misc.Hashing.stringHash32((String) k);
    }
    h ^= k.hashCode();
    h ^= (h >>> 20) ^ (h >>> 12);
    return h ^ (h >>> 7) ^ (h >>> 4);
}
```
#### Reproducing the dead link

Debug with IDEA. Set a breakpoint at line 590 of the HashMap source:

```java
int newCapacity = newTable.length;
```
Give the breakpoint the following condition, so that it only fires when HashMap is resizing to 32 and the current thread is Thread-0 or Thread-1:

```java
newTable.length == 32 &&
(
    Thread.currentThread().getName().equals("Thread-0") ||
    Thread.currentThread().getName().equals("Thread-1")
)
```
Set the breakpoint's suspend policy to Thread; otherwise, while you debug Thread-0, Thread-1 cannot resume. Run the code — the program stops at the expected breakpoint and prints:

```
keys in bucket 1 when capacity is 16
1
16
35
50
keys in bucket 1 when capacity is 32
1
35
size before resize [main]: 12
```
Next, step into the resize flow. Add a breakpoint at line 594 of the HashMap source:

```java
Entry<K,V> next = e.next;
if (rehash) {
```
This lets us watch the state of the e node and the next node. Single-step Thread-0 to line 594, then add another breakpoint at 594 with the condition Thread.currentThread().getName().equals("Thread-0"). In the Variables panel you can now inspect the e and next variables (use View as -> Object to see the node chain):

```
e    (1)->(35)->(16)->null
next (35)->(16)->null
```
In the Threads panel, select Thread-1 and resume it. New output appears on the console — Thread-1's resize has completed:

```
newTable[1] (35)->(1)->null
```
Thread-0 is still parked at line 594, but the variables in the Variables panel have changed:

```
e    (1)->null
next (35)->(1)->null
```
Why? Because Thread-1's resize also inserts each element at the head of its new list, so the list order is reversed. Thread-1's own result is correct, but after it finishes, Thread-0 still has to run. Now single-step (F8) and watch the dead link form.
The next loop iteration reaches line 594 and moves e to the head of the newTable list:

```
newTable[1] (1)->null
e    (35)->(1)->null
next (1)->null
```
The following iteration again reaches line 594 and moves e to the head of the newTable list:

```
newTable[1] (35)->(1)->null
e    (1)->null
next null
```
Look at the source again, with the state above in mind:

```java
e.next = newTable[1]; // e is (1); newTable[1] is (35)->(1), so linking them closes the cycle
newTable[1] = e;      // e becomes the head again
e = next;             // next is null, so the loop ends — but the dead link already exists
```
#### Source analysis

HashMap's concurrent dead link arises in the resize (transfer) code:

```java
void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    for (Entry<K,V> e : table) {
        while (null != e) {
            Entry<K,V> next = e.next; // point 1
            if (rehash) {
                e.hash = null == e.key ? 0 : hash(e.key);
            }
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i];     // point 2: head insertion
            newTable[i] = e;
            e = next;
        }
    }
}
```
Suppose the map initially contains:

```
original list, format: [bucket] (key,next)
[1] (1,35) -> (35,16) -> (16,null)
```

Thread a executes up to point 1; its local e is (1,35) and its local next is (35,16). Thread a is then suspended, and thread b runs to completion:

```
1st loop  [1] (1,null)
2nd loop  [1] (35,1) -> (1,null)
3rd loop  [1] (35,1) -> (1,null)
          [17] (16,null)
```

Control switches back to thread a. Its locals e and next are restored — the references are unchanged, but the nodes' contents have changed: e now reads (1,null), and next now reads (35,1) linking to (1,null).

```
1st loop  [1] (1,null)
2nd loop  e is (35,1) linking to (1,null), so next becomes (1,null) again
          [1] (35,1) -> (1,null)
3rd loop  e is (1,null) and next is null, but e is inserted at the list head,
          so e.next becomes 35 (point 2)
          [1] (1,35) -> (35,1) -> (1,35)
```

That is already a dead link.
The root cause is using a non-thread-safe map from multiple threads. JDK 8 adjusted the resize algorithm — elements are no longer inserted at the list head but keep their pre-resize order — yet that still does not make concurrent resizing safe; other problems remain, such as losing entries during a resize.

### JDK 8 ConcurrentHashMap

#### Demonstration

https://www.bilibili.com/video/BV16J411h7Rd?p=281&spm_id_from=pageDriver&vd_source=92b714d958a38feb7688d3e50183082d
#### Key fields and inner classes

```java
// 0 by default; -1 while the table is being initialized; negative while resizing;
// once the table exists it holds the next resize threshold
private transient volatile int sizeCtl;

// the whole map is stored in an array of Node
static class Node<K,V> implements Map.Entry<K,V> {}

// the hash table; created lazily
transient volatile Node<K,V>[] table;

// the new table during a resize
private transient volatile Node<K,V>[] nextTable;

// placed at the head of a bucket that has already been migrated during a resize,
// redirecting readers to nextTable
static final class ForwardingNode<K,V> extends Node<K,V> {}

// placeholder used by compute/computeIfAbsent while the value is being computed
static final class ReservationNode<K,V> extends Node<K,V> {}

// the head of a treeified bucket; holds the tree root and a lock state
static final class TreeBin<K,V> extends Node<K,V> {}

// a node of the red-black tree inside a TreeBin
static final class TreeNode<K,V> extends Node<K,V> {}
```
#### Key methods

```java
// volatile read of the head node of bucket i
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)

// CAS the head node of bucket i (expects c, sets v)
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)

// store the head node of bucket i directly
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
```
#### Constructor analysis

The constructor implements lazy initialization: it only computes the table size; the table itself is created on first use.

```java
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)
        initialCapacity = concurrencyLevel;
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    // tableSizeFor rounds the computed size up to the next power of two
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    // the capacity is parked in sizeCtl until the table is actually created
    this.sizeCtl = cap;
}
```
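A quick worked example (my own, not from the original notes): `new ConcurrentHashMap<>(16, 0.75f, 1)` computes `size = (long)(1.0 + 16 / 0.75) = 22`, and `tableSizeFor(22)` rounds up to the next power of two, so the eventual table capacity will be 32, not 16. This differs from `HashMap`, which would create a table of 16 for the same arguments.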
#### get flow

```java
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // spread ensures the hash code is non-negative
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // the bucket head happens to be the node we want
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // hash < 0 means the head is a ForwardingNode (resize in progress, search
        // continues in the new table) or a TreeBin (search the red-black tree)
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // otherwise walk the linked list
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
```
#### put flow

Below, the array is referred to as (table) and a bucket's linked list as (bin).

```java
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // ConcurrentHashMap allows neither null keys nor null values
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    // binCount tracks the length of the bucket's list
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // lazily create the table; uses CAS internally, no lock needed
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // empty bucket: CAS in a new head node; retry the loop on failure
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;
        }
        else if ((fh = f.hash) == MOVED)
            // a resize is in progress: help migrate other buckets
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // lock the head node of the bucket
            synchronized (f) {
                if (tabAt(tab, i) == f) { // confirm the head has not changed
                    if (fh >= 0) {        // ordinary linked list
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // reached the tail: append the new node
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // red-black tree
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    // may treeify the bin, or resize first if the table is still small
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // increment the element count; may also trigger or help a resize
    addCount(1L, binCount);
    return null;
}

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // another thread is initializing: yield and retry
            Thread.yield();
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // won the race to initialize
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2); // next resize threshold = 0.75 * n
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

// x is the delta (+1 for put); check is the bin count from putVal
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if (
        // counter cells already exist, or the CAS on baseCount failed (contention)
        (as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
    ) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (
            as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
        ) {
            // create or grow the counter cells (the same idea as LongAdder)
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // the size has reached sizeCtl: a resize is needed
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                // a resize is already running: try to join in and help
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // start a new resize; nextTable is null, transfer will create it
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
```
#### size flow

The counting work actually happens inside the operations that modify the map, such as put and remove; size itself just sums up what they recorded.

```java
public int size() {
    long n = sumCount();
    // clamp the long estimate into int range
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    // add baseCount and every CounterCell; no lock is held, so under
    // concurrent updates the result is only an estimate
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
```
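Worth knowing alongside this (a standard JDK 8 API, not mentioned in the original notes): because `size()` has to clamp the sum to an `int`, JDK 8 also added `mappingCount()`, which returns the same weakly consistent estimate as a `long` and is recommended by the javadoc:

```java
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("a", 1);
long n = map.mappingCount(); // same estimate as size(), but without int truncation
```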
#### Summary

Java 8 uses an array of Node plus, per bucket, either a linked list of Node or a red-black tree of TreeNode. As before, the array is called (table) and a bucket's list is called (bin).

- Initialization: the table is created lazily, with CAS guaranteeing that only one thread creates it
- Treeification: while table.length < 64, an over-long bin first triggers a resize instead; only once the table has reached 64 and a bin grows past 8 nodes is the bin turned into a tree, with the bin head locked via synchronized during the conversion
- put: if the bin does not exist yet, CAS alone creates it; if it does, the bin head is locked and the element is appended at the tail of the bin
- get: lock-free, needing only visibility; during a resize, a get that hits a ForwardingNode is redirected to search the new table
- Resize: done bin by bin, synchronizing on each bin; neatly, competing threads are not left idle — they help migrate other bins; on average only 1/6 of the nodes need to be copied into new nodes
- size: the element count lives in baseCount, with concurrent deltas accumulated in CounterCell[]; the final count simply sums them

Source walk-through: http://www.importnew.com/28263.html
Other implementations: Cliff Click's high scale lib
### JDK 7 ConcurrentHashMap

It maintains an array of segments, each segment guarded by its own lock.
#### Constructor analysis

```java
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // ssize must be a power of two; e.g. concurrencyLevel 16 -> ssize 16
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // segmentShift defaults to 32 - 4 = 28
    this.segmentShift = 32 - sshift;
    // segmentMask defaults to 15, i.e. 0000 0000 0000 1111
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // create segment s0 and the segments array, publishing s0 at index 0
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
```
Construction complete. (The original post shows a diagram here: a segments array of length 16, with only s0 actually created.)

Unlike the JDK 8 version, this ConcurrentHashMap is not lazily initialized, which is less friendly in terms of memory. The fields this.segmentShift and this.segmentMask determine which segment a key's hash result is matched to: the hash's high bits are first shifted down by this.segmentShift positions, and the result is then ANDed with this.segmentMask. For a hash whose top four bits are 1010, the result is 1010, i.e. the segment at index 10.
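A runnable sketch of that computation (my own illustration; the constants assume the default concurrencyLevel of 16):

```java
public class SegmentIndexDemo {
    public static void main(String[] args) {
        // defaults for concurrencyLevel = 16: segmentShift = 28, segmentMask = 15
        int segmentShift = 28;
        int segmentMask = 15;
        // an example hash whose high 4 bits are 1010
        int hash = 0b1010_0000_0000_0000_0000_0000_0000_0000;
        // move the high bits down, then AND with the mask
        int j = (hash >>> segmentShift) & segmentMask;
        System.out.println(j); // prints 10 -> segments[10]
    }
}
```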
#### put flow

```java
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // compute the segment index: shift the high bits down and mask
    int j = (hash >>> segmentShift) & segmentMask;
    // segments other than s0 are created lazily, so segments[j] may still be
    // null; ensureSegment creates it safely (with CAS)
    if ((s = (Segment<K,V>)UNSAFE.getObject
         (segments, (j << SSHIFT) + SBASE)) == null) {
        s = ensureSegment(j);
    }
    // delegate to the segment's own put
    return s.put(key, hash, value, false);
}
```
Segment extends the reentrant lock (ReentrantLock); its put method is:

```java
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // try to acquire the lock; if that fails, scanAndLockForPut spins,
    // possibly pre-creating the node, and eventually blocks on lock()
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                // walk the list looking for the key
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // update the existing value
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // reached the end: link the new node in at the head
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // rehash if the threshold is exceeded
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
```
#### rehash flow

This happens inside put; the lock is already held at that point, so rehash does not need to worry about thread safety.

```java
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)       // the bucket holds a single node
                newTable[idx] = e;
            else {
                // find lastRun: the trailing run of nodes that all land in the
                // same new bucket; that whole run can be reused without copying
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // the remaining nodes (before lastRun) must be cloned
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // finally add the new node that triggered the rehash
    int nodeIndex = node.hash & sizeMask;
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
```
Appendix — debugging code:

```java
public static void main(String[] args) {
    ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
    // look for keys that land in segments[4] and whose hash is 2 mod 8
    for (int i = 0; i < 1000; i++) {
        int hash = hash(i);
        int segmentIndex = (hash >>> 28) & 15;
        if (segmentIndex == 4 && hash % 8 == 2) {
            System.out.println(i + "\t" + segmentIndex + "\t" + hash % 2 + "\t" +
                    hash % 4 + "\t" + hash % 8);
        }
    }
    map.put(1, "value");
    map.put(15, "value");
    map.put(169, "value");
    map.put(197, "value");
    map.put(341, "value");
    map.put(484, "value");
    map.put(545, "value");
    map.put(912, "value");
    map.put(941, "value");
    System.out.println("ok");
}

// a copy of JDK 7 ConcurrentHashMap's hash method
private static int hash(Object k) {
    int h = 0;
    if ((0 != h) && (k instanceof String)) {
        return sun.misc.Hashing.stringHash32((String) k);
    }
    h ^= k.hashCode();
    h += (h << 15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h << 3);
    h ^= (h >>> 6);
    h += (h << 2) + (h << 14);
    int v = h ^ (h >>> 16);
    return v;
}
```
#### get flow

get takes no lock; it relies on UNSAFE volatile reads for visibility. During a resize, a get that happens before the table swap reads from the old table, and one that happens after reads from the new table.

```java
public V get(Object key) {
    Segment<K,V> s;
    HashEntry<K,V>[] tab;
    int h = hash(key);
    // u is the memory offset of the segment within the segments array
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // getObjectVolatile guarantees visibility of the latest segment and table
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
```
#### size flow

- The count is first computed twice without locking; if the two results match, the count is considered correct and returned
- If they differ, it retries; once the retries exceed the limit, all segments are locked and the count is recomputed exactly

```java
public int size() {
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows an int
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // the first pass is not a retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                // too many retries: lock every segment and count exactly
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // forces creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // two consecutive passes saw the same modCount sum: the result is stable
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
```
## BlockingQueue

### LinkedBlockingQueue internals

#### Basic enqueue and dequeue

```java
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    static class Node<E> {
        E item;

        // next is one of:
        // - the real successor node
        // - this node itself, meaning the node has been dequeued
        // - null, meaning there is no successor (this is the last node)
        Node<E> next;

        Node(E x) { item = x; }
    }
}
```
The list is initialized with `last = head = new Node<E>(null);` — a dummy node that just holds the place; its item is null.

When a node is enqueued: `last = last.next = node;`

When another node is enqueued, the same applies: `last = last.next = node;`

Dequeuing:

```java
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // self-link: helps GC and marks the node as off the list
head = first;
E x = first.item;
first.item = null; // first becomes the new dummy node
return x;
```

Step by step:

1. `h = head`
2. `first = h.next`
3. `h.next = h`
4. `head = first`
5. and finally:

```java
E x = first.item;
first.item = null; // first becomes the new dummy
return x;
```
#### Locking analysis

The clever part is the use of two locks plus the dummy node.

Thread-safety analysis:

- When the node count is greater than 2 (counting the dummy), putLock guards the last node and takeLock guards the head node — enqueue and dequeue never contend
- When the node count equals 2 (one dummy, one real node), the two locks still lock two different objects, so there is no contention
- When the node count equals 1 (just the dummy), a take thread is blocked on the notEmpty condition; put and take do contend then, and the taker blocks

```java
// lock held by put, offer, etc.
private final ReentrantLock putLock = new ReentrantLock();
// lock held by take, poll, etc.
private final ReentrantLock takeLock = new ReentrantLock();
```
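A minimal usage sketch (my own, not from the original notes) showing the two locks in action: one producer blocks in put() when the small queue is full while the consumer take()s concurrently, and the two rarely contend with each other:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class TwoLockDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    queue.put(i); // blocks while the queue is full
                    System.out.println("put " + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    System.out.println("take " + queue.take()); // blocks while empty
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```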
The put operation:

```java
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    // count keeps the size consistent without holding both locks
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // the queue is full: wait until it is not
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        // there is still room after this put: wake up another waiting producer
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // c == 0 means the queue was empty before this put, so there
    // may be consumers waiting: wake one up
    if (c == 0)
        signalNotEmpty();
}
```
The take operation:

```java
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // the queue is empty: wait until it is not
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        // there are still elements left: wake up another waiting consumer
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // c == capacity means the queue was full before this take, so there
    // may be producers waiting: wake one up
    if (c == capacity)
        signalNotFull();
    return x;
}
```
Having a put wake up the next put (and a take wake up the next take) avoids under-signalling: signalNotEmpty and signalNotFull only fire on the empty-to-non-empty and full-to-non-full transitions, so the cascading signal() inside put and take ensures that when several producers or consumers are blocked, each completed operation passes the wake-up along to the next one.
#### Performance comparison

Mainly LinkedBlockingQueue versus ArrayBlockingQueue:

- Linked supports an optional capacity bound; Array is strictly bounded (the capacity must be given up front)
- Linked is backed by a linked list; Array is backed by an array
- Linked is lazy — nodes are created as elements are enqueued; Array must allocate its whole array up front
- Linked creates a new Node for every enqueue; Array's slots exist in advance
- Linked uses two locks; Array uses a single lock

## ConcurrentLinkedQueue

ConcurrentLinkedQueue is designed very much like LinkedBlockingQueue:

- two "locks": at any moment one producer and one consumer can make progress at the same time
- a dummy node, so that the two "locks" guard different objects and avoid contention
- except that these "locks" are implemented with CAS rather than blocking locks

ConcurrentLinkedQueue sees very wide use in practice. For example, in Tomcat's Connector (covered earlier), the Acceptor, as producer, hands event information to the Poller consumer precisely through a ConcurrentLinkedQueue that carries the SocketChannel over to the Poller.
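A quick usage sketch (my own, not from the original notes): ConcurrentLinkedQueue is unbounded and non-blocking, so offer() always succeeds and poll() returns null instead of waiting:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class ClqDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("task1"); // never blocks: the queue is unbounded
        queue.offer("task2");
        System.out.println(queue.poll()); // task1
        System.out.println(queue.poll()); // task2
        System.out.println(queue.poll()); // null: the queue is empty
    }
}
```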
### ConcurrentLinkedQueue internals

#### Imitating ConcurrentLinkedQueue

```java
package cn.itcast.concurrent.thirdpart.test;

import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

public class Test3 {
    public static void main(String[] args) {
        MyQueue<String> queue = new MyQueue<>();
        queue.offer("1");
        queue.offer("2");
        queue.offer("3");
        System.out.println(queue);
    }
}

class MyQueue<E> implements Queue<E> {

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (Node<E> p = head; p != null; p = p.next.get()) {
            E item = p.item;
            if (item != null) {
                sb.append(item).append("->");
            }
        }
        sb.append("null");
        return sb.toString();
    }

    // the methods below are stubs; only offer and toString matter here

    @Override
    public int size() { return 0; }

    @Override
    public boolean isEmpty() { return false; }

    @Override
    public boolean contains(Object o) { return false; }

    @Override
    public Iterator<E> iterator() { return null; }

    @Override
    public Object[] toArray() { return new Object[0]; }

    @Override
    public <T> T[] toArray(T[] a) { return null; }

    @Override
    public boolean add(E e) { return false; }

    @Override
    public boolean remove(Object o) { return false; }

    @Override
    public boolean containsAll(Collection<?> c) { return false; }

    @Override
    public boolean addAll(Collection<? extends E> c) { return false; }

    @Override
    public boolean removeAll(Collection<?> c) { return false; }

    @Override
    public boolean retainAll(Collection<?> c) { return false; }

    @Override
    public void clear() { }

    @Override
    public E remove() { return null; }

    @Override
    public E element() { return null; }

    @Override
    public E peek() { return null; }

    public MyQueue() {
        // start with a dummy node, like LinkedBlockingQueue
        head = last = new Node<>(null, null);
    }

    private volatile Node<E> last;
    private volatile Node<E> head;

    private E dequeue() {
        return null;
    }

    @Override
    public E poll() {
        return null;
    }

    @Override
    public boolean offer(E e) {
        return true;
    }

    static class Node<E> {
        volatile E item;

        public Node(E item, Node<E> next) {
            this.item = item;
            this.next = new AtomicReference<>(next);
        }

        // next is wrapped in an AtomicReference so it can be linked with CAS
        AtomicReference<Node<E>> next;
    }
}
```
The offer method:

```java
public boolean offer(E e) {
    // wrap the element in a new node
    Node<E> n = new Node<>(e, null);
    while (true) {
        // read the current tail's next reference
        AtomicReference<Node<E>> next = last.next;
        // try to CAS it from null to the new node; only one thread can win
        if (next.compareAndSet(null, n)) {
            // the winner advances last to the new node
            last = n;
            return true;
        }
        // a competing thread won the CAS: loop and retry
    }
}
```
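One caveat on this simplification: in the real ConcurrentLinkedQueue, the tail reference itself is also updated with CAS, and only lazily — tail is allowed to lag one node behind the true last node, which reduces the number of CAS operations per offer. The plain `last = n` write above is a deliberate simplification for clarity, not how the JDK does it.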
## CopyOnWriteArrayList

CopyOnWriteArraySet is just a thin facade over CopyOnWriteArrayList. The underlying implementation applies the copy-on-write idea: add, remove, and set operations copy the underlying array and perform the change on the new copy, leaving concurrent readers untouched — reads and writes are separated. Take add as an example:

```java
public boolean add(E e) {
    synchronized (lock) {
        // the current array
        Object[] es = getArray();
        int len = es.length;
        // copy it into a new, longer array
        es = Arrays.copyOf(es, len + 1);
        // apply the change to the copy
        es[len] = e;
        // publish the new array; readers switch over to it atomically
        setArray(es);
        return true;
    }
}
```

This source is from Java 11; Java 8 used a ReentrantLock here instead of synchronized.
Read operations take no lock at all, for example:

```java
public void forEach(Consumer<? super E> action) {
    Objects.requireNonNull(action);
    for (Object x : getArray()) {
        @SuppressWarnings("unchecked") E e = (E) x;
        action.accept(e);
    }
}
```
This makes it a good fit for read-mostly (many reads, few writes) workloads.
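A sketch of a typical read-mostly use case (the names are mine, made up for illustration): a listener list that is iterated on every event but only rarely modified.

```java
import java.util.concurrent.CopyOnWriteArrayList;

public class ListenerRegistry {
    private final CopyOnWriteArrayList<Runnable> listeners = new CopyOnWriteArrayList<>();

    public void addListener(Runnable l) {
        listeners.add(l); // rare: copies the underlying array
    }

    public void fireEvent() {
        for (Runnable l : listeners) { // frequent: lock-free snapshot iteration
            l.run();
        }
    }
}
```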
### Weakly consistent get

| Time | Operation                    |
|------|------------------------------|
| 1    | Thread-0 getArray()          |
| 2    | Thread-1 getArray()          |
| 3    | Thread-1 setArray(arrayCopy) |
| 4    | Thread-0 array[index]        |

At step 4, Thread-0 reads from the array it fetched at step 1, even though Thread-1 has already swapped in a modified copy — the get returns stale data.

This is hard to demonstrate with a test, but the problem genuinely exists.
### Weakly consistent iterators

```java
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Iterator<Integer> iter = list.iterator();
new Thread(() -> {
    list.remove(0);
    System.out.println(list);
}).start();
sleep1s(); // helper from the course notes: sleeps for one second
while (iter.hasNext()) {
    System.out.println(iter.next());
}
```
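Running this, the spawned thread prints `[2, 3]` — element 1 has been removed — yet the iterator, created before the removal, still prints 1, 2 and 3: it traverses the array snapshot captured when it was created.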
Don't assume weak consistency is a bad thing:

- database MVCC is itself a manifestation of weak consistency
- high concurrency and strong consistency are at odds; a design has to trade one against the other