一、简介
ConcurrentHashMap 是 HashMap 的线程安全版本。
JDK 7 采用的是分段锁,整个 Map 会被分为若干段,每个段都可以独立加锁。不同的线程可以同时操作不同的段,从而实现并发。
JDK 8 使用了一种更加细粒度的锁——桶锁,再配合 CAS + synchronized 代码块控制并发写入,以最大程度减少锁的竞争。
对于读操作,ConcurrentHashMap 使用了 volatile 变量来保证内存可见性。
对于写操作,ConcurrentHashMap 优先使用 CAS 尝试插入,如果成功就直接返回;否则使用 synchronized 代码块进行加锁处理。
二、JDK7中原理
JDK 7 的 ConcurrentHashMap 采用的是分段锁,整个 Map 会被分为若干段,每个段都可以独立加锁,每个段类似一个 Hashtable。
每个段维护一个键值对数组 HashEntry<K, V>[] table
,HashEntry 是一个单项链表。
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
final HashEntry<K,V> next;
}
段继承了 ReentrantLock,所以每个段都是一个可重入锁,不同的线程可以同时操作不同的段,从而实现并发。
static final class Segment<K,V> extends ReentrantLock {
transient volatile HashEntry<K,V>[] table;
transient int count;
}
JDK 7 中 put 流程
put 流程和 HashMap 非常类似,先定位到具体的段,再通过 ReentrantLock 去操作。一共可以分为 4 个步骤:
第一步,计算 key 的 hash,定位到段,段如果是空就先初始化;
第二步,使用 ReentrantLock 进行加锁,如果加锁失败就自旋,自旋超过次数就阻塞,保证一定能获取到锁;
第三步,遍历段中的键值对 HashEntry,key 相同直接替换,key 不存在就插入。
第四步,释放锁。
JDK 7 中get 流程
先计算 key 的 hash 找到段,再遍历段中的键值对,找到就直接返回 value。
get 不用加锁,因为是 value 是 volatile 的,所以线程读取 value 时不会出现可见性问题。
三、JDK8中原理
JDK 8 中的 ConcurrentHashMap 取消了分段锁,采用 CAS + synchronized 来实现更细粒度的桶锁,并且使用红黑树来优化链表以提高哈希冲突时的查询效率,性能比 JDK 7 有了很大的提升。
JDK 8 中put 流程
第一步,计算 key 的 hash,以确定桶在数组中的位置。如果数组为空,采用 CAS 的方式初始化,以确保只有一个线程在初始化数组。
// 计算 hash
int hash = spread(key.hashCode());
// 初始化数组
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 计算桶的位置
int i = (n - 1) & hash;
第二步,如果桶为空,直接 CAS 插入节点。如果 CAS 操作失败,会退化为 synchronized 代码块来插入节点。
// CAS 插入节点
if (tabAt(tab, i) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
// 否则,使用 synchronized 代码块插入节点
else {
synchronized (f) { // **只锁当前桶**
if (tabAt(tab, i) == f) { // 确保未被其他线程修改
if (f.hash >= 0) { // 链表处理
for (Node<K,V> e = f;;) {
K ek;
if (e.hash == hash && ((ek = e.key) == key || (key != null && key.equals(ek)))) {
e.val = value;
break;
}
e = e.next;
}
} else if (f instanceof TreeBin) { // **红黑树处理**
((TreeBin<K,V>) f).putTreeVal(hash, key, value);
}
}
}
}
插入的过程中会判断桶的哈希是否小于 0(f.hash >= 0
),小于 0 说明是红黑树,大于等于 0 说明是链表。
补充:在 ConcurrentHashMap 的实现中,红黑树节点 TreeBin 的 hash 值固定为 -2。
第三步,如果链表长度超过 8,转换为红黑树。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
第四步,在插入新节点后,会调用 addCount()
方法检查是否需要扩容。
addCount(1L, binCount);
JDK 8 中get 流程
get 也是通过 key 的 hash 进行定位,如果该位置节点的哈希匹配且键相等,则直接返回值。
如果节点的哈希为负数,说明是个特殊节点,比如说如树节点或者正在迁移的节点,就调用find
方法查找。
否则遍历链表查找匹配的键。如果都没找到,返回 null。
四、区别联系
为什么JDK 1.7 中要用 ReentrantLock,而在 JDK 1.8 要用 synchronized
JDK 1.7 中的 ConcurrentHashMap 使用了分段锁机制,每个 Segment 都继承了 ReentrantLock,这样可以保证每个 Segment 都可以独立地加锁。
而在 JDK 1.8 中,ConcurrentHashMap 取消了 Segment 分段锁,采用了更加精细化的锁——桶锁,以及 CAS 无锁算法,每个桶都可以独立地加锁,只有在 CAS 失败时才会使用 synchronized 代码块加锁,这样可以减少锁的竞争,提高并发性能。
HashMap 和 ConcurrentHashMap 的区别?
HashMap 是非线程安全的,多线程环境下应该使用 ConcurrentHashMap。
hash 的计算方法上,ConcurrentHashMap 的 spread 方法接收一个已经计算好的 hashCode,然后将这个哈希码的高 16 位与自身进行异或运算。
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
比 HashMap 的 hash 计算多了一个
& HASH_BITS
的操作。这里的 HASH_BITS 是一个常数,值为 0x7fffffff,它确保结果是一个非负整数。static final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); }
ConcurrentHashMap 对节点 Node 做了进一步的封装,比如说用 Forwarding Node 来表示正在进行扩容的节点。
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } }
put 方法,通过 CAS + synchronized 代码块来进行并发写入。
评论区