java集合:concurrentHashMap源码解析


一个线程安全且高效的集合。在日常开发中使用常用的集合有很多,HashMap肯定是名列前茅,因为其性能高效,但不安全。如果想使用线程安全的集合ConcurrentHashMap是首选之一,它是HashMap的线程安全版本,不仅保留其高效的性能,还保证多线程下的安全问题。那么它是如何保证高效且安全的?下面通过源码解析看看它是如何做到

特性

先看看ConcurrentHashMap有哪些特性:

  • 同hashmap一样,但是能保证其线程安全,并且k,v不能为null。
  • 底层数据结构:数组 + 链表 +红黑树
  • 通过CAS + synchronized保证线程安全

结构

成员属性

// 最大容量值
private static final int MAXIMUM_CAPACITY = 1 << 30;

// 默认容量值16
private static final int DEFAULT_CAPACITY = 16;

// 最大的数组大小(非2的幂) toArray和相关方法需要(并不是核心属性)
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// jdk1.7遗留下来的,用来表示并发级别的属性
// jdk1.8只有在初始化的时候用到,不再表示并发级别了~ 1.8以后并发级别由散列表长度决定
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 负载因子:表示散列表的填满程度~ 在ConcurrentHashMap中,该属性是固定值0.75,不可修改~
private static final float LOAD_FACTOR = 0.75f;

// 树化阈值:数组中链表长度达到8时候,可能发生链表树化
static final int TREEIFY_THRESHOLD = 8;

// 反树化阈值:数组中的红黑树元素个数小于6时候,将红黑树转换回链表结构
static final int UNTREEIFY_THRESHOLD = 6;

// 数组长度达到64,且某个索引位置的数组中链表长度达到8,才会发生树化
static final int MIN_TREEIFY_CAPACITY = 64;

// 控制线程迁移数据的最小步长(桶位的跨度~)
private static final int MIN_TRANSFER_STRIDE = 16;

// 固定值16,与扩容相关,计算扩容时会根据该属性值生成一个扩容标识戳
private static int RESIZE_STAMP_BITS = 16;

// (1 << (32 - RESIZE_STAMP_BITS)) - 1 = 65535:1 << 16 -1
// 表示并发扩容最多容纳的线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

// 也是扩容相关属性,在扩容分析的时候会用到~
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

// 当node节点的hash值为-1:表示当前节点是FWD(forwarding)节点(已经被迁移的节点)
static final int MOVED     = -1;
// 当node节点的hash值为-2:表示当前节点已经树化,且当前节点为TreeBin对象~,TreeBin对象代理操作红黑树
static final int TREEBIN   = -2; 
// 当node节点的hash值为-3:
static final int RESERVED  = -3;
// 0x7fffffff 十六进制转二进制值为:1111111111111111111111111111111(31个1)
// 作用是将一个二进制负数与1111111111111111111111111111111 进行按位与(&)运算时,会得到一个正数,但不是取绝对值
static final int HASH_BITS = 0x7fffffff; 

// 当前系统的CPU数量
static final int NCPU = Runtime.getRuntime().availableProcessors();

// JDK1.8 序列化为了兼容 JDK1.7的ConcurrentHashMap用到的属性 (非核心属性)
private static final ObjectStreamField[] serialPersistentFields = {
    new ObjectStreamField("segments", Segment[].class),
    new ObjectStreamField("segmentMask", Integer.TYPE),
    new ObjectStreamField("segmentShift", Integer.TYPE)
};

//容器: 真正用于存储元素的数组
transient volatile Node<K,V>[] table;

// 临时容器:扩容过程中,会将扩容中的新table赋值给nextTable,(保持引用),扩容结束之后,这里就会被设置为NULL
private transient volatile Node<K,V>[] nextTable;

// 与LongAdder中的baseCount作用相同: 当未发生线程竞争或当前LongAdder处于加锁状态时,增量会被累加到baseCount
private transient volatile long baseCount;

// 表示容器table的状态: 
// sizeCtl<0时:
// 情况一、sizeCtl=-1: 表示当前table正在进行初始化(即,有线程在创建table数组),当前线程需要自旋等待...
// 情况二、表示当前table容器正在进行扩容,高16位表示扩容的标识戳,低16位表示扩容线程数:(1 + nThread) 即,当前参与并发扩容的线程数量。
// sizeCtl=0时:表示创建table容器时,使用默认初始容量DEFAULT_CAPACITY=16
// sizeCtl>0时:
// 情况一、如果table未初始化,表示初始化大小
// 情况二、如果table已经初始化,表示下次扩容时,触发条件(阈值)
private transient volatile int sizeCtl;

// 扩容过程中,记录当前进度。所有的线程都需要从transferIndex中分配区间任务,并去执行自己的任务
private transient volatile int transferIndex;

// LongAdder中,cellsBusy表示对象的加锁状态:
// 0: 表示当前LongAdder对象处于无锁状态
// 1: 表示当前LongAdder对象处于加锁状态
private transient volatile int cellsBusy;

// LongAdder中的cells数组,当baseCount发生线程竞争后,会创建cells数组,
// 线程会通过计算hash值,去取到自己的cell,将增量累加到指定的cell中
// 总数 = sum(cells) + baseCount
private transient volatile CounterCell[] counterCells;

// Unsafe 类:用于提供一些底层的操作。可以直接操作内存
private static final sun.misc.Unsafe U;

构造方法

/**
* 创建一个默认的容器,大小为16
*/
public ConcurrentHashMap() {
}

/**
* 指定容器的初始化大小
*/
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
            MAXIMUM_CAPACITY :
            tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

/**
* 根据给定的map构造一个容器
*/
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    // 1.5倍的初始容量+1,再往上取最接近的2的整数次方
    // 初始化时只是设置了初始值,并没有初始化数组,懒加载,put时在初始化数组
    this.sizeCtl = cap;
}


/**
* 指定容器大小和负载系数(为了兼容历时版本)
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

/**
* 为了兼容历时版本,java8已经没有了显式设置扩容因子的定义,扩容因子固定是3/4,也没有了concurrencyLevel并发级别的概念。
*/
public ConcurrentHashMap(int initialCapacity,
                     float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // concurrencyLevel的用处不大,为了兼容java7版本
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    // initialCapacity和loadFactor都是使用者外部传入,所以可对initialCapacity进行优化
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    // 获取一个大于等于且离size最近的2的整数次方的数
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    // 此时sizeCtl存的是初始容量
    this.sizeCtl = cap;
}

tableSizeFor方法

获取大于等于且离传入值最近的2的整数次的数。其运算过程是使一个数二进制最左边的1右边位全部转化为1,然后+1就可得一个2的整数次方的数。

/**
 * 找到大于等于c的最近2的整数次的数
 * @param c
 * @return
 */
private static final int tableSizeFor(int c) {
    // 为什么要减1呢?
    int n = c - 1;
    // 向右移1位,与旧值|,逻辑上可得到2个1
    n |= n >>> 1;
    // 向右移2位,与旧值|,逻辑上可得到4个1
    n |= n >>> 2;
    // 向右移4位,与旧值|,逻辑上可得到8个1
    n |= n >>> 4;
    // 向右移8位,与旧值|,逻辑上可得到16个1
    n |= n >>> 8;
    // 向右移16位,与旧值|,逻辑上可得到32个1
    n |= n >>> 16;
    // n+1 得到大于等于c的最近2的整数次的数
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

put方法

添加元素的整体流程如下:

  1. table 如果为空,先初始化 table,也就是第一次扩容。
  2. 计算 key-value 要插入的桶的位置 i。
  3. table[i] 如果为空,直接将 key-value —> Node,添加到 table[i] 位置处。
  4. table[i] 不为空,那说明 i 位置的桶中有元素,要么是链表,要么是树。key-value要么追加,要么替代。
  5. 如果 table[i].hash == -1,说明此时的 table 正在执行扩容,而且 i 桶已经被迁移过了,此时再添加元素是无效的。所以当前线程执行协助扩容,在下一次循环的时候再添加元素。
  6. 当桶中是链表时,遍历链表,执行替代或者追加元素。
  7. 当桶中是树时,执行树的替代或者追加。
  8. 元素插入后,再统计 table 中所有元素的个数,如果超过了阈值,还要执行扩容。
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
        // key 和 value 不能为 null
        if (key == null || value == null) throw new NullPointerException();

        int hash = spread(key.hashCode());    // 计算 key 的 hash 值,计算方式和 HashMap 类似
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            // f 为插入数组位置的头结点,n 为数组长度,i 为数组插入位置,fh 是 f 的 hash 值
            Node<K,V> f; int n, i, fh;    

            if (tab == null || (n = tab.length) == 0)    // 第一次插入时,初始化数组
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    // 插入的数组位置为 null
                // 通过 CAS 的方式在该位置放入新节点
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))    
                    break;                   
            }
            else if ((fh = f.hash) == MOVED)    // 该位置的元素已被移动,说明正在扩容,去协助扩容
                tab = helpTransfer(tab, f);
            else {    // 添加节点到该数组位置的链表或红黑树上
                V oldVal = null;
                synchronized (f) {    // 插入过程对插入的数组位置加锁
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {    // 头结点的 hash >= 0, 说明是链表
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {    // 遍历链表
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {    // 已存在相同节点,判断是否要更新 value
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {    // 不存在相同节点,将新节点插入到链表尾部
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {    // 插入到红黑树中
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {    // 检测节点个数
                    // 当链表节点达到树化阀值(8),将链表转化为红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount); // 更新相关容量,达到扩容阀值时进行扩容
        return null;
    }    

put方法解析:首先检查k,v是否为null否则抛空指针,通过spread(key.hashCode());方法计算一个hash值,进入无边界循环开始存放元素,循环中有4大分支:第一分支首先检查容器是否初始化,如果没有则开始初始化调用 initTable();(可能会有多个线程进入初始化,initTable()保证了多线程下的初始化安全问题),第二个分支计算元素在数组中的位置(计算下标: i = (n - 1) & hash),调用了unsafe类的getObjectVolatile()方法获取数组的在其索引的位置(该方法能强制从主内存中获取属性值),如果对应的下标处没有值,那么就通过cas的方式存放元素(cas能保证其原子性,如果多个线程同时存放元素,那么就只有一个线程能存放成功并且跳出,其他失败的线程继续循环),第三个分支通过k的hash值判断该元素是否正在迁移,(fh = f.hash) == MOVED),如果正在迁移那么就帮助扩容。第四个分支,如果容器初始化完成,并且产生了hash冲突,也没有正在扩容,那么就开始形式链表或红黑树,通过synchronized锁住数组中对应下标的元素也就是链表的表头,锁住表头来保证链表在插入元素时的安全问题,在链表存放元素时,通过(tabAt(tab, i) == f)来判断是否要操作链表(为什么用了锁之后还要这个判断?因为有可能其他线程正在扩容,导致此处的值已经变了),如果不操作就继续循环,否则就又有两个分支,一:fh>=0 的话遍历链表存放元素,并且根据遍历的次数叠加bincount(此值是是否形成红黑树的依据和扩容检查依据)。二:是否是树,如果是调用插入树的方法,锁结束之后bincount如果不等0那么说明有元素插入,bincount大于8进入形成红黑树方法treeifyBin(tab, i);返回被替换的值。以上操作都是通过无边界循环加上cas以及synchronized的方式保证了多线程下的安全问题。最后调用addcount检查是否需要扩容和计算size()的大小(如果有值被替换则无需此步)。

put方法中一些值得思考的问题:

  1. 为什么key和value为null直接抛出异常(不允许存在null值)?
    因为程序中null表示不存在,如果可以添加null值,ConcurrentHashMap没法区分这个值到底存不存在,因为存在和不存在都是null,而ConcurrentHashMap需要保证线程安全,所以添加null时直接抛异常。

initTable方法

是如何保证多线程下的初始化安全问题?看如下代码

    /*
     * sizeCtl的作用:
     * 值为0,表示数组未初始化,并且数组的初始容量是16
     * 为正数,表示如果数组未初始化,则记录的是数组的初始容量;如果数据已经初始化,则记录的是数组的扩容阈值
     * 为-1,表示数组正在进行初始化
     * 非-1的负数,表示数组正在扩容,-(1+n)的值为正在完成数组扩容的线程数量
     * */
    private final Node<K, V>[] initTable() {
        Node<K, V>[] tab;
        int sc;
        //第①步,判断数组是否未初始化
        while ((tab = table) == null || tab.length == 0) {
            //第②步,用sc保存sizeCtl的值,作为后面CAS的预期值
            //第③步判断sizeCtl的值是否<0,是的话则发现有其他线程在做数据的初始化,让出CPU
            if ((sc = sizeCtl) < 0)
                Thread.yield();

            //走到这里,说明sizeCtl的值大于或等于0,则有两种清况:一是数组未初始化。
            //二是有其他线程在第①步之后,第②步之前将数组初始化完成,此时sizeCtl为数组的扩容阈值.

            //第④步,CAS将SIZECTL值修改为-1,表示本线程开始进行数组的初始化
            //如果修改成功,开始初始化操作;如果修改失败,则表示有其他线程在①②之后抢先修改了SIZECTL
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    /*第⑤步,double check数组是否初始化,这里为什么要采用双重校验呢?
                     *因为在数组初始化完成之后,sizeCtl的值会,被改成数组的扩容阈值,会是一个大于0的值。
                     *所以完全有可能本线程在①之后,有其他线程完成了数组的初始化全过程,
                     * 使得本线程也能进到这个代码块里来。
                     */
                    if ((tab = table) == null || tab.length == 0) {
                        //第⑥步,执行哈希表的创建工作,
                        //确定哈希表的容量
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        //创建哈希表
                        Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
                        table = tab = nt;
                        //第sc=n-n/4,作为扩容的阈值,赋值给sc
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //第⑦步,如果初始化完成,sizeCtl记录的是扩容阈值;
                    //如果初始化失败,则还原sizeCtl
                    sizeCtl = sc;
                }
                //走到这里,说明本线程初始化完成了或者其他线程在本线程的
                //①、②两步之间完成了哈希表的初始化全过程,此时结束循环
                break;
            }
            //走到这里,说明在发现数组未初始化之后,准确初始化之前,
            //有其他线程已经抢先开始初始化了
            //但是其他线程是否将初始化的工作全部正确的完成,并不知道,所以重新开始循环检查
        }
        return tab;
    }

initTable方法解析:通过cas+自旋的方式不断尝试初始化容器。
首先判断当前容器是否为空,为空检查sizectl是否为负数,如果为负数表示有线程正在初始化,那么让出cpu的执行权,否则通过cas的方式该表sizectl的大小为-1,开始初始化容器,再次检查容器是否为空,这里的检查是为了避免多个线程同时初始化或者说只能有一个线程进来。为什么呢?因为此处当一个线程刚对table赋完值,将sizectl改为阈值大小的时候其他线程是可以进来的,如果其他线程对table赋值,table是被volatile修饰的,其他线程是能立马感应到table的变化,所以再当其他线程想要初始化的时候就进不去了。只能跳出当前自旋结束方法。

Unsafe类

Unsafe类的作用?
在ConcurrentHashMap中用于乐观锁的实现,提供直接管理内存的操作。

Java并发工具包JUC的基石。Java和C++、C语言的一个重要区别,就是Java中我们无法直接操作一块内存区域,而在C++、C中却可以自己申请内存和释放内存。Unsafe类的设计,为我们提供了手动管理内存的能力。使用Unsafe类直接操纵内存,意味着速度更快,效率更高,但也更加危险。

addCount方法

addCount方法做了 2 件事情:

  1. 对 table 的长度加一。无论是通过修改 baseCount,还是通过使用 CounterCell。当 CounterCell 被初始化了,就优先使用他,不再使用 baseCount。
  2. 检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。
// 从 putVal 传入的参数是 1, binCount,binCount 默认是0,只有 hash 冲突了才会大于 1.且他的大小是链表的长度(如果不是红黑数结构的话)。
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 如果计数盒子不是空 或者
    // 如果修改 baseCount 失败
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // 如果计数盒子是空(尚未出现并发)
        // 如果随机取余一个数组位置为空 或者
        // 修改这个槽位的变量失败(出现并发了)
        // 执行 fullAddCount 方法。并结束
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    // 如果需要检查,检查是否需要扩容,在 putVal 方法调用时,默认就是要检查的。
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 如果map.size() 大于 sizeCtl(达到扩容阈值需要扩容) 且
        // table 不是空;且 table 的长度小于 1 << 30。(可以扩容)
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // 根据 length 得到一个标识
            int rs = resizeStamp(n);
            // 如果正在扩容
            if (sc < 0) {
                // 如果 sc 的低 16 位不等于 标识符(校验异常 sizeCtl 变化了)
                // 如果 sc == 标识符 + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
                // 如果 sc == 标识符 + 65535(帮助线程数已经达到最大)
                // 如果 nextTable == null(结束扩容了)
                // 如果 transferIndex <= 0 (转移状态变化了)
                // 结束循环 
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 扩容
                    transfer(tab, nt);
            }
            // 如果不在扩容,将 sc 更新:标识符左移 16 位 然后 + 2. 也就是变成一个负数。高 16 位是标识符,低 16 位初始是 2.
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 更新 sizeCtl 为负数后,开始扩容。
                transfer(tab, null);
            s = sumCount();
        }
    }
}

addCount方法解析:

  1. 此方法主要是计算元素个数和是否需要扩容。
  2. 那么此方法是如何保证多线程情况下,size的叠加安全的?首先先判断counterCells是否被初始化过,如果已经初始化了那么直接使用countercells数组来记录size,如果还没初始化,那么采用cas的方式叠加basecount(cas在多线程情况下能保证原子性,如果多个线程竞争此方法,只有一个成功),叠加成功则判断是否需要进行扩容,失败则表示此处存在多个线程竞争,那么就调用fulladdcount()方法开始初始化countercells数组,在调用fulladdcount()方法前,需要判断countercells是否被初始化过,长度是否大于1,ThreadLocalRandom.getProbe() & m 算出countercells下标位置是否有元素,如果有元素那么通过cas方式叠加countercells数组对象中的value值,如果叠加失败那么说明存在多线程竞争,调用fulladdcount()结束方法。如果计算器叠加成功,那么判断check的值是否大于1,大于1则计算出size的值,去检查是否需要扩容,不大于1说明容器有位置可以存放元素或者容器位置的链表的长度最多为2,无需扩容检查(此处待探索),如果走到check>=0 那么就需要检查扩容,从put方法进来的一定满足这个条件,首先是一个无边界的循环,只有size大于sizectl才会触发扩容(容器不为空并且长度小于最大容器值),通过int rs = resizeStamp(n);计算扩容标记,判断sc<0 ,sc是当前赋值sizectl的值,如果为负数则表示正在扩容,否则通过cas的形式(rs << RESIZE_STAMP_SHIFT) + 2 计算得出的数,高16位是扩容标记,低16位标识正在扩容的线程数) 调用transfer()方法扩容。如果正在扩容,那么试着看看是否需要协助扩容,协助扩容的前提条件:

     ((sc >>> RESIZE_STAMP_SHIFT) != rs   --->sc的高16位不等标识符位 表示扩容异常退出
      || sc == rs + 1  --> 表示扩容结束。 因为默认第一次扩容 sc=sizectl =rs<<16+2 扩容完成 sizectl-1  所以扩容完成时 rs+1 =sc
      || sc == rs + MAX_RESIZERS  -->已经共同线程扩容数已经到达最大
      || (nt = nextTable) == null   --> 表示扩容刚刚进入还未初始化或者扩容已经结束
      ||
      transferIndex <= 0)  -->同上
    

如果都没有这些阻碍那么调用transfer()开始协助扩容 。
不管是协助扩容还是正常扩容都会计算size的大小

fullAddCount方法

fullAddCount主要是用来初始化CounterCell,来记录元素个数,里面包含扩容,初始化等操作

private final void fullAddCount(long x, boolean wasUncontended) {
        int h;

        //获取当前线程的probe的值,如果值为0,则初始化当前线程的probe的值,probe就是随机数
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();

            // 由于重新生成了probe,未冲突标志位设置为true
            wasUncontended = true;
        }

        boolean collide = false;                // True if last slot nonempty

        //自旋
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;

            //说明counterCells已经被初始化过了,我们先跳过这个代码,先看初始化部分
            if ((as = counterCells) != null && (n = as.length) > 0) {

                // 通过该值与当前线程probe求与,获得cells的下标元素,和hash 表获取索引是一样的
                if ((a = as[(n - 1) & h]) == null) {

                    //cellsBusy=0表示counterCells不在初始化或者扩容状态下
                    if (cellsBusy == 0) {            // Try to attach new Cell

                        //构造一个CounterCell的值,传入元素个数
                        CounterCell r = new CounterCell(x); // Optimistic create

                        //通过cas设置cellsBusy标识,防止其他线程来对counterCells并发处理
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;

                                //将初始化的r对象的元素个数放在对应下标的位置
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                //恢复标志位
                                cellsBusy = 0;
                            }
                            if (created)
                                break;

                            //说明指定cells下标位置的数据不为空,则进行下一次循环
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //说明在addCount方法中cas失败了,并且获取probe的值不为空
                else if (!wasUncontended)       // CAS already known to fail
                    //设置为未冲突标识,进入下一次自旋
                    wasUncontended = true;      // Continue after rehash

                //由于指定下标位置的cell值不为空,则直接通过cas进行原子累加,如果成功,则直接退出
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;

                //如果已经有其他线程建立了新的counterCells或者CounterCells大于CPU核心数(很巧妙,线程的并发数不会超过cpu核心数)
                else if (counterCells != as || n >= NCPU)
                    //设置当前线程的循环失败不进行扩容
                    collide = false;            // At max size or stale

                //恢复collide状态,标识下次循环会进行扩容
                else if (!collide)
                    collide = true;

                //进入这个步骤,说明CounterCell数组容量不够,线程竞争较大,所以先设置一个标识表示为正在扩容
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            //扩容一倍 2变成4,这个扩容比较简单
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        //恢复标识
                        cellsBusy = 0;
                    }
                    collide = false;

                    //继续下一次自旋
                    continue;                   // Retry with expanded table
                }

                //继续下一次自旋
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //cellsBusy=0表示没有在做初始化,通过cas更新cellsbusy的值标注当前线程正在做初始化操作
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        //初始化容量为2
                        CounterCell[] rs = new CounterCell[2];
                        //将x也就是元素的个数 放在指定的数组下标位置
                        rs[h & 1] = new CounterCell(x);
                        //赋值给counterCells
                        counterCells = rs;
                        //设置初始化完成标识
                        init = true;
                    }
                } finally {
                    //恢复标识
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                //竞争激烈,其它线程占据cell 数组,直接累加在base变量中
                break;                          // Fall back on using base
        }
    }

fullAddCount方法解析:此方法作用主要是在多线程在叠加baseCount失败时,去叠加countercells的值,从而达到共同计算size的长度。
首先第一次进入会拿到当前线程的探侦值(探针值用于计算countercells的下标位置),首次进入默认表示没有线程竞争此方法,然后就是通过自旋的方式去初始化,扩容,叠加计算器。进入自选,分为三大步:

  1. 判断countercells是否为空,为空则通过cas的方式设置cellsbusy去初始化计算器,通过双重检查和cas的方式防止重复初始化,初始化为创建countercell长度为2的数组并且设置其下标对象的值为1,初始化完成则跳出自选。

  2. 如果cuntercells未被初始化,并且当前线程没有争夺到初始化资格,那么当前线程就会去叠加basecount的值,如果叠加成功则跳出自选,否则继续自选

  3. countercells已经初始化过了就又分为六步。_
    1:那么通过探针值计算出数组对应下标位置是否为空,如果此下标没有元素则检查cellsbusy==0 (通过cellsbusy等于判断是否有其他线程正在操作计算器)成立则创建新的元素并且通过cas的方式设置计算器为忙碌状态,此时就是开始设置对应下标元素,先创建created为false,在设置元素之前再次检查确认对应位置是否存在元素,不存在则直接设置元素并且created的改为true,到最后不管是否存放过元素都会将cellbusy改为空闲,判断是否创建过了新元素,创建了则说明该线程目的达到,跳出循环,如果没有创建那么说明该线程还未做任何事,则继续自旋。

    _2:如果wasUncontended为false那么就说明在调用此方法之前存在线程竞争并且该线程竞争失败了,那么将wasUncontended改为true继续自旋(为什么要这样操作呢?因为走完了这一步说明:存在多个线程竞争,有可能尝试过了设置计算器对应下标失败,也又可能当时对应下标存在元素,那么继续自选之后会重新获得最新的计算器,重新获取下标,重新设置对应元素,如果还是不行那么就继续下面的分支,这样的好处是尽量阻止了多个线程竞争扩容计算器)。

    3:如果没有多个线程竞争此方法,对应下标也有元素,那么通过cas的方式叠加对应下标对象的值,叠加完成了就表示任务完成跳出循环。

    4:判断计数器是否已被重新创建或者计算器的长度大于cpu的核心数,如果成立那么就不再进行扩容。_5:恢复collide的值为true,表示下次循环如果还是无法叠加值那么就进行扩容。_6:到了这一步表示自旋了这么多次,计算器长度无法满足多线程竞争了,既然阻止不了,那么就只能进行扩容。通过cas设置计算为忙碌,扩容之前检查计算器是否已被改变,没有则扩容一倍,迁移元素,调整计算器为空闲,继续自旋。最后如果这几步都不行,那么就更新当前线程的随机值

扩容方法

transfer 方法

在每次往map中put数据时会重新计算map中的size,如果达到扩容阈值就会触发扩容逻辑,扩容因子是0.75,即:当容量达到总容量的3/4时,触发扩容

  1. ConcurrentHashMap扩容时新建数组
  2. ConcurrentHashMap扩容时获取迁移数据区域
// tab是旧表,nextTab是一个两倍容量的空的新表
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        // stide是步长的意思,会将旧表按照这个步长进行分段,在并发扩容时,每个线程只负责自己段内数据的转移
        int n = tab.length, stride;
        // NCPU是当前系统的内核数,如果内核数只有一个,那么就不需要进行并发扩容,因为扩容是个纯计算密集型的逻辑,只有一个核心的时候反而得不偿失,因为无法真正的并发,反而会额外付出线程上下文切换的开销
        // 这里步长最小是16,也就是说每个线程最少要负责16个桶的数据迁移,这个值设置的太小会导致并发线程数增多,从而导致线程间的竞争变大,这个竞争是只下面的一些CAS逻辑,比如对transferIndex、sizeCtl变量的cas操作
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                // 如果新表没有初始化,则新建一个双倍容量的新表
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            // 设置【开始数据转移】的桶的下标,从尾部的桶开始往头部逐个处理,将transferIndex设置为老表的length(比最后一个桶的下标大1,所以后面的代码会-1)
            transferIndex = n;
        }
        int nextn = nextTab.length;
        // 生成用于表示【扩容中】状态的节点
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 线程每次根据步长从数组上截取一段桶,如果线程处理完自己截取的一段内的桶后,还有未处理的数据,则需要重新从数组上截取一段来处理
        // true则标识当前线程需要继续在老表的数组上截取新的一段桶来处理数据(可能没有线程来帮忙,就只能自己一个人干完了)
        boolean advance = true;
        // 标记是否已结束扩容,做收尾工作
        boolean finishing = false; // to ensure sweep before committing nextTab
        // i是当前线程需要转移的桶的下标,bound是当前线程负责的桶的最小下标
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // 这个while的逻辑就是为了检查当前线程负责的段内的桶是否都处理完毕,如果处理完毕,则看下老表的数据里是否还有未处理的桶,如果有未处理的桶,则再次截取一段来处理
            while (advance) {
                int nextIndex, nextBound;
                // 如果下一个桶的下标(--i是下一个需要操作的桶的下标)还在自己负责的段内,就不需要截取新段了,就继续处理下一个桶的数据
                // 如果已经结束,则不需要继续截取新的段
                if (--i >= bound || finishing)
                    advance = false;
                // transferIndex用来表示这个下标及其后面的桶都已经被其他线程处理了,新的线程需要从transferIndex往前截取自己需要负责的桶,如果transferIndex小于等于0说明桶都已经转移完毕,不需要再处理了    
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 以nextIndex(在上面已经赋值为transferIndex)为起始位置,往数组头部方向截取相应步长的段来转移数据,通过cas将transferIndex设置到新的下标
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    // cas成功后,设置当前线程负责的下标边界(比如负责下标32到48的桶,那么这个bound就是32)
                    bound = nextBound;
                    // cas成功后,设置当前线程开始处理的桶的下标(比如负责下标32到48的桶,那么这个i就是48)
                    // transferIndex默认是从tab.length开始取值,所以要减1来表示正确的下标
                    i = nextIndex - 1;
                    // cas成功则表示当前线程已经成功截取了自己需要负责的一段数据了,不需要再往前截取了
                    advance = false;
                }
            }
            // i是需要转移的桶的下标,n是老表的容量
            // i<0说明旧表中的桶都已经转移完毕
            // i>=n|| i + n >= nextn 不是很明白这个判断条件,正常情况下,i作为开始转移的桶的下标肯定会小于老表的容量的,因为转移的是老表内的桶
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 判断是否已经完成扩容,已完成扩容则做收尾逻辑
                if (finishing) {
                    // 完成扩容后,将引用设置为null
                    nextTable = null;
                    // 将table引用指向新表,这里的table是个volatile变量,所以这个赋值操作对其他线程是可见的
                    table = nextTab;
                    // 设置新的扩容阈值,将阈值设置为新容量的3/4
                    // 这里的n是老表的容量,因为是双倍扩容,所以新表容量是2n,下面计算的结果是2n-0.5n = 1.5n,也就是新表容量的3/4
                    sizeCtl = (n << 1) - (n >>> 1);
                    // 返回结果,扩容结束
                    return;
                }
                // 在扩容开始时,会将sizeCtl设置成一个负数,每次有新的线程并发扩容时,会将sizeCtl+1,而当有线程处理完扩容逻辑后,再减1,以此来判断是否是最后一个线程
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // cas成功,则判断当前线程是不是最后一个完成扩容的线程,由最后一个完成扩容逻辑的线程将finishing和advance设为true,重新循环到上面的if(finishing)里的收尾逻辑
                // 这里减2是因为在执行扩容的入口处,第一个触发扩容的线程会负责将sc加2,至于为什么第一个扩容的线程要加2,而不是加1,这个没理解
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 如果是空桶,则在老表对应的下标出放一个ForwardingNode,在有别的线程往这个空桶写数据时会感知到扩容过程,一起来扩容
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // ForwardingNode节点的hash值就是MOVED(在ForwardingNode的构造方法里会设置hash值为MOVED),说明已经有线程处理了这个桶内的数据
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
            // 在桶上加锁,防止对同一个桶数据并发操作
                synchronized (f) {
                // 有点双重校验锁的味道,防止获得锁后,该桶内数据被别的线程插入了新的数据,因为这个f是在未加锁之前获取的node对象,在这期间,可能这个下标处插入了新数据
                // 比如有别的线程调用了put方法往这个桶内链表插入新节点,这时候这个桶的node就变成了新插入的数据node(put操作会生成新的node,并将新node的next引用指向原node)
                // 如果不做这层校验,会导致新加入到桶内的数据没有被处理,导致数据丢失
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // fh>=0表示是正常的链表
                        if (fh >= 0) {
                        // 这里需要注意的是在put操作里是通过hash&(n-1)来选取下标位置,表容量n都是2的幂,所以这里hash&n的结果只有两个要么是n要么是0
                        // 值为0时的节点在新表的i下标出,而值为n的节点需要迁移到新表的i+n下标下,因为是双倍扩容
                        // 所以老表下标为i的桶内的数据在迁移rehash时,一半仍然在下标为i的桶内,另一半在i+n的桶内,不会出现第三种情况
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            // 这个循环的目的有两个
                            // 1、遍历出整个链表尾部不需要改变next指针的最长链,这样可以将这个链整个搬到新桶内,不用再逐个遍历了
                            // 2、由于是将老的完整节点链条搬到新桶内,所以也就不需要创建新的node节点,减少迁移过程中的gc压力
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                // 逐个遍历节点,0表示仍然放到下标为i的桶内的链表
                                // 这里每次都是生成新的node对象而不修改原node对象的next指针,这也是get()方法不用加锁的关键所在
                                // 但是会带来gc压力,所以才有上面的那次遍历,希望减少对象的创建
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                // 否则就是放到下标为i+n的桶内的链表
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // 设置新表的下标为i的桶内数据
                            setTabAt(nextTab, i, ln);
                            // 设置新表的下标为i+n的桶内数据
                            setTabAt(nextTab, i + n, hn);
                            // 将老表下标为i的桶内放上ForwardingNode对象,用来标识当前处于扩容过程
                            setTabAt(tab, i, fwd);
                            // 处理完后,将这个字段设置为true,以便走到上面的while(advance)里检查当前线程负责的数据是否处理完成,并且查看是否需要截取新段
                            advance = true;
                        }
                        // 红黑树结构的迁移,逻辑与链表差不多,也是将整棵树拆成两颗,一棵树放到下标为i的桶内,一棵放到下标i+n的桶内
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

transfer方法解析:该方法主要是针对容器进行扩容,支持多个线程同时i扩容。

  1. 该方法默认第一次进入创建新容器(新容器的长度为旧容器的两倍),计算每个线程负责的的元素个数((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 这种计算方式配合划分区间数据的方式,当只有一个cpu的时候就让一条线程负责,多个cpu的时候将容器划分为 cpu个数*8 等分,每条线程负责各自的区间。
    那么它是如何做到的?首先当单个线程初始化新容器(如果初始化失败则直接设置阈值为 Integer.MAX_VALUE;表示无法继续扩容)。
    变量赋值及意义:
    • transferIndex赋值记录旧数组的长度,在之后扩容的时候transferIndex不断的变化记录下一个要处理容器区间的边界值。
    • 创建ForwardingNode节点,此后每处理一个容器对应的下标元素,旧容器的节点处由此对象替换,表示此节点已经迁移,容器正在扩容。
    • 容器扩容时因为考虑到多线程扩容存在分配容器区间,advance用于在分配区间时或者处理区间元素或者完成区间元素迁移时跳出 容器区间自旋。
      finishing 只有这个变量为true的时候才是真正扩容完成。
  2. 进入扩容自旋,设置当前线程的区间索引位置,以及区间边界值。紧接着又是自旋,开始计算当前线程的区间的起始和结束索引位置,一共三步(这个自旋的作用刚开始是用来计算步长,然后每次处理完一个区间的桶就向前推进一步,或者当处理完当前区间处理完成之后继续计算下一个区间,或者当所有的区间处理完就设置变量让外层跳出扩容),
    • 首先进入的是第三个条件,通过cas的方式计算区间的步长,并且用transferindex记录下一个区间的临界值,设置当前线程的始末索引,跳出自旋。
    • 第一个条件结束位置索引减一(即向前推进一步,处理下一个桶),是否超出边界(即当前区间元素是否处理完毕),如果处理完毕判断是否完成扩容,没有的话继续自旋,如果都没有则跳出自旋,前进一步处理下一个位置。
    • 第二个条件通过transferindex记录的值(下一个边界)赋值给当前线程下一个边界值,如果<=0那么表示容器处理完了,跳出自旋
  3. 自旋外面紧接着又是四大步,
    • 第一步当前桶位置是否为空,为空则用fwd占用表示正在迁移,
    • 第二步根据当前元素hash判断是否正在迁移,
    • 第三步如果当前桶存在元素且还未迁移那么加锁 锁住链表的头开始迁移到新容器,加锁之后判断当前元素是否被其他线程迁移了,没有继续,hash是否为正数,通过fn&n将当前桶的链表分为两部分,通过遍历链表找到最后一个 hash & n 不相同的元素,正是此元素将链表分为两部分,之后通过找到的元素hash计算是否为0,再次循环链表创建两条新的链表,分别放入容器i和i+原数组长度位置,设置旧数组桶为fwd。
    • 第四步判断i是否为负数以及i >= n || i + n >= nextn 为什么这么判断待解析,如果进入此步那么至少说明当前线程的区间处理完毕。首先判断是否扩容标识是否完成,如果没有则使用cas的方式将sizectl减一,此时的sizectl为负数表示正在扩容,而且减一表示一个线程协助扩容完成,cas成功那么再次判断((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)是否成立,不成立则退出,结束当前线程协助扩容(表示还有线程在扩容。为什么?因为第一个线程进来的时候设置这两个值相等的,此后有线程进入就在此基础上加一,每次有一个线程扩容完成就减一,也就是说只有最后一个线程退出的时候此条件才相等,不等就说明还有线程扩容)如果还能往下走说明最后一个线程扩容完毕了,设置扩容标识为完成,重新设置i为原来容器长度重新检查一次是否有遗漏元素未被迁移。如果扩容标识完成了则清空协助扩容容器,将新数组赋值给容器,重新设置阈值,最后结束扩容(正真结束扩容的线程最多且最少要进入第四步两次)。

如何保证线程扩容时的线程安全问题?

这里扩容时存在的线程安全问题无非两种情况。

  • 第一是扩容时正在操作map。
    一、在触发扩容流程时,需要通过CAS将sizeCtl从正数改成负数,并且+2,这样只会有一个线程cas成功,避免其他的写线程也触发扩容流程。
    二、怎么避免其他写线程往处于扩容中、扩容完毕的桶里写数据,因为扩容线程是遍历桶内的链表或者B树来rehash,如果往已经遍历的链表或者B树中插入新数据,扩容线程是无法感知到的,会导致新表中没有这些数据,这个要结合put(k,v)方法来说,对于空桶来说,不管是put操作还是扩容操作,都是通过cas操作来往空桶中添加数据,所以在出现并发往空桶写时,只会有一个线程成功,而不管是put的线程失败还是扩容的线程失败时,都会重新获取里面的值,再重新触发对应的put或者扩容逻辑。
    三、对于有数据的桶,put操作和扩容操作都是通过synchronized在桶上加锁来避免并发写问题。

  • 第二是多个线程同时扩容。
    一、扩容线程和扩容线程间在进行任务分配时,是从数组尾部往头部以桶为单位截取,并且用来标记已分配区域的指针transferIndex是volatile修饰的,所以线程间是可见的,通过cas来修改transferIndex值,保证线程间没有重复的桶
    二、其他线程怎么感知到扩容状态,从而一起进行扩容?在对某个桶进行扩容时候,在完成扩容后会生成一个ForwardingNode放在老表的对应下标的位置下,当有其他线程修改这个桶内数据时,如果发现这个类型的节点,就会一起进行扩容

helpTransfer方法

在扩容时,如果发现线程当前 hash 冲突了,也就是当前 hash 值对应的槽位有值了,且如果这个值是 -1 (MOVED),说明 Map 正在扩容。那么就帮助 Map 进行扩容。以加快速度

/**
 * Helps transfer if a resize is in progress.
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 如果 table 不是空 且 node 节点是转移类型,数据检验
    // 且 node 节点的 nextTable(新 table) 不是空,同样也是数据校验
    // 尝试帮助扩容
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // 根据 length 得到一个标识符号
        int rs = resizeStamp(tab.length);
        // 如果 nextTab 没有被并发修改 且 tab 也没有被并发修改
        // 且 sizeCtl  < 0 (说明还在扩容)
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // 如果 sizeCtl 无符号右移  16 不等于 rs ( sc前 16 位如果不等于标识符,则标识符变化了)
            // 或者 sizeCtl == rs + 1  (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
            // 或者 sizeCtl == rs + 65535  (如果达到最大帮助线程的数量,即 65535)
            // 或者转移下标正在调整 (扩容结束)
            // 结束循环,返回 table
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 如果以上都不是, 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 进行转移
                transfer(tab, nextTab);
                // 结束循环
                break;
            }
        }
        return nextTab;
    }
    return table;
}

helpTransfer方法解析:此方法桶addCount方法中的一段类似,只不过加了一些判断条件,判断当前节点是否位fwd节点,计算扩容标识,sizectl是否为负数,cas的方式将sizectl加一,多个一个线程进入扩容

  1. 判 tab 空,判断是否是转移节点。判断 nextTable 是否更改了。
  2. 根据length 得到标识符。
  3. 判断是否并发修改了,判断是否还在扩容。
  4. 如果还在扩容,判断标识符是否变化,判断扩容是否结束,判断是否达到最大线程数,判断扩容转移下标是否在调整(扩容结束),如果满足任意条件,结束循环。
  5. 如果不满足,并发转移。

tryPresize方法

putAll批量插入或者插入节点后发现链表长度达到8个或以上,但数组长度为64以下时触发的扩容会调用到这个方法

 private final void tryPresize(int size) {
 //如果大小为MAXIMUM_CAPACITY最大总量的一半,那么直接扩容为MAXIMUM_CAPACITY,否则计算最小幂次方
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
          //如果sizeCtl为正数或0
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
             //如果table还未进行初始化
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                 //cas修改sizeCtl为-1,表示table正在进行初始化
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                     //确认其他线程没有对table修改
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            //0.75*n
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
             //如果扩容大小没有达到阈值,或者超过最大容量
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
             /**生成表的生成戳,每个n都有不同的生成戳
             * static final int resizeStamp(int n) {
             *   return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
             *    }
             *   Integer.numberOfLeadingZeros(n)在指定 int 值的二进制补码表示形式中最高位(最左边)的 1 位之前,返回零位的数量
             * 例如 n为16 0001 0000 则Integer.numberOfLeadingZeros(n)为27,因为n为2的幂次方,因此不同的n此结果也不同
             * 然后与(1 << (RESIZE_STAMP_BITS - 1)) | ,相当于2^15 | n中0的个数。
             * (因此其左移16位后符号位为1,结果肯定是个负数)
             */
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    /**1.第一个判断 sc右移RESIZE_STAMP_SHIFT位,也就是比较高ESIZE_STAMP_BITS位生成戳和rs是否相等
                    * 相等则代表是同一个n,是在同一容量下进行的扩容,
                    *  2.第二个和第三个判断 判断当前帮助扩容线程数是否已达到MAX_RESIZERS最大扩容线程数
                    *  3.第四个和第五个判断 为了确保transfer()方法初始化完毕
                    */
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                 /**如果没有线程在进行扩容,那么cas修改sizeCtl值,作为扩容的发起,rs左移RESIZE_STAMP_SHIFT位+2
                 * 上面说了,左移RESIZE_STAMP_SHIFT位,肯定是个负数,代表有一个线程正在进行扩容
                 * 此时sizeCtl高RESIZE_STAMP_BITS位为生成戳,低RESIZE_STAMP_SHIFT位为扩容线程数
                 */
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

tryPresize方法解析:如果集合长度没有超过最大值。那么调用tableSizeFor将长度扩大1.5倍加1计算出大于该值最接近2的幂次的数,判断当前容器是否正在扩容或初始化,有的话就什么也不干,没有则继续,第一步先判断当前容器是否为空,那么就开始初始化容器(采用了cas),初始化之前判断table==tab 这步是非常有必要的,此步避免了多个线程进入初始化。第二步如果新插入的集合长度本身就小于容器的阈值,那么就直接退出。第三步,到了这步说明容器本身就存在元素并且新的集合长度超过了阈值,那么就需要扩容了,此处的扩容阻塞条件同addcount方法一样

get方法

此方法主要作用用于获取集合指定的key的元素。是不需要加锁的,可以支持高并发的读操作。

get方法的实现主要是通过volatile关键字和Unsafe类来保证的。在ConcurrentHashMap中,Node数组和Node节点的value字段都是用volatile关键字修饰的,这可以保证线程间的可见性,也就是说,当一个线程修改了数据,其他线程可以立即看到修改后的值。

get方法为弱一致性,在get方法时有可能获得过时的数据。

  public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

get方法解析:
计算key的hash,先尝试着从容器通过hash定位找到指定的桶位,先尝试获取表头的元素比较hash和key,如果找到就返回,不行就看看hash是否小于0(说明是红黑树或者元素已被迁移),那么就直接调用元素的find方法查找元素(不同类型的元素重写了对应的find方法),如果hash不小于0那么说明就是在当前容器的位置的链表了,直接遍历链表查找元素。 如果元素正在迁移那么元素属于fwd类型,在此前元素迁移创建fwd的时候会将新容器给fwd就是为了get方法遍历,如果正在扩容旧容器上没有就可以通过fwd的find方法查找新容器遍历新容查找。

remove方法

移除 key 元素对应的节点

/* 两个重载函数,区别在于是否指定元素的value值,底层都是调用 replaceNode */
public V remove(Object key) {
    return replaceNode(key, null, null);
}
public boolean remove(Object key, Object value) {
    if(key == null)
        throw new NullPointerException();
    return value != null && replaceNode(key, null, value) != null;
}

/* value: 替换的值,为null即删除, cv:指定key的旧值,不指定为null,指定需新旧值相等才删除 */
final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode()); // 获取 hash 值
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        /* tab没有元素,直接退出 */
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        /* 正在扩容,帮助扩容获取新数组 */    
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                /* 判断当前桶位首节点有没有被更改 */
                if (tabAt(tab, i) == f) {
                    /* 链表节点 */
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            /* 找到节点 */
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                /* 不指定旧值 || 新旧值相等,进行替换 or 删除 */
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    /* 替换和删除用的同一方法,使用 value进行区分 */
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    /* 树节点 */
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        /* 根节点出发,寻找对应的节点 */
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                /* 替换和删除用的同一方法,使用 value 进行区分 */
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            /* validated为true代表进入过节点查找 */
            if (validated) {
                /* oldVal 不为null即有对应的节点,value不为null是删除,为null是替换,都返回旧值oldVal */
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1); // 删除节点添加个数为-1,可回看04篇
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

remove方法解析:
大多数删除方法都是调用的replaceNode方法。首先计算key的hash值。开始自旋,查找对应位置的桶是否为空,为空直接跳出表示没有此元素,否则判断此节点是否正在迁移,如果正在迁移那么调用helpTransfer帮助扩容,继续自旋。如果对应的桶位置没有迁移,锁住表头,开始遍历链表,通过比较key的hash和equals方法找到对应的元素,如果元素是链表头那么通过cas的方式设置下一个元素为表头,如果不是那么只需要将下一个元素的引用给到上一个元素的next(斩断链表),如果是树则调用树的方法。最后如果有元素被剔除,那么调用addcount重新计算size的大小,删除元素不需要检查扩容


文章作者: Needle
转载声明:本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Needle !
  目录
  评论