`
zhaomingzm_23
  • 浏览: 33053 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Java并发包探秘 (二) ConcurrentHashMap

阅读更多
本文是Java并发包探秘的第二篇,旨在介绍一下Java并发容器中用的一些思路和技巧,帮助大家更好的理解Java并发容器,让我们更好的使用并发容器打造更高效的程序。本人能力有限,错误难免。希望及时指出。

Java并发包中有很多精心设计的高并发容器。有ConcurrentLinkedQueueConcurrentSkipListMapConcurrentHashMap等。ConcurrentHashMap就是其中设计独特,受到开发者一致好评的key-value高并发容器,现在就让我们来一步一步揭开他们神秘的面纱。

G+

正文开始:
为了照顾到所有读者,本文在分析ConcurrentHashMap之前先从HashMap开始介绍。为了叙述方便ConcurrentHashMap简称为并发HashMap,而HashMap就称之为HashMap。

一说到HashMap,我们首先需要明白什么是Hash。Hash其实是数学中的一个“散列”算法,可以把这个算法理解成 : address = hash(key) 其中 key 是输入参数,address 是我们关心的地址。那么hash()就是“散列”函数或者称之为hash函数。什么是输入参数呢?在HashMap中是指关系映射的键、address就是用来存储数据的数组下标。采用这个方法来算地址在时间复杂度上是最快的。否则我们要一一比对容器中的内容和我们要的数据是否满足。链表的时间复杂度是O(n),B+Tree的时间复杂度是O(log2n),相关知识请点击。Hash算法的种类非常多。我们理解只需要明确。Hash()函数的计算结果要在目标空间竟可能的分散和均匀,当计算结果相同时如何解决好冲突。在理解HashMap之前让我们来看看HashTable是怎么处理的
// Makes sure the key is not already in the hashtable.
	Entry tab[] = table;
	int hash = key.hashCode();
	int index = (hash & 0x7FFFFFFF) % tab.length;


我们从上面的代码不难看出,在HashTable中的处理十分简单,数组下标的值仅仅是放入元素Key的hashCode()函数与数组长度取余。Object的hashCode()函数是一个native方法。方法说明做出了一下三条保证:
  • Whenever it is invoked on the same object more than once during an execution of a Java application, the hashCode method must consistently return the same integer, provided no information used in equals comparisons on the object is modified. This integer need not remain consistent from one execution of an application to another execution of the same application.
  • If two objects are equal according to the equals(Object) method, then calling the hashCode method on each of the two objects must produce the same integer result.
  • It is not required that if two objects are unequal according to the java.lang.Object.equals(java.lang.Object) method, then calling the hashCode method on each of the two objects must produce distinct integer results. However, the programmer should be aware that producing distinct integer results for unequal objects may improve the performance of hashtables.



做一个假设,Object的hashCode()方法返回一个常数在HashTable中会出现什么情况。没错!此时我们存储在tab中的数据将呈现一个链表的结构。原因就是我们的hashCode与表长度求余的结果是一个常数,所以要不断的解决hash冲突。
我们再来看看HashMap中的hash算法:
/**
     * Applies a supplemental hash function to a given hashCode, which
     * defends against poor quality hash functions.  This is critical
     * because HashMap uses power-of-two length hash tables, that
     * otherwise encounter collisions for hashCodes that do not differ
     * in lower bits. Note: Null keys always map to hash 0, thus index 0.
     */
    static int hash(int h) {
        // This function ensures that hashCodes that differ only by
        // constant multiples at each bit position have a bounded
        // number of collisions (approximately 8 at default load factor).
        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    }


从代码和说明中,我们不难发现。HashMap的hash算法比HashTable的要略微复杂了一点。目的也是使结果在目标空间竟可能分散。

HashMap是比HashTable要快的key-value容器,原因很简单,在访问HashMap的时候比访问HashTable的数据少了同步的性能消耗。但带来的问题则是该数据容器在多线程环境下会导致数据不安全。它是一个非线程安全的键值对容器。在高并发的环境要禁止使用它。但key-value的访问方式又是必不可少的。所以在JDK 1.5引入了一个并发的HashMap,它几乎是在并发环境下使用key-value容器的必选对象。我们先来看看它的内部结构。
首先还是hash算法:
/**
     * Applies a supplemental hash function to a given hashCode, which
     * defends against poor quality hash functions.  This is critical
     * because ConcurrentHashMap uses power-of-two length hash tables,
     * that otherwise encounter collisions for hashCodes that do not
     * differ in lower or upper bits.
     */
    private static int hash(int h) {
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }


又复杂了,没错! 目的只有一个使结果在目标空间均匀。相关文章请参考这里

除了hash算法的改进,还有就是在并发HashMap中使用了锁,而且这把锁是分离的锁。目的就是绕过访问必须加锁的技术障碍,当然又要保护数据的安全。这样比HashTable中方法级别的synchronized更加细粒度的Segment诞生了。该类自身就是继承自ReentrantLock可重入锁对象,目的是方便加锁操作。
/**
     * Segments are specialized versions of hash tables.  This
     * subclasses from ReentrantLock opportunistically, just to
     * simplify some locking and avoid separate construction.
     */
    static final class Segment<K,V> extends ReentrantLock implements Serializable {


并发HashMap中默认使用16个Segment对HashMap的数据进行分段,读取方法如下:
public V get(Object key) {
        int hash = hash(key.hashCode());
        return segmentFor(hash).get(key, hash);
    }

读取方法使用了二次hash操作,第一次时命中一个Segment,第二次调用Segment的get方法:
/* Specialized implementations of map methods */

        V get(Object key, int hash) {
            if (count != 0) { // read-volatile 1.这里是确保可见性
                HashEntry<K,V> e = getFirst(hash);
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;
                        if (v != null)// 2.这里的数据为空说明有并发的修改。
                            return v;
                        return readValueUnderLock(e); // recheck
                    }
                    e = e.next;
                }
            }
            return null;
        }


只有当数据被并发修改的时候才加锁读,否则都是直接返回数据。这样提高了并发性能。get方法还用到了JDK 1.5对volatile字段的语义增强(JSR 166),确保happens-before原则。相关文章这里

Segment中的table存放的是HashEntry数组,那么它的结构又是怎样的呢?

static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;

        HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
            this.key = key;
            this.hash = hash;
            this.next = next;
            this.value = value;
        }

	@SuppressWarnings("unchecked")
	static final <K,V> HashEntry<K,V>[] newArray(int i) {
	    return new HashEntry[i];
	}
    }


我们从代码可以看出除了value字段是可以修改的,其它都是final 字段,这样造成的结果是删除一个元素的时候我们无法修改HashEntry的next引用。这样我们删除操作时只能其它部分逐个复制了。相关代码如下:
/**
         * Remove; match on key only if value null, else match both.
         */
        V remove(Object key, int hash, Object value) {
            lock();//1.删除操作是加锁的,当然这个锁得粒度仅仅在Segment范围之内。
            try {
                int c = count - 1;
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue = null;
                if (e != null) {
                    V v = e.value;
                    if (value == null || value.equals(v)) {
                        oldValue = v;
                        // All entries following removed node can stay
                        // in list, but all preceding ones need to be
                        // cloned.
                        ++modCount;
                        HashEntry<K,V> newFirst = e.next;
                        for (HashEntry<K,V> p = first; p != e; p = p.next)
                            newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                          newFirst, p.value);//2.注意看这里,就是逐个复制。
                        tab[index] = newFirst;
                        count = c; // write-volatile 3.这里依然是 JSR 166 保证线程间可见性。
                    }
                }
                return oldValue;
            } finally {
                unlock();
            }
        }


在删除之前依然是:
public V remove(Object key) {
	int hash = hash(key.hashCode());
        return segmentFor(hash).remove(key, hash, null);
    }


其中get方法和remove方法都调用了一个segmentFor()方法,其实并发HashMap的所有操作都和这个方法有关。到底是如何实现的呢?

/**
     * Returns the segment that should be used for key with given hash
     * @param hash the hash code for the key
     * @return the segment
     */
    final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }


需要特别注意一下segmentShift和segmentMask,假设构造函数确定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。一般最常见的值是Segment数量 2^4 = 16,segmentShift=28,segmentMask=15,经过segmentFor()运算就能快速的定位到具体的Segment上面。

public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
    }
 public V putIfAbsent(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, true);
    }


put方法和putIfAbsent方法都调用了Segment的put(etc...)方法,仅仅只是最后一个参数不同:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash();
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)//不同之处,在这里体现。
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
                    count = c; // write-volatile 最后写volatile变量,保证可见性
                }
                return oldValue;
            } finally {
                unlock();
            }
        }


并发HashMap的put和putIfAbsent的区别就是一个是直接放入并替换,另一个是有就不替换。其它方法诸如clear(),replace()等,都是在细粒度的锁下完成的。这里不做更细的讨论,详见并发HashMap源代码

最后我们看看size操作的实现:
public int size() {
        final Segment<K,V>[] segments = this.segments;
        long sum = 0;
        long check = 0;
        int[] mc = new int[segments.length];
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
            check = 0;
            sum = 0;
            int mcsum = 0;
            for (int i = 0; i < segments.length; ++i) {
                sum += segments[i].count;//1.注意这语句
                mcsum += mc[i] = segments[i].modCount;//2.还有这一句。
//以上两句是不能调换位置的,还是JSR166 happens-before原则。count之前修改的内容一定会被读取到,jdk 1.5 volatile 关键字的语意增强。
            }
            if (mcsum != 0) {
                for (int i = 0; i < segments.length; ++i) {
                    check += segments[i].count;
                    if (mc[i] != segments[i].modCount) {
                        check = -1; // force retry
                        break;
                    }
                }
            }
            if (check == sum)
                break;
        }
        if (check != sum) { // Resort to locking all segments
            sum = 0;
            for (int i = 0; i < segments.length; ++i)
                segments[i].lock();//全部加锁
            for (int i = 0; i < segments.length; ++i)
                sum += segments[i].count;
            for (int i = 0; i < segments.length; ++i)
                segments[i].unlock();//全部解锁
        }
        if (sum > Integer.MAX_VALUE)
            return Integer.MAX_VALUE;
        else
            return (int)sum;
    }


RETRIES_BEFORE_LOCK 的默认值是2,2次循环检测看看modCount的数据是否一致,(modCount的值是只增不减的)一致说明此时没有并发修改。直接返回,否则进行全Segment的加锁计算。

总结:
并发HashMap是Java并发包中名气最大的并发容器。在刚刚发布不久的Java 7 中也有更新。当然仅仅只是版权和类结构上的优化。并没有从算法上更新,所以这里不做展开讨论。并发HashMap中将锁、hash算法和JSR166使用到了极致,是Java中不可多得的财富。只可学习不要轻易模仿。

Java8推迟发布,其中也涵盖ConcurrentHashMap的更新,感兴趣的同行可以关注下这个链接
ConcurrentHashMapV8
2
6
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics