前言

在我们日常开发中,HashMap无疑是一个被频繁使用的类,但是它本身也有自己的不足。HashMap被设计为一个线程不完全的类,这意味着它在并发读写时会有问题,所以我们日常开发时不会在并发场景使用它。而这次要分析的ConcurrentHashMap不仅具有和HashMap类似的功能,而且在并发场景下也完全没有问题。我们先看一下它的类图。

image-20240326204620784

ConcurrentHashMap同时具有以下特性:

  • 线程安全:ConcurrentHashMap内部使用分段锁技术,不同的线程可以同时操作不同的段,提高并发性。
  • 高并发:ConcurrentHashMap允许多个线程同时读,而不需要任何锁。写操作(如put,remove等)需要锁定部分数据,而不是整个数据结构。
  • 非阻塞算法:ConcurrentHashMap使用一种新的方法CAS(Compare and Swap),它是硬件对于并发操作的直接支持,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知失败,并允许再次尝试。
  • 弱一致性:ConcurrentHashMap的读是弱一致的,也就是说,当你从ConcurrentHashMap中读取元素时,可能会看到其他线程插入的元素,也可能看不到。
  • 支持并行操作:ConcurrentHashMap支持一系列并行操作,如mappingCount,reduce和search等。

关键属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 最大表容量
private static final int MAXIMUM_CAPACITY = 1 << 30;

// 初始表容量,必须是2的幂且最大不超过MAXIMUM_CAPACITY
private static final int DEFAULT_CAPACITY = 16;

// 可能的最大(非2的幂)数组大小, toArray相关的方法会用到它
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 默认的并发级别,已经废弃,这里是为了兼容以前的代码
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 默认的负载因子,主要用在初始化对象时。
private static final float LOAD_FACTOR = 0.75f;

// 使用树而不是列表用于bin的bin计数阈值,
static final int TREEIFY_THRESHOLD = 8;

// 小于这个阈值时触发将树转为链表
static final int UNTREEIFY_THRESHOLD = 6;

// 可能树化的最小表容量,当某个bin中Node的数量超过了TREEIFY_THRESHOLD,但是数字中所有Node的数量小于MIN_TREEIFY_CAPACITY,此时会先触发扩容,而不是将链表转为树
static final int MIN_TREEIFY_CAPACITY = 64;

// 定义每次转移步骤的最小重新分配数量,在调整哈希表大小时,为了减少争用,表的索引会被划分为小块进行转移,MIN_TRANSFER_STRIDE就是这个划分的最小单位
private static final int MIN_TRANSFER_STRIDE = 16;

// 定义扩容标记的位数,决定了可以表示的扩容标记的数量,以支持ConcurrentHashMap的并发扩容操作。
private static final int RESIZE_STAMP_BITS = 16;

// 定义最多多少个线程进行扩容操作。在ConcurrentHashMap的扩容过程中,为了减少争用,表的索引会被划分为小块进行转移,这个过程可以由多个线程并发进行。MAX_RESIZERS就是这个并发线程数量的上限。
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

// 定义了在sizeCtl字段中记录扩容标记(resize stamp)的位移量。sizeCtl的高16位存储一个扩容标记(resize stamp),低16位存储一个扩容的参与者数量(number of resizing workers)
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

// 这个常量用于表示转发节点的哈希值。当ConcurrentHashMap进行扩容操作时,会创建一种特殊的节点(ForwardingNode),这种节点的哈希值就被设置为MOVED。当遇到哈希值为MOVED的节点时,表示当前的哈希表正在进行扩容操作,需要使用新的哈希表进行操作。
static final int MOVED = -1; // hash for forwarding nodes
// 表示树的根节点的哈希值。当一个桶中的元素数量超过一定阈值时,会将链表结构转换为红黑树结构,以提高查找效率。这种转换后的桶被称为TreeBin,其根节点的哈希值被设置为TREEBIN。
static final int TREEBIN = -2; // hash for roots of trees
// 表示临时预留节点的哈希值。在ConcurrentHashMap的computeIfAbsent等方法中,可能会创建一种特殊的节点(ReservationNode),表示一个正在计算但尚未插入到哈希表中的映射,这种节点的哈希值被设置为RESERVED
static final int RESERVED = -3; // hash for transient reservations
// 用于获取节点哈希值的有效位。在ConcurrentHashMap中,节点的哈希值是一个32位的整数,但是最高位被用于特殊目的(如上述的MOVED、TREEBIN和RESERVED),因此只有低31位被用于存储正常的哈希值。HASH_BITS就是一个掩码,用于从节点的哈希值中提取出这31位有效的哈希值。
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 用于存储键值对
transient volatile Node<K,V>[] table;

// 扩容时使用。当ConcurrentHashMap进行扩容时,会创建一个新的哈希表(即nextTable),然后将旧的哈希表中的元素逐个转移到新的哈希表中。在这个过程中,旧的哈希表仍然可以提供服务,直到所有的元素都转移到新的哈希表中,然后将table字段指向新的哈希表,nextTable字段被置为null。这样做的目的是为了支持ConcurrentHashMap的并发扩容操作。
private transient volatile Node<K,V>[] nextTable;

// 表中存储元素的数量。这个字段是ConcurrentHashMap实现其并发特性的关键部分,它允许多个线程在不需要锁定整个数据结构的情况下,安全地更新元素的数量
private transient volatile long baseCount;

// 用于控制表的初始化和调整大小。当sizeCtl为负数时,表示表正在被初始化或调整大小:-1表示正在初始化,否则表示正在调整大小的活动线程数加1。否则,当表为null时,sizeCtl保存创建时使用的初始表大小,或0表示默认。初始化后,sizeCtl保存下一次调整表大小时的元素计数值。
private transient volatile int sizeCtl;

// 在扩容过程中控制下一个要分割的表索引。扩容时会将旧的哈希表中的元素逐个转移到新的哈希表中。在这个过程中,transferIndex字段用于记录下一个要处理的哈希表索引,以此来控制并发的扩容操作。
private transient volatile int transferIndex;

// 用于控制对CounterCell数组的访问。当cellsBusy为0时,表示没有线程正在访问CounterCell数组。当一个线程需要访问CounterCell数组时,它会尝试使用CAS操作将cellsBusy的值从0设置为1。如果设置成功,那么这个线程就可以安全地访问CounterCell数组。当访问完成后,线程会将cellsBusy的值设置回0。如果一个线程尝试设置cellsBusy的值,但发现它已经不是0,那么这个线程就会进入自旋,直到cellsBusy的值变为0。这是一种简单的自旋锁,用于保护CounterCell数组的并发访问
private transient volatile int cellsBusy;

// 用于计算哈希表中的节点数量。当多个线程并发更新ConcurrentHashMap的大小时,为了减少线程间的竞争,ConcurrentHashMap会创建多个CounterCell,每个线程更新自己对应的CounterCell,最后再将所有CounterCell的值累加起来,得到ConcurrentHashMap的总大小。
private transient volatile CounterCell[] counterCells;

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 创建一个默认初始容量(16)、负载因子(0.75)和并发级别(16)的ConcurrentHashMap
public ConcurrentHashMap() {
}

// 创建一个给定初始容量、默认负载因子(0.75)和默认并发级别(16)的ConcurrentHashMap
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, LOAD_FACTOR, 1);
}

// 基于给定的map构造一个新的ConcurrentHashMap
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

// 创建一个指定容量、指定负载因子和并发级别(16)的ConcurrentHashMap
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

// 创建一个给定初始容量、给定加载因子和给定并发级别的ConcurrentHashMap
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

关键方法

寻址算法

给定一个key,ConcurrentHashMap是如何确定这个key应该放在哪个位置的呢?这就是寻址算法。

首先要计算这个key的hash值,在ConcurrentHashMap中它是用spread方法实现的。

1
2
3
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

参数h为key的hash值,可以看到这里对hash做了二次处理,以减少哈希冲突并提高哈希表的性能。在这段代码中,先将h的高低16位进行异或处理,以减少hash冲突;然后又和HASH_BITS进行操作,保证最后的hash值是一个正数。

计算完hash值后,我们就可以计算出这个key在数组中的索引值,算法为:(n - 1) & hash,其中n为数组的长度,即数组的长度减一后和hash值做运算。

有索引了,如何取出索引出的数据呢?这就是tabAt()方法了

1
2
3
4
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
// 可以看到这里用到是底层的原子操作来获取数组中指定索引位置的元素的
return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}

添加元素

put()

将一个键值对元素放入ConcurrentHashMap中,我们直接看底层的putVal方法,可以看到方法接受三个参数:key、value和onlyIfAbsent。key和value是要插入映射的键值对。onlyIfAbsent是一个布尔标志,表示如果映射中已存在键,是否应插入新值。

大致流程为:

  1. 先计算key的hash值,然后无限循环哈希表
  2. 如果此时哈希表还没初始化,则调用initTable()进行初始化
  3. 哈希表已初始化并且索引处的bin位空,就构造新Node插入该索引处
  4. 如果索引处不为空,判断节点是否为转发节点,是的话就进行调整
  5. 如果不是转发节点,并且onlyIfAbsent为true的话,就尝试进行值的替换而不是插入值(如果键相等的话)
  6. 走到这里说明索引处是一个链表或一棵树,构造节点并尝试将Node添加到链表或树中,必要时刻将链表转为树
  7. 累加映射中元素的计数并返回null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0) // 如果hash表没初始化,则初始化hash表
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果索引处没有值,直接插入到该位置
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) // 通过CAS来将Node插入该位置
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) //检查插槽中现有节点是否为转发节点。如果是,就进行调整。
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // onlyIfAbsent为true的话,就进行值的替换而不是插入值
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else { // 到这里就说明索引处为链表或树
V oldVal = null;
synchronized (f) { // 为保证并发安全,链表或数的操作用synchronized进行加锁操作
if (tabAt(tab, i) == f) {
if (fh >= 0) { // hash值大于或等于0,表示bin是一个链表。
binCount = 1;
// 遍历链表,如果找到一个具有相同键的节点,更新节点的值;如果没有,就构造一个Node并加到链表尾部
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 键相同的逻辑
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 遍历完了也没有相同的键,构造新Node加入链表尾部
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) { // 将键值对放入树中
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode) // 说明bin正在被另一个操作使用,此时抛出异常
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) // 如果超过了TREEIFY_THRESHOLD,则将链表转为树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 插入成功后就增加映射中元素的计数
addCount(1L, binCount);
return null;
}

hash表的初始化过程,可以看到它使用了CAS操作和线程让步来保证在多线程环境下哈希表的正确初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { // 哈希表为空时才进行初始化
if ((sc = sizeCtl) < 0) // sizeCtl小于0说明其它线程正在初始化或扩容,此时就让当前线程处于等待状态
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {// 否则尝试将SIZECTL设置为-1,表示当前线程要对哈希表进行初始化操作
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);// 设置扩容阈值
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
addCount()

addCount()方法主要用来累计哈希表中的映射数量,并确定是否要触发扩容。简单来说,addCount()内部会尝试通过CAS的方式累加baseCount,如果累加失败就说明存在并发竞争,此时要再说在满足以下任意一个条件就要调用fullAddCount()方法:

  1. counterCells为null或者是空数组
  2. counterCells不是null或空数组,但是索引处内容为null;或者counterCells不是null或空数组,索引处也有counterCell,但是CAS累加counterCell的value值失败

完了后要进行扩容检查,我们下面会再讲扩容机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* @param x 要累计的数量
* @param 决定是否需要检查并可能触发扩容
*/
private final void addCount(long x, int check) {
CounterCell[] cs; long b, s;
if ((cs = counterCells) != null ||
!U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell c; long v; int m;
boolean uncontended = true;
// 如果满足以下任意条件才调用fullAddCount方法进行完整的计数累加
// cs为空,或者cs不为空但是索引处c为空,或者索引处c不为空,但是通过CAS累加c的value值失败
if (cs == null || (m = cs.length - 1) < 0 || // cs为空
(c = cs[ThreadLocalRandom.getProbe() & m]) == null || // cs不为空,但是索引处内容为空
!(uncontended =
U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x)))// 原子的累加c的value值失败 {
fullAddCount(x, uncontended);
return;
}
if (check <= 1) // 不进行扩容检查
return;
s = sumCount(); // 计算哈希表中的映射数量
}
if (check >= 0) { // 进行扩容检查
// tab是旧的哈希表,nt是新的哈希表
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// rs用于控制扩容的并发数
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
if (sc < 0) { // 说明已经有其它线程再扩容了
// 如果满足以下任一条件,就跳出循环:
// sc == rs + MAX_RESIZERS || sc == rs + 1 为true,说明已经有足够多的线程在进行扩容
// nextTable为null,表示新的哈希表还没有创建
// transferIndex小于等于0,表示没有更多的元素需要转移
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
(nt = nextTable) == null || transferIndex <= 0)
break;
// 满足上面条件时就尝试将sc加1,如果累加成功就调用transfer(tab, nt)进行扩容
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2)) // 说明还没有线程开始扩容
transfer(tab, null);
s = sumCount(); // 重新计数
}
}
}

在理清addCount()方法后,我们就可以在梳理它里面的fullAddCount()方法了,它的作用就是在并发的环境下安全的累加计数值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// x 要累加的数值, wasUncontended 为true时表示上一次操作没有发生竞争
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) { // 如果随机数探针值还没有生成,就调用初始化方法去生成
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // 表示最后一个插槽不为空
for (;;) {
CounterCell[] cs; CounterCell c; int n; long v;
if ((cs = counterCells) != null && (n = cs.length) > 0) { // 当前数组不为空
if ((c = cs[(n - 1) & h]) == null) { // 如果插槽处为空
if (cellsBusy == 0) { // 为true时表示当前没有其它线程在更新CounterCell数组
CounterCell r = new CounterCell(x); // 创建一个新的CounterCell
if (cellsBusy == 0 &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) { // 能将CELLSBUSY设置为1意味着可以修改数组
boolean created = false;
try {
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) { // 如果插槽处为空,就将新创建的CounterCell放到该处
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0; // 释放锁
}
if (created) // 如果创建成功,退出循环
break;
continue; // 如果该位置已经被其他线程填充,则继续下一次循环
}
}
collide = false;
}
else if (!wasUncontended) // 如果上一次CAS操作失败,则设置wasUncontended为true,准备进行重试
wasUncontended = true;
else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x)) // 通过CAS设置值,成功则退出循环
break;
else if (counterCells != cs || n >= NCPU)
// 如果counterCells数组已经被其他线程改变,或者数组长度已经达到最大值,则设置collide为false
collide = false;
else if (!collide) // 如果上一次没有发生碰撞,设置collide = true;
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
// 如果发生碰撞,并且当前没有线程正在更新counterCells数组,则尝试扩展counterCells数组
try {
if (counterCells == cs) // counterCells没有被其它线程修改,就将counterCells做两倍扩容
counterCells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = ThreadLocalRandom.advanceProbe(h);// 如果所有尝试都失败,则重新计算当前线程应该更新的CounterCell的位置
}
// cellsBusy == 0 为true时表示当前线程有权限访问数组,然后如果能通过CAS将CELLSBUSY从0改为1,那么就执行内部逻辑
// 数组没有变更并且获取锁成功,创建一个CounterCell数组,并将CounterCell放入到该数组中,最后将数组指向counterCells
else if (cellsBusy == 0 && counterCells == cs &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == cs) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
// 如果上述所有尝试都失败,则尝试直接更新baseCount,能成功就终止循环
break;
}
}
扩容机制

主要代码是transfer方法中,大体流程是:

  • 首选会先计算每个线程需要处理的数量,这里是根据CPU的核心数量来计算的,如果是单核,那么每个线程处理的元素数就是原哈希表的长度;如果是多核,那么每个线程处理的元素数量为原哈希表长度的1/8;

  • 然后再判断一下新的哈希表是否创建,未创建就创建一个大小是原哈希表两倍的新哈希表并赋值给nextTable;

  • 接着开启一个循环,自链表尾部向前遍历依次处理哈希表中的元素;

    • 循环内部会先确定当前线程处理元素的范围,主要是通过CAS设置TRANSFERINDEX来实现分段逻辑。假设现在是32扩容到64,那么会先处理的元素范围是[16,31],然后再是[0,15];
    • 然后校验一下索引的有效性
    • 如果索引有效,再看下当前索引处是否有元素,有的话就做迁移,迁移成功就继续下次循环;
    • 如果索引处有元素,但节点的hash值是MOVE,表示已经迁移过了,跳过当前循环,继续下次循环;
    • 最后说明当前节点是要被处理的,先对当前这个节点加一个独占锁,然后处理后续逻辑。这里判断了节点是否是树,树的逻辑暂不做阐述;如果是普通节点或者链表,会先构造两个链表:底链表ln和高链表hn,然后根据算法:节点的hash值和旧哈希表的长度做运算,判断结果是否为0,为0就赋值给ln,表示它在新哈希表中的位置和旧哈希表的位置保持一致;否则就将它赋值给hn,表示它在新哈希表中的位置为它在旧哈希表的索引值+旧哈希表的长度之和。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算每个线程需要处理的元素数量,如果CPU核心数大于1,则每个线程处理的元素数量为旧哈希表长度的1/8,否则每个线程处理的元素数量为旧哈希表的长度
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // stride 的最小值为16
if (nextTab == null) { // 如果新的哈希表为空,就初始化一个
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n; // 表示要转移多少个元素
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) { // 确定每个线程应该处理的元素的范围
int nextIndex, nextBound;
if (--i >= bound || finishing)
// 如果当前线程还有哈希表中的元素可以处理,或者已经完成了所有的处理工作,那么就跳出循环
advance = false;
else if ((nextIndex = transferIndex) <= 0) { // 所有数据已处理完毕,可以跳出循环
i = -1;
advance = false;
}
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) { // 通过CAS设置TRANSFERINDEX
// 当前线程能处理的边界值也在这里确定,假设nextIndex为64,stride为16,则边界值为48,区间为[48,63]
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) { // 这里是校验索引i是否有效的
int sc;
if (finishing) { // 为true说明已完成扩容,则清空新创建的nextTable并设置sizeCtl
nextTable = null;
table = nextTab;
// 更新sizeCtl为新的哈希表的大小
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 如果没有完成扩容,则尝试更新sizeCtl
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 如果sizeCtl的值不符合预期,直接返回
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 否则,标记为完成扩容,并将i设置为n,准备进行下一轮的检查
finishing = advance = true;
i = n;
}
}
else if ((f = tabAt(tab, i)) == null) // 处理节点为null的情况
// 将tab数组索引i位置从null更新为fwd
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) // 判断是否已经处理过,处理过就跳过这个Node,防止重复处理
advance = true;
else {
synchronized (f) { // 对节点f进行加锁
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn; // 定义两个链表,ln为低位链,hn为高位链
if (fh >= 0) { // 为true说明当前节点是一个正常的节点,需要做转移处理。
int runBit = fh & n; // fn & n, 这个算法用来计算Node在新数组中的位置
Node<K,V> lastRun = f;
// 计算出当前链表最后一个需要迁移或者不需要迁移的节点位置
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 为0,那么节点应该放在新的哈希表的前半部分(索引不变)
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else { // 否则就放在哈希表的后半部分,而它的位置就是原数组中的索引+原数组长度
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将 ln 设置到新的哈希表 nextTab 的 i 位置
setTabAt(nextTab, i, ln);
// 将 hn 设置到新的哈希表 nextTab 的 i + n 位置
setTabAt(nextTab, i + n, hn);
// 将 fwd 设置到旧的哈希表 tab 的 i 位置
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { // 处理树的情况,这里不在做具体阐述
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
}
}
}

访问元素

这里展示get方法,即从ConcurrentHashMap中获取key的value值。

首先会通过寻址算法找到索引处的Node,如果值为null就返回null,否则会做如下处理

  • 如果Node的哈希值和key值是否和给定的key一致,一致就返回true
  • 如果Node的哈希值小于0,说明这个是特殊节点,比如是一颗树或者转发节点,做特殊处理
  • 否则就说明节点是一个链表,遍历这个链表,找到与给定key的哈希值和key值都相等的元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) { // 索引处不为空,就继续处理
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 哈希值相等,key也相等,返回value值
return e.val;
}
else if (eh < 0) // 哈希值小于0,说明这个是特殊节点,比如是一颗树或者转发节点。这里做了特殊查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 否则就说明是链表了,遍历这个链表,找到与给定key的哈希值和key值都相等的元素
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

移除元素

我们分析下remove方法,即从ConcurrentHashMap中移出这个key值的信息。

从第一场代码可以看出,它是调用了replaceNode(key, null, null)。

1
2
3
public V remove(Object key) {
return replaceNode(key, null, null);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null) // 如果元素为空,直接终止循环
break;
else if ((fh = f.hash) == MOVED) // 如果节点被移动,就进行迁移操作
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) { // 对节点进行加锁
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 普通节点,或链表
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null) // 如果是链表,改变指针指向。
pred.next = e.next;
else // 说明当前节点只有一个元素,通过CAS将索引处设置为e的下个节点,可能是null,也可能是下个正常节点
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) { // 处理树的情况
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}

如何计数

最后我们看下size方法,主要是调用sumCount方法。我们之前已经知道ConcurrentHashMap是将数量维护在baseCount字段和CounterCell数组中的,所以数量多计算也是baseCount的值加上所有CounterCell数组中的value值。

1
2
3
4
5
6
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
1
2
3
4
5
6
7
8
9
10
final long sumCount() {
CounterCell[] cs = counterCells;
long sum = baseCount;
if (cs != null) {
for (CounterCell c : cs)
if (c != null)
sum += c.value;
}
return sum;
}