面试突击 ConcurrentHashMap 篇

面试突击 ConcurrentHashMap

前置关键词

  • segments 分段(jdk1.7)

老生常谈

  • jdk1.7 采用 Segment数组(segments),每个 Segment都有独立的 ReentrantLock锁,并发操作互不影响
  • jdk1.8 CAS + synchronized 实现每个 Node一个锁,缩小了锁粒度,提高了并发性能

加锁

并发情况下,HashMap 存在线程不安全的情况,线程不安全,当然要加锁了。

HashTable

HashTable,get和put都加了synchronized修饰,这样带来的直接问题就是,性能比较差。

JDK 1.7

1.7 分段锁

采用分段的思想,切分为多个Segment,默认为16个,可以初始化时指定,后期不能修改,Segment就相当于一个小的HashMap。

理论上支持segments.length个线程并发操作,默认也就是16个线程并发操作。

扩容时,也是每个Segment里边的table自己扩容,Segment数量如前边所说,初始化时指定,不可更改。

1.7 使用分段锁 get/put

类似HashMap,只不过一个由一个数组,换成了一组数组,每个Segment中有一个数组,看下边源码。

static final class Segment<K, V> extends ReentrantLock implements Serializable {

    private static final long serialVersionUID = 2249069246763182397L;

    static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    transient volatile HashEntry<K, V>[] table;

    transient int count;

    transient int modCount;

    transient int threshold;

    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K, V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K, V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K, V> first = entryAt(tab, index);
            // ...
        } finally {
            unlock();
        }
        return oldValue;
    }

}

put操作时,通过两次hash定位HashEntry位置,第一次找到在第几个Segment,第二次找到在Segment中table中的位置。

1.7 计算 size(初见,挺有趣)

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K, V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            // 这个for循环至少会执行两次吧,除非sum始终为0,也就是空的Map
            for (; ; ) {
                // retries++ 第一次-1,第二次0,第三次1,第四次2(此时就要加锁了,if执行完retries = 3)
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K, V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        // seg.modCount 是只加不减的
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                // 初见,是不是想last在哪赋值了,在下边。下次循环才能用到,既然是比较,至少也得两两对比吧,for循环至少执行两次
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            // 解锁
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }
}

JDK 1.8

锁粒度细化,为每个Node,CAS + synchronized

和 HashMap 一样,只不过在 put 的时候,如果多个线程操作同一个 Node,会先加锁再操作,开始为 CAS,如果有冲突,再升级为 synchronized。

JDK1.6之后,synchronized做过优化,会有锁升级的过程,无锁、偏向锁、轻量级锁、重量级锁,以此来保证并发安全。

 static class Node<K, V> implements Map.Entry<K, V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K, V> next;
}
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K, V>[] tab = table; ; ) {
            Node<K, V> f;
            int n, i, fh;
            K fk;
            V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // CAS 操作(注意:仅在table中头结点为null时使用CAS,也就是没发生Hash冲突的时候)
                if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value)))
                    break;                   // no lock when adding to empty bin
            } else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else if (onlyIfAbsent // check first node without acquiring lock
                    && fh == hash
                    && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                    && (fv = f.val) != null)
                return fv;
            else {
                V oldVal = null;
                // synchronized 加锁操作(f 肯定不为空了)
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K, V> e = f; ; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K, V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K, V>(hash, key, value);
                                    break;
                                }
                            }
                        } else if (f instanceof TreeBin) {
                            Node<K, V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
                                    value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        } else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
}