ConcurrentHashMap从入门到入睡
  EYmIKSPPfzd2 2023年12月25日 23 0

ConcurrentHashMap

为什么使用ConcurrentHashMap

前文提到,HashMap无论任何版本都是线程不安全的

但 Hashtable 会给整张表加悲观锁,仅允许单个线程独占,效率低下。

synchronizedMap加入了互斥锁 mutex ,在方法上加上 synchronized,效率同样不高。

所以需要更低粒度的锁以换取更好的并发性能。

ConcurrentHashMap不允许null

无论是键还是值,ConcurrentHashMap都不允许使用null。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    ....
}

为什么不能为null

HashMap的二义性。

public class Test {
    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("erick", null);
        System.out.println(map.get("erick")); // -> null
        System.out.println(map.get("yunliyunwai")); // -> null
    }
}

由此可知,此时两个返回虽然都是null,但实际上两种结果所代表的意义完全不同。

前一个代表值为null,后一个则是没有该键。

ConcurrentHashMap 作者 Doug Lea 认为在并发条件下这种二义性是无法容忍的。

数据结构

JDK1.7

在JDK1.7,HashMap主要使用数组+链表方式实现。

ConcurrentHashMap主要采用Segment分段锁的形式实现。

Segment分段锁继承自ReentrantLock,可以最高同时支持 Segment 数量大小的写操作(即写入操作完全平分在每个Segment上)。

1.jpg

在并发时,对每个Segment加锁,不允许对其进行非查询操作。

每个Segment包含若干链表。

简单来说,每个Segment就是分割的"小HashMap"。

相关参数
/**
 * 默认的并发度,这里所谓的并发度就是能同时操作ConcurrentHashMap的线程的最大数量,
 * 由于ConcurrentHashMap采用的存储是分段存储,即多个Segment,加锁的单位为Segment,所以一个map的并行度就是Segments数组的长度,
 * 故在构造函数里指定并发度时同时会影响到Segments数组的长度,因为数组长度必须是大于并行度的最小的2的幂。
 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
 * 每个Segment最小容量
 */
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

/**
 * 每个Segment最大的容量
 */
static final int MAX_SEGMENTS = 1 << 16;

/**
 * 默认自旋次数,超过这个次数直接加锁,防止在size方法中由于不停有线程在更新map
 * 导致无限的进行自旋影响性能,当然这种会导致ConcurrentHashMap使用了这一规则的方法
 * 如size、clear是弱一致性的。
 */
static final int RETRIES_BEFORE_LOCK = 2;

在加锁时,首先尝试对Segment加锁,如果失败,则通过自旋方式获取锁。

如果重试次数达到MAX_SCAN_RETRIES则改为阻塞获取锁。

static final class HashEntry<K,V>{
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
}

其中,HashEntry中值与下一节点被volatile修饰,保证了多线程环境下数据获取时的可见性

JDK1.8

在JDK1.8下,HashMap采用数组+链表+红黑树的结构。ConcurrentHashMap在原有基础上,抛弃了Segement分段锁思路,采用CAS+synchronized实现更低粒度的锁。

2.jpg

加锁时会锁住头结点

3.jpg

锁头结点的优势是,理论上可以最高同时支持头节点数量大小的写操作(即写入操作完全平分在每个链表或红黑树上)。

为什么抛弃重入锁ReentrantLock
  • 减少内存开销:如果使用ReentrantLock则需要节点继承AQS(AbstractQueuedSynchronizer)来获得同步支持,增加内存开销,而1.8中只有头节点需要进行同步。
  • 内部优化:synchronized则是JVM直接支持的,JVM能够在运行时作出相应的优化措施:锁粗化锁消除锁自旋等等。

并发度

并发度是什么

简单来说,并发度是程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数

当并发度过小,则会导致严重的锁竞争,而并发度过大,会导致CPU缓存命中率下降,影响性能。

JDK1.7

理论上来说,JDK1.7下,每个Segment加分段锁,并发度即为Segment数量,默认为16。

// 默认并发度
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

用户也可以在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32)。

JDK1.8

理论上来说,JDK1.8下,每个头结点加锁,并发度即为头节点数量。

即:ConcurrentHashMap的容量。

ConcurrentHashMap是弱一致的

CAS(Compare and swap:比较与交换)

CAS是对乐观锁的一种实现,假设要改变的值为V,预期为E,更新后的值应为U。则当且仅当V==U时,才会将V更新为E。整个操作过程不需要加锁,比加锁的效率更高。

初始化并发处理

JDK1.7

由源码:

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE;
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 检查该Segment是否被其他线程初始化
        Segment<K,V> proto = ss[0];
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 并发初始化核心
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { 
		   // 再次检查一遍该Segment是否被其他线程初始化
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

可知:JDK1.7对于并发操作使用 CAS 进行控制。

JDK1.8

源码:

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 如果 sizeCtl < 0 ,说明由其他的线程执行 CAS 成功,正在进行初始化。
        if ((sc = sizeCtl) < 0)
            // 操作线程礼让
            Thread.yield();
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

在JDK1.8中,初始化是通过自旋和 CAS 操作完成的。

需要注意 sizeCtl 的值,它决定当前的初始化状态。

  • -1 说明正在初始化
  • -N 说明有 N-1 个线程正在进行扩容
  • 0 表示 table 初始化大小,
  • > 0 表示 table 扩容的阈值。

get方法

在ConcurrentHashMap中,get方法比较简单,主要是根据 key 计算下标,然后通过链表或红黑树遍历即可。

需要注意的是 get 方法不需要加锁,它不对数据做任何更改。

扩容方式

在JDK1.8下,ConcurrentHashMap的扩容分两步:

  • 构建一个 nextTable,它的容量是原来的两倍,这个操作单线程完成。新建 table 数组的代码为:Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1],在原容量大小的基础上右移一位。
  • 将原来 table 中的元素复制到 nextTable 中。通过 rehash, 旧数组里的数据移动到新的数组时,位置要么不变,要么变为 index + oldSize
int n = tab.length;
int b = p.hash & n;

在这其中,核心是 forwardingNode 占位符,来确认节点是否被处理过,以此来控制并发有序。

在目前实现方式下ConcurrentHashMap的缺点

ConcurrentHashMap 是设计为非阻塞的。

在更新时会局部锁住某部分数据,但不会把整个表都锁住。

读取操作是完全非阻塞的,这代表读取操作不能保证反映最近的更新

一个线程put了大量数据,期间另一个线程是只能得到当前已经插入成功的数据。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

上一篇: 外网<远程>访问Tomcat 下一篇: 浅谈api接口
  1. 分享:
最后一次编辑于 2023年12月25日 0

暂无评论

EYmIKSPPfzd2