ConcurrentHashMap的JDK1.7实现(下)

续《ConcurrentHashMap的JDK1.7实现(上)

rehash操作

rehash也就是扩容操作,扩容之后的容量是之前的两倍,所以扩容之后的newCapacity也是2^n的一个值。

        private void rehash(HashEntry<K,V> node) {
            /*
             * 将table中每个节点重新分配到新的table中去。因为使用的是 *2的方式扩容,
             * 每个元素在table中的索引要么为i(不变),要么是i+oldCapacity。
             * 如:扩容前容量是16,当前HashEntry在table[]中的索引为3,则新的索引可能为3或者19。
             * 在节点拷贝的过程中,有一些节点的next节点是不用调整的,就直接利用了。
             * 据统计,在默认的threshold值时, 扩容只需要1/6的节点需要拷贝。
             * 那些被替换掉的节点,在没有任何线程引用的时候,将会被GC回收。
             * Entry accesses use plain array indexing because they are followed by volatile table write.
             */
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1; // 容量*2操作
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            // 遍历扩容前节点
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  若链表为单节点
                        newTable[idx] = e;
                    else { // 重复利用一些扩容后,next不变的节点,这些节点在原先链表的尾部
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // 对于next变化的节点重新计算hash(链表前面部分节点),然后重新插入
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // 将需要put的新node插入
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

从上面的代码流程可以看出,其扩容步骤大致如下:

(1)首先计算出newCapacity的容量;

(2)然后循环table[],重新分配每条链表上面的元素。因为使用的是 *2的方式扩容,每个元素在table中的索引要么为i(不变),要么是i+oldCapacity。如:扩容前容量是16,当前HashEntry在table[]中的索引为3,则新的索引可能为3或者19。

(3)拷贝过程中,如果为单链表则直接赋值;在节点拷贝的过程中,有一些节点的next节点是不用调整的(链表后端部分片段),就直接利用了;对于前端部分的片段,则重新hash,然后插入到对应的链表中。

(4)最后再将需要put进来的node,在扩容后的结构中插入。

size()

ConcurrentHashMap的size()操作需要统计所有的Segment中的HashEntry数量,最大为Integer.MAX_VALUE。因为在统计个过程中,有可能出现多线程修改的问题。即便如此,ConcurrentHashMap首先会用无锁尝试3次,如果统计失败,再加锁统计。代码如下:

    public int size() {
        // 首先尝试3次无锁的统计,如果失败,再进入加锁统计
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // 当大小超过 32 bits 时为true
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {// 加锁统计
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                // 尝试3次无锁统计,这里面通过统计前后的modCount值的和 变化,这个值在每个Segment中,每一次变更操作都会递增,类似于Segment的版本号
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

containsValue()

containsValue()方法的思想和size()的基本相同,下面贴出代码,相信查看备注即可。

    public boolean containsValue(Object value) {
        // Same idea as size()
        if (value == null)
            throw new NullPointerException();
        final Segment<K,V>[] segments = this.segments;
        boolean found = false;
        long last = 0;
        int retries = -1;
        try {
            outer: for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {// 加锁统计
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                long hashSum = 0L;
                int sum = 0;
                // 尝试3次无锁统计,这里面通过统计前后的modCount值的和 变化,这个值在每个Segment中,每一次变更操作都会递增,类似于Segment的版本号
                for (int j = 0; j < segments.length; ++j) {
                    HashEntry<K,V>[] tab;
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null && (tab = seg.table) != null) {
                        for (int i = 0 ; i < tab.length; i++) {
                            HashEntry<K,V> e;
                            for (e = entryAt(tab, i); e != null; e = e.next) {
                                V v = e.value;
                                if (v != null && value.equals(v)) {
                                    found = true;
                                    break outer;
                                }
                            }
                        }
                        sum += seg.modCount;
                    }
                }
                if (retries > 0 && sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return found;
    }