当前位置: 代码迷 >> 综合 >> java容器之 ConcurrentHashMap jdk1.8 思路整理
  详细解决方案

java容器之 ConcurrentHashMap jdk1.8 思路整理

热度:93   发布时间:2023-10-14 09:41:38.0

下面就来写一下关于jdk1.8的的ConcurrentHashMap的整体逻辑
本文不纠结于源码,因为源码太过于复杂,不想深究,暂时也没有必要深究,源码虽好,但暂时无须贪杯
本文将ConcurrentHashMap简称为chm
本文目的在于,快速理清chm逻辑,便于理解

chm的结构

java容器之 ConcurrentHashMap jdk1.8 思路整理
可以看到此处的结构跟jdk1.8的hashmap结构差不多
也是数组+链表/红黑数的结构
而并发体现在哪里呢,在chm中用了大量的CAS+自旋 和 synchronized关键字,来巧妙的实现并发安全问题,便一定程度上保证并发的效率。

相比jdk1.7的改进

相比jdk1.7来说,变化是翻天覆地的
整体的结构都变了,取消了分段锁

sizeCtl变量

这是源码中一个十分重要状态变量
通过它,可以得知chm内部数组的状态
eg:

  • 0:sizeCtl为0,代表数组未初始化
  • 正数:该值代表当前数组的阈值(跟hashmap的一样,capacity*loadFactor)
  • -1:代表数组正在初始化
  • 负数且不是-1:代表数组正在扩容,该数为-(n+1),表示当前有n个线程在共同完成扩容操作

如上所示,在我们后续的逻辑中,会用到这个变量,通过这个变量,我们就可以执行对应数组状态的逻辑

put操作的大致逻辑

对于任何容器,基本最关心的就是put方法了,这里我们就先来看put方法的逻辑
这里put方法体中,就只调用了putVal方法

	putVal方法:根据key的hash进行扰动,找到对应的桶位置(数组)进行自旋(死循环):if 数组为空,或者数组长度为0:initTable方法进行初始化//1else if 对应的通位置为空:尝试CAS插入(创建新节点)//2 这里没什么好说的,设置成功就跳出循环,失败继续自旋else if  数组正在扩容(该位置第一个元素的hash值是-1)://3当前线程协助扩容else:对当前桶位第一个元素进行加锁(Synchronized)://4if 节点类型是链表:(通过hash值来判断,大于零的是链表节点)链表插入else 红黑树插入if 满足条件:进行红黑树转换统计当前chm总size(addCount方法包含计算size和扩容两个步骤,这里分开写了,便于理解)//5if 需要:进行扩容

整体的逻辑就如上面所示‘
可以看到上面的逻辑和HashMap差不多,主要区别是在并发上体现的
这里需要关注的点,我都打上了1,2,3,4
主要关注这几个点就好了
下面来一一讲解

数组初始化的逻辑

当有两个线程同时发现数组未初始化,那么此时它们都要去尝试初始化,此时就是一个并发问题。

initTable方法:wihle(数组尚未初始化):if sizeCtl值小于0(代表正在初始化or扩容,其实只是初始化,没有扩容):让出cpu控制权(相当于帮忙)else if CAS 更新sizeCtl成功(将其设置为-1,同时只有一个线程成功,此处相当于锁):if 数组尚未初始化(加上while的判断,此处是双重校验了):进行初始化,至少是16的大小更新sizeCtl为0.75*capacityreturn 数组				

这里其实非常像是单例模式懒加载的 双重校验 的实现,只不过将Synchronized改成了CAS
这里的核心是sizeCtl,通过它来实现锁


当然,也可以在chm创建的时候,直接指定好数组大小
不要调用默认的构造方法,而是调用ConcurrentHashMap(int)方法,传入容量

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)throw new IllegalArgumentException();int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));this.sizeCtl = cap;}

此时会将数组长度设置成传入容量的两倍
同时sizeCtl为0.75*这个两倍

4 加锁插入

在上面的 4 那里,
可以看到是用加锁方式来实现的
将桶位的第一个元素给锁住,那么其他线程就不能修改该桶位了

else {
    V oldVal = null;synchronized (f) {
    if (tabAt(tab, i) == f) {
    //双重校验if (fh >= 0) {
    binCount = 1;for (Node<K,V> e = f;; ++binCount) {
    K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
    oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {
    pred.next = new Node<K,V>(hash, key, value);break;}}}else if (f instanceof TreeBin) {
    Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
    oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}else if (f instanceof ReservationNode)throw new IllegalStateException("Recursive update");}}if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}

这里没有什么好说的,跟hashmap基本是一样的

addCount方法

该方法是一个重点,也是一个关键
addCount方法主要实现了,并发环境下的size计算、扩容
下面一一讲解:


并发环境下的size计算:

它的原理是这样的,在chm中,多个线程进行put之后要进行size的++操作
此时必然有线程安全问题,则此时如果只有一个变量,肯定不行,即使它是原子类,也很慢
所以chm中是这样一种机制
有一个变量是baseCount,还有一个CounterCell数组
当前线程进入addCount方法之后,先尝试对baseCount进行CAS,如果失败,则将目标瞄向CounterCell数组
然后对数组中的值进行修改

最终size为baseCount+数组中所有值

这种机制的好处是,每个线程可以快速的结束size计算的流程,不会被堵塞

addCount方法 size计算:自旋:if 数组为空:创建数组else if 数组正在创建:尝试CAS baseCountelse if 数组不为空:利用随机数来随机确定一个数组位置if 当前位置没有对象:创建CountCell对象,并累加值else if 不为空 且上次CAS失败(没用必要再次尝试,换一个位置尝试):wasUncontended = true rehashelse if 尝试CAS成功:breakelseif 有别的线程对数组扩容||数组容量达到CPU核心数(数组最大值):rehashelse : 数组扩容

数组扩容
(不是CounterCell)

这里有点复杂,讲一下大概的逻辑:

  • 当满足扩容条件(hashmap中有讲,不赘述),进行扩容
  • 新数组容量一般是之前的两倍。
  • 此处调用transfer方法进行扩容
  • 主要逻辑在于:
    从后往前进行复制
    往当前节点放置一个Forward节点,其hash为-1,同时其指向新数组的对应位置
    然后将当前位置的节点(如果有)按照hash值的某一个特殊位置,进行划分两类
    一类在新数组i位置,一类在i+n位置(这里分为两类,其实就是jdk1.8的HashMap的扩容改变之处)
  • 此处可能还要反红黑树转换
  • 对于其他线程,主要负责扩容的线程会将整个扩容复制任务划分成若干个小任务
  • 任务最小为16个元素,然后分给各个线程进行操作

(这里之后有必要再补齐,先mark,)

注释源码地址

有一个比较好的源码注释:https://blog.csdn.net/ioteye/article/details/108634818

参考资料

深入分析ConcurrentHashMap1.8的扩容实现
面试必备之ConcurrentHashMap终结篇-黑马程序员杭州校区出品

  相关解决方案