# runtime.map 🗺️

# 1. 基本用法

# 1.1 概述

非线程安全的 ⚠️

# 不能作为 map 类型的 key ❌

  • slice(可将其序列化为字符串,做为 key)
  • map (可将其序列化为字符串,作为 key)
  • func

# 可以作为 map 类型的 key ✅

  • 基本类型:int,float64,string,bool 等
  • 指针类型:*int,*string 等
  • 结构体:但结构体的所有字段必须可以比较
  • 接口类型:当接口的底层动态类型和动态值都支持比较时,可以作为键。

# nil map 和 空 map 的区别 🔍

  • nil map:未初始化的 map,不能进行写操作,会 panic;可以进行读取操作,但始终返回零值。
  • 空 map:已初始化的 map,可以进行写操作,但不会分配内存

# 1.2 初始化 🚀

# 1.2.1 几种初始化方法

  • 通过 make 关键字初始化,同时指定 map 预分配容量

    myMap := make(map[int]int,2)
  • 通过 make 关键字初始化,不显示声明容量,因此默认容量为 0

    myMap := make(map[int]int)
  • 初始化操作连带赋值

    myMap := map[int]int {
      1:2,
      3:4
    }

# 1.2.2 key 的类型要求

map 中,key 的数据类型必须为可比较的类型,map、func 不可比较

# 1.3 读 📖

  • 直接读

    v := myMap[10]
  • 读的同时添加一个 bool 类型的 flag 标识是否读取成功。倘若 ok == false,说明读取失败,key 不存在,或者 map 未初始化

    v,ok := myMap[10]

# 1.4 写 ✍️

# 1.5 删 🗑️

# 1.6 遍历 🔄

# 1.7 并发冲突 ⚡

map 不是并发安全的数据结构,倘若存在并发读写行为,会抛出 fatal error

具体规则:

  • 并发读没有问题
  • 并发读写中的 “写” 是广义上的,包含写入、更新、删除等操作
  • 读的时候发现其他 goroutine 在并发写,抛出 fatal error
  • 写的时候发现其他 goroutine 在并发写,抛出 fatal error

# 2. 核心原理 🔧

# 2.1 hash 🔐

将任意长度的输入压缩到某一固定长度的输出摘要的过程,属于压缩映射,输入空间远大于输出空间,因此不同输入可能会映射成相同的输出结果

  1. 可重入性:相同的 key,必然产生相同的 hash 值
  2. 离散性:只要两个 key 不同,无论其相似度的高低,产生的 hash 值会在整个输出域内均匀的离散化
  3. 单向性:无法通过 hash 值反向映射回 key
  4. hash 冲突:由于输入域(key)无穷大,输出域(hash 值)有限,因此必然存在不同 key 映射到相同 hash 值的情况

# 2.2 桶数组 🪣

map 中,会通过长度为 2 的整数次幂的桶数组进行 key-value 对的存储

  1. 每个桶固定可以存放 8 个 key-value 对
  2. 倘若超过 8 个 key-value 对打到数组的同一个索引当中,此时会通过创建桶链表的方式来化解这一问题

# 2.3 拉链法解决 hash 冲突 ⛓️

  1. 拉链法

    将命中同一个桶的元素通过链表的形式进行链接,因此很方便动态扩展

  2. 开放定址法

    在插入新条目时,会基于一定的探测策略持续寻找,直到找到一个可用于存放数据的空位为止

# 2.4 扩容优化性能 📈

倘若 map 的桶数组长度固定不变,那么随着 key-value 对数量的增长,当下一个桶下挂在的 key-value 达到一定的量级,此时操作的时间复杂度趋于线性,无法满足要求。

因此在实现上,map 桶数组的长度会随着 key-value 对数量的变化而实时调整,以保证每个桶内的 key-value 对数量始终控制在常量级别,满足各项操作位 O (1) 时间复杂度的要求。

map 扩容机制的核心点包括:

  1. 扩容分为增量扩容和等量扩容
  2. 当桶内 key-value 总数 / 桶数组长度 > 6.5 时发生增量扩容,桶数组长度增长为原值的两倍
  3. 当桶内溢出桶数量大于等于 2^B 时(B 为桶数组长度的指数,B 最大取 15),发生等量扩容,桶的长度保持为原值
  4. 采用渐进式扩容,当桶被实际操作到时,由使用者负责完成数据迁移,避免因为一次性的全量数据迁移引发性能抖动

# map 的扩容(growing)🔄

# 为什么扩容 🤔

扩容是 go 中 map 维护性能和效率的重要机制,其核心目的是 避免哈希冲突过多导致查找、插入效率下降。

  1. 控制负载因子

    • 负载因子 = 元素总数 / 桶(bucket)数量
    • go 的 map 默认最大负载因子为 6.5
    • 当超过这个值时,说明每个 bucket 平均存储了超过 6.5 个键值对,哈希冲突严重,会影响性能
    • 扩容可以降低负载因子,提高查找 / 插入效率
  2. 减少哈希碰撞

    • 每个 bucket 只能存 8 个键值对(bucketCnt=8)

    • 如果某个 bucket 已满,会使用溢出桶(overflow bucket)

    • 溢出桶越多,查找链就越长,性能越差

    • 扩容增加 bucket 数量,减少冲突链长度

  3. 维持 O (1) 时间复杂度

    • 理性情况下,map 的插入和查找操作应该接近常数时间复杂度 O (1)

    • 随着数据增长,不扩容 hi 导致操作卑微 O (n),由其是在大量冲突的情况下

    • 扩容保证 map 操作保持高效,维持阶级恩 O (1) 的性能。

  4. 渐进式迁移

    • Go 的扩容不是一次性完成的

    • 使用增量迁移,每次操作之前已一部分 bucket

    • 避免一次性迁移所有数据带来的性能抖动

# 触发扩容条件 ⚡

mapassign 函数中,触发 map 扩容的条件主要包括以下两种情况:

  1. 超过负载因子阈值

    • 当前 map 中的元素数量(包含所有 bucket 中的有效键值对)超过了 bucketCount * loadFactor / 100,其中 loadFactor 是负载因子(默认为 6.5)

    • 这个阈值确保了 map 的查找效率不会因为过渡填充而下降

    • func overLoadFactor(count int, B uint8) bool {
      	return count > abi.MapBucketCount && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
      }
  2. 溢出桶过多

    • 即使元素总数未达到负载因子上限,但如果出现了太多溢出桶(overflow buckets),也会触发扩容。这是因为某些 bucket 已满的情况下,新元素只能插入到溢出桶中,形成链表结构,随着链表增长,查找效率下降。
    • Go 在运行时会检查溢出桶的数量,并在必要时扩容。

# 扩容策略 🎯

  1. 扩容方式

    扩容为原来的 2 倍(即 bucket 数量翻倍)

  2. 增量迁移

    扩容后并不会立即迁移所有数据,而是通过后续的每次操作(如插入、删除、遍历)逐步迁移数据,以减少

    一次性性能开销。

# 3. 数据结构 🏗️

# 3.1 hmap

Map 的底层在 runtime/map.go
hmap 结构体

image-20250725160836655

type hmap struct {
	count     int // 当前 map 中的元素个数
	flags     uint8 // 标记 map 的状态,如是否有迭代器、是否正在写入等(使用位标志)
	B         uint8  // 当前 bucket 数量对数(即 buckets 的数量是 1<<B )
	noverflow uint16  // 近似记录溢出 bucket 的数量,用于触发相同大小的扩容
	hash0     uint32 //hash 种子,用于随机化哈希值,防止碰撞攻击
	buckets    unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
	oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
	nevacuate  uintptr        // progress counter for evacuation (buckets less than this have been evacuated)
	extra *mapextra // 可选字段,包含溢出 bucket 的引用和下一块可用的溢出 bucket 指针
}

# 3.2 mapextra 结构体 🔧

type mapextra struct {
  //overflow 和 oldoverflow 用于保存溢出桶的指针。
  // 如果 key 和 elem 不包含指针,则标记 bucket 类型为不包含指针。
  // 这避免了扫描此类 maps。
	overflow    *[]*bmap // 指向当前桶溢出的 bucket 列表
	oldoverflow *[]*bmap // 指出旧桶溢出的 bucket 列表
	// nextOverflow holds a pointer to a free overflow bucket.
	nextOverflow *bmap // 下一个可用的溢出 bucket
}
  • overflow

    作用:存储指向所有当前主桶溢出的 bucket 的指针列表。当竹筒装不下新键值对时,会分配新的 bucket 并通过 overflow 维护这些溢出 bucket

    使用场景:当插入的键值对数量超过当前桶容量时,运行时会动态分配新的 bucket,并将其添加到 overflow 中

  • oldoverflow

    作用:存储指向所有旧桶(hmap.oldbuckets)溢出的 bucket 的指针列表。在扩容过程中,旧桶的数据会被逐步迁移到新桶中,此字段用于保留旧桶的溢出链。

    使用场景:在 map 扩容期间,为了确保迭代器能正确访问旧桶及其溢出 bucket,需要维护 oldoverflow。

  • nextOverflow

    作用:指向下一个可以使用的溢出 bucket。用于快速分配新的溢出 bucket。

    使用场景:插入操作导致当前 bucket 已满时,从 nextOverflow 取出下一个空闲溢出 bucket 使用;若无则新建。

总结:

  • overflow 和 oldoverflow 主要用于 GC 跟踪和迭代器支持,在 map 缩小或扩容时保留溢出 bucket。
  • nextOverflow 提高溢出 bucket 分配效率,减少频分内存分配开销。

# 3.3 bmap 结构体 🪣

// A bucket for a Go map.
const bucketCnt = 8
type bmap struct {
	tophash [abi.MapBucketCount]uint8
}
  • bmap 就是 map 中的桶,可以存储 8 组 key-value 对的数据,以及一个指向下一个溢出桶的指针

  • 每组 key-value 对数据包含 key 高 8 位 hash 值 tophash,key 和 val 三部分

  • 在代码层面之展示了 tophash 部分,但由于 tophash、key 和 val 的数据长度固定,因此可以通过内存地址偏移的方式寻找到后续的 key 数组、val 数组以及溢出桶指针

    每个 bmap 表示一个桶,里面存放 8 个数据:

type bmap struct {
	tophash [abi.MapBucketCount]uint8
  key 
  values
  overflow
}
  • tophash: 哈希值的高 8 位
  • key: 键(没在代码中,在编译时加入)
  • elem: 值(没在代码中,在编译时加入)
  • overflow: 溢出指针 (没在代码中,在编译时加入)

# 4. 构造方法 🔨

image-20250725175703951

# 4.1 makemap 🏗️

func makemap(t *maptype, hint int, h *hmap) *hmap {
  //hint 为 map 拟分配的容量;在分配前,会提前对拟分配的内存大小进行判断,倘若超限,会将 hint 置为零
	mem, overflow := math.MulUintptr(uintptr(hint), t.Bucket.Size_)
	if overflow || mem > maxAlloc {
		hint = 0
	}
  // 通过 new 方法初始化 hmap
	if h == nil {
		h = new(hmap)
	}
  // 构造 hash 因子
	h.hash0 = uint32(rand())
  // 大致上基于 log2 (B) >= hint 的思路,计算桶数组的容量 B
	B := uint8(0)
	for overLoadFactor(hint, B) {
		B++
	}
	h.B = B
	
	if h.B != 0 {
		var nextOverflow *bmap
    // 调用 makeBucketArray 方法,初始化桶数组 hmap.buckets
		h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
    // 倘若 map 容量较大,会提前申请一批溢出桶 hmap.extra
		if nextOverflow != nil {
			h.extra = new(mapextra)
			h.extra.nextOverflow = nextOverflow
		}
	}
	return h
}

# 4.2 overLoadFactor ⚖️

通过 overLoadFactor 方法,对 map 预分配容量和桶数组长度指数进行判断,决定是否仍需要增长 B 的数值

const	loadFactorDen = 2
const loadFactorNum = 13
MapBucketCount = 1 << 3
func overLoadFactor(count int, B uint8) bool {
	return count > abi.MapBucketCount && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
func bucketShift(b uint8) uintptr {
	// Masking the shift amount allows overflow checks to be elided.
	return uintptr(1) << (b & (goarch.PtrSize*8 - 1))
}

(1) 如果 map 预分配容量小于等于 8,B 取 0,桶的个数为 1
(2) 保证 map 预分配容量大小等于桶数组长度 * 6.5

kv 对数量 桶数组长度指数 B 桶数组长度 2^B
0 ~ 8 0 1
9 ~ 13 1 2
14 ~ 26 2 4
27 ~ 52 3 8
2^(B-1) * 6.5+1 ~ 2^B*6.5 B 2^B

# 4.3 makeBucketArray 🪣

makeBucketArray 方法会进行桶数组的初始化,并根据桶的数量决定是否需要提前做溢出桶的初始化。

func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
	// 通过 base 记录桶数组的长度,不包含溢出桶;
	// 通过 nbuckets 记录累加上溢出桶后,桶数组的总长度
	base := bucketShift(b)
	nbuckets := base
	// 桶数组的指数 b>=4 时(桶数组的数量 >=54),会需要提前创建溢出桶
	if b >= 4 {
		nbuckets += bucketShift(b - 4)
		sz := t.Bucket.Size_ * nbuckets
		up := roundupsize(sz, !t.Bucket.Pointers())
		if up != sz {
			nbuckets = up / t.Bucket.Size_
		}
	}
	if dirtyalloc == nil {
		// 调用 newarray 方法为桶数组申请内存空间,连带着需要初始化的溢出桶
		buckets = newarray(t.Bucket, int(nbuckets))
	} else {
		buckets = dirtyalloc
		size := t.Bucket.Size_ * nbuckets
		if t.Bucket.Pointers() {
			memclrHasPointers(buckets, size)
		} else {
			memclrNoHeapPointers(buckets, size)
		}
	}
	// 如果 base != bbuckets, 说明需要创建溢出桶,会基于地址偏移的方式,通过 nextOverflow 指向首个溢出桶的地址
	if base != nbuckets {
	
		nextOverflow = (*bmap)(add(buckets, base*uintptr(t.BucketSize)))
		last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.BucketSize)))
		last.setoverflow(t, (*bmap)(buckets))
	}
	return buckets, nextOverflow
}
// 如果需要创建溢出桶,会在将最后一个溢出桶的 overflow 指针指向 buckets 数组,以次来表示申请的溢出桶已经用完
func (b *bmap) setoverflow(t *maptype, ovf *bmap) {
	*(**bmap)(add(unsafe.Pointer(b), uintptr(t.BucketSize)-goarch.PtrSize)) = ovf
}

# 5. 读流程 📖

image-20250726210312160

map 度流程大概分为以下几步:

  1. 根据 key 取得 hash 值
  2. 根据 hash 值对桶数组取模,确定所在的桶
  3. 沿着桶链表依次遍历各个桶内的 key-value 对
  4. 命中相同的 key,则返回 value;倘若 key 不存在,则返回零值。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	// 倘若 map 未初始化,或此时存在 key-value 对数量为 0,直接返回零值
	if h == nil || h.count == 0 {
		if err := mapKeyError(t, key); err != nil {
			panic(err) // see issue 23734
		}
		return unsafe.Pointer(&zeroVal[0])
	}
  // 如果发现其他 goroutine 在写 map,直接抛出并发读写的 fatal error;其中,并发写标志,位于 hmap.flags 的第 3 个 bit 位
  // const hashWriting = 4
	if h.flags&hashWriting != 0 {
		fatal("concurrent map read and map write")
	}
  // 通过 maptype.hasher () 方法计算得到 key 的 hash 值,并对桶数组长度取模,取得对应的桶。
  // 关于 hash 方法的内部实现,golang 并未暴露
	hash := t.Hasher(key, uintptr(h.hash0))
  //bucketMask 方法会根据 B 求得桶数组长度 - 1 的值,用于后续 & 运算,实现取模的效果
	m := bucketMask(h.B)
	b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.BucketSize)))
  // 在取桶时,会关注当前 map 是否处于扩容的流程
  // 倘若是,需要在老的桶数组 oldBuckets 中取桶,通过 evacuated 方法判断统数据是已迁到新桶还是仍存留在老桶中,如果仍在老桶,需要取老桶进行遍历
	if c := h.oldbuckets; c != nil {
    // 在取老桶前,会先判断 map 的扩容流程是否增量扩容,倘若是,说明老桶数组的长度是新桶数组的一半,需要将桶长度 m 除以 2
    // const sameSizeGrow = 8
		if !h.sameSizeGrow() {
			// There used to be half as many buckets; mask down one more power of two.
			m >>= 1
		}
		oldb := (*bmap)(add(c, (hash&m)*uintptr(t.BucketSize)))
		if !evacuated(oldb) {
			b = oldb
		}
	}
  // 取 key hash 值的高 8 位值 top
	top := tophash(hash)
  
  // 开启两层 for 循环进行遍历流程,外层基于桶链表,依次遍历首个桶和后续的每个溢出桶,内层一次遍历一个桶内的 key-value 对
bucketloop:
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < abi.MapBucketCount; i++ {
      // 首先查询高 8 位的 tophash 值,看是否和 key 的 top 值匹配
      // 倘若不匹配且当前 tophash 值为 0,说明桶的后续位置都未放入过元素,当前 key 在 map 中不存在,可以直接打破循环,返回零值
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
			if t.IndirectKey() {
				k = *((*unsafe.Pointer)(k))
			}
      // 若找到了相等的 key,则通过地址偏移的方式取到 value 并返回
      // 其中 dataOffset 为一个桶中 tophash 数组所占的空间大小
			if t.Key.Equal(key, k) {
				e := add(unsafe.Pointer(b), dataOffset+abi.MapBucketCount*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
				if t.IndirectElem() {
					e = *((*unsafe.Pointer)(e))
				}
				return e
			}
		}
	}
  // 倘若遍历完成,仍未找到匹配的目标,则返回零值兜底
	return unsafe.Pointer(&zeroVal[0])
}
// 取老桶时,会调用 evacuated 方法判断数据是否已经迁移到新桶。
// 判断的方式是,取桶中首个 tophash 值,倘若该值为 2,3,4 中的一个,都代表数据已经完成迁移
// const emptyOne = 1
// const evacuatedX = 2
// const evacuatedY = 3
// cosnt evacuatedEmpty = 4
// cosnt minTopHash = 5
func evacuated(b *bmap) bool {
	h := b.tophash[0]
	return h > emptyOne && h < minTopHash
}
// 取 key hash 值的高 8 位值 top. 倘若该值 < 5,会累加 5,以避开 0 ~ 4 的取值。因为这几个值会用于枚举,具有一些特殊的含义.
// const minTopHash = 5
func tophash(hash uintptr) uint8 {
	top := uint8(hash >> (goarch.PtrSize*8 - 8))
	if top < minTopHash {
		top += minTopHash
	}
	return top
}

# 6. 写流程 ✍️

map 写流程主要分为以下几步:

  1. 根据 key 计算 hash 值
  2. 根据 hash 值对桶数组取模,确定所在的桶
  3. 如果 map 处于扩容,则迁移命中的桶,帮助推进渐进式扩容
  4. 沿着桶链表依次遍历各个桶内的 key-value 对
  5. 命中相同的 key,则更新 value;若 key 不存在,则插入新的 key-value 对
  6. 倘若发现 map 达成扩容条件,则会开启扩容模式,并重新返回第 2 步

image-20250726232807808

map 写操作位于 runtime/map.gomapassign 方法中,主要分为以下几步:

// 负载在 go 的 map 中插入或更新一个键值对,处理哈希冲突、扩容逻辑以及内存分配,是 map 写操作的核心实现。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	// 判空,如果 map 为 nil,直接 panic
  if h == nil {
		panic(plainError("assignment to entry in nil map"))
	}
	
  // 检查是否有并发写或者删除操作,抛出并发写 fatal error
	if h.flags&hashWriting != 0 {
		fatal("concurrent map writes")
	}
  // 通过 maptype.hasher () 计算 key 的哈希值
	hash := t.Hasher(key, uintptr(h.hash0))
	// Set hashWriting after calling t.hasher, since t.hasher may panic,
	// in which case we have not actually done a write.
	// 通过异或位运算,将 map.flags 的第 3 个 bit 位置为 1,添加写标志 
  	// 设置 hashWriting 标志,防止并发写(放在 hasher 后,避免 hasher panic 导致未初始化标志)
	h.flags ^= hashWriting
	// 如果 buckets 为 nil,分配初始 bucket
	if h.buckets == nil {
		h.buckets = newobject(t.Bucket) // newarray(t.Bucket, 1)
	}
again:
  // 找到当前 key 对应的桶索引 bucket
	bucket := hash & bucketMask(h.B)
  // 如果正在扩容,迁移当前 bucket 数据
	if h.growing() {
		growWork(t, h, bucket)
	}
  // 从 map 的桶数组 buckets 出发,结合桶索引和桶容量大小,进行地址偏移,获得对应桶 b
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.BucketSize)))
  // 计算该 key 对应的高 8 位 tophash 值
	top := tophash(hash)
	// 提前声明好的三个指针,用于指向存放 key-value 的空槽
	var inserti *uint8 //tophash 拟插入的位置
	var insertk unsafe.Pointer //key 拟插入的位置
	var elem unsafe.Pointer //val 拟插入的位置
  // 开启两层 for 循环,外层沿着桶链表依次遍历,内层一次遍历桶内的 key-value 对
bucketloop:
	for {
    // 遍历当前 bucket 中所有的槽位
		for i := uintptr(0); i < abi.MapBucketCount; i++ {
      // 如果 key 的 tophash 和当前位置 tophash 不同,则会尝试将 inserti,insertk,elem 调整指向首个空位置,用于后续的插入操作
			if b.tophash[i] != top {
        // 如果是新槽位且尚未找到插入点
				if isEmpty(b.tophash[i]) && inserti == nil {
					inserti = &b.tophash[i]
					insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
					elem = add(unsafe.Pointer(b), dataOffset+abi.MapBucketCount*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
				}
        // 如果发现当前位置 tophash 标识位 emptyRest (0),则说明当前链表后续位置都为空,无需继续比那里,直接 break 遍历流程即可
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
      // 如果找到了相等的 key,则执行更新操作,并且直接跳转到方法的 done 标志位处,进行收尾处理
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
			if t.IndirectKey() {
				k = *((*unsafe.Pointer)(k))
			}
			if !t.Key.Equal(key, k) {
				continue
			}
			// already have a mapping for key. Update it.
      // 已存在 key,更新 key 和 value
			if t.NeedKeyUpdate() {
				typedmemmove(t.Key, k, key)
			}
			elem = add(unsafe.Pointer(b), dataOffset+abi.MapBucketCount*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
			goto done
		}
    // 查找 overflow bucket
		ovf := b.overflow(t)
		if ovf == nil {
			break
		}
		b = ovf
	}
  // 如果没有找到相等的 key,则会在执行插入操作前,判断 map 是否需要开启扩容模式
  // 倘若需要扩容,会在开启扩容模式后,跳转回 again 标志位,重新开始桶的定位以及遍历流程
	if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
		hashGrow(t, h)
		goto again // Growing the table invalidates everything, so try again
	}
  // 如果遍历完桶链表,都没有找到插入位置,则新建 overflow bucket,挂在在桶链表的尾部,并将 inserti,insertk,elem 指向溢出桶的首个空位
	if inserti == nil {
		// The current bucket and all the overflow buckets connected to it are full, allocate a new one.
		newb := h.newoverflow(t, b)
		inserti = &newb.tophash[0]
		insertk = add(unsafe.Pointer(newb), dataOffset)
		elem = add(insertk, abi.MapBucketCount*uintptr(t.KeySize))
	}
	// 将 tophash,key,value 插入到取得空位中,并且将 map 的 key-value 对计数器 count 值加 1
	if t.IndirectKey() {
		kmem := newobject(t.Key)
		*(*unsafe.Pointer)(insertk) = kmem
		insertk = kmem
	}
	if t.IndirectElem() {
		vmem := newobject(t.Elem)
		*(*unsafe.Pointer)(elem) = vmem
	}
	typedmemmove(t.Key, insertk, key)
	*inserti = top
	h.count++
// 收尾环节,再次检验是否有其他协程并发写,倘若有,则抛出 fatal error。将 hmap.flags 中的写标记抹去,然后退出方法
done:
  // 检查是否仍在写状态(防止并发问题)
	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
  // 清除 hashWriting 标志
	h.flags &^= hashWriting
  // 如果 elem 是间接的,解引用
	if t.IndirectElem() {
		elem = *((*unsafe.Pointer)(elem))
	}
	return elem
}

# newoverflow 🌊

image-20250726225720223

创建溢出桶:

  1. 如果 hmap.extra 中还有剩余可用的溢出桶,则直接获取 hmap.extra.nextOverflow,并将 nextOverflow 调整指向下一个空闲可用的溢出桶
  2. 如果 hmap 已经没有空闲溢出桶了,则创建一个新的溢出桶
  3. hmap 的溢出桶数量 hmap.noverflow 累加 1
  4. 将新获得的溢出桶添加到原桶链表的尾部
  5. 返回溢出桶
func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
	var ovf *bmap
	if h.extra != nil && h.extra.nextOverflow != nil {
		// We have preallocated overflow buckets available.
		// See makeBucketArray for more details.
		ovf = h.extra.nextOverflow
		if ovf.overflow(t) == nil {
			// We're not at the end of the preallocated overflow buckets. Bump the pointer.
			h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.BucketSize)))
		} else {
			// This is the last preallocated overflow bucket.
			// Reset the overflow pointer on this bucket,
			// which was set to a non-nil sentinel value.
			ovf.setoverflow(t, nil)
			h.extra.nextOverflow = nil
		}
	} else {
		ovf = (*bmap)(newobject(t.Bucket))
	}
	h.incrnoverflow()
	if !t.Bucket.Pointers() {
		h.createOverflow()
		*h.extra.overflow = append(*h.extra.overflow, ovf)
	}
	b.setoverflow(t, ovf)
	return ovf
}

# 7. 删流程 🗑️

image-20250726234423533
map 删除流程:

  1. 根据 key 计算 hash 值
  2. 根据 hash 值对桶数组取模,得到桶索引
  3. 如果 map 处于扩容,则迁移命中的桶,帮助推进渐进式扩容
  4. 沿着桶内链表依次遍历各个桶内的 key-value 对
  5. 如果命中相同的 key,删除对应的 key-value 对,并将当前的 tophash 置为 emptyOne,表示为空
  6. 如果当前位置位置,或者下一个位置的 tophash 为 emptyRest, 则沿当前位置向前遍历,将毗邻的 emptyOne 统一更新为 emptyRest

map 删除操作位于 runtime.map.gomapdelete 方法中:

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
	// 如果 map 未初始化或者内部 key-value 对数量为 0,删除时不会报错,直接返回
	if h == nil || h.count == 0 {
		if err := mapKeyError(t, key); err != nil {
			panic(err) // see issue 23734
		}
		return
	}
  // 如果存在其他 goroutine 在进行写或者删操作,抛出并发写 fatal error
	if h.flags&hashWriting != 0 {
		fatal("concurrent map writes")
	}
  // 通过 maptype.hasher () 方法求得 key 对应的 hash 值
	hash := t.Hasher(key, uintptr(h.hash0))
	// Set hashWriting after calling t.hasher, since t.hasher may panic,
	// in which case we have not actually done a write (delete).
  // 通过异或位运算,将 map.flags 的第三个 bit 位 置为 1,添加写标记
	h.flags ^= hashWriting
	// 找到当前 key 对应的桶索引 bucket
	bucket := hash & bucketMask(h.B)
  // 倘若发现当前 map 正处于扩容过程,则帮助其渐进扩容
	if h.growing() {
		growWork(t, h, bucket)
	}
  // 从 map 的桶数组 buckets 出发,结合桶索引和桶容量大小,进行地址偏移,获得对应桶 b,并赋值给 bOrig
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.BucketSize)))
	bOrig := b
  // 取得 key 的高 8 位 tophash
	top := tophash(hash)
  // 开启两层 for 循环,外层沿着桶链表依次遍历,内层依次遍历桶内的 key-value 对
search:
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < abi.MapBucketCount; i++ {
			if b.tophash[i] != top {
        // 遍历时,如果发现当前位置 tophash 值为 emptyRest,则直接结束遍历流程
				if b.tophash[i] == emptyRest {
					break search
				}
				continue
			}
      // 如果 key 不相等,则继续遍历
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
			k2 := k
			if t.IndirectKey() {
				k2 = *((*unsafe.Pointer)(k2))
			}
			if !t.Key.Equal(key, k2) {
				continue
			}
			// Only clear key if there are pointers in it.
      // 如果 key 相等,删除对应的 key-value 对,并且将当前位置的 tophash 置为 emptyOne
			if t.IndirectKey() {
				*(*unsafe.Pointer)(k) = nil
			} else if t.Key.Pointers() {
				memclrHasPointers(k, t.Key.Size_)
			}
			e := add(unsafe.Pointer(b), dataOffset+abi.MapBucketCount*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
			if t.IndirectElem() {
				*(*unsafe.Pointer)(e) = nil
			} else if t.Elem.Pointers() {
				memclrHasPointers(e, t.Elem.Size_)
			} else {
				memclrNoHeapPointers(e, t.Elem.Size_)
			}
			b.tophash[i] = emptyOne
			// If the bucket now ends in a bunch of emptyOne states,
			// change those to emptyRest states.
			// It would be nice to make this a separate function, but
			// for loops are not currently inlineable.
      // 倘若当前位置不位于最后一个桶的最后一个位置,或者当前位置的后置位 tophash 不为 emptyRest,则无需向前遍历更新 tophash 标识,直接跳转到 notLast 位置即可
			if i == abi.MapBucketCount-1 {
				if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
					goto notLast
				}
			} else {
				if b.tophash[i+1] != emptyRest {
					goto notLast
				}
			}
      // 向前遍历,将沿途的空位 (tophash 为 emptyOne) 的 tophash 都更新为 emptySet
			for {
				b.tophash[i] = emptyRest
				if i == 0 {
					if b == bOrig {
						break // beginning of initial bucket, we're done.
					}
					// Find previous bucket, continue at its last entry.
					c := b
					for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
					}
					i = abi.MapBucketCount - 1
				} else {
					i--
				}
				if b.tophash[i] != emptyOne {
					break
				}
			}
      // 如果成功从 map 中删除了一组 key-value 对,则将 hmap 的计数器 count 值减 1,如果 map 中的元素全部被删除完了,会为 map 更换一个新的随机因子 hash0
		notLast:
			h.count--
			// Reset the hash seed to make it more difficult for attackers to
			// repeatedly trigger hash collisions. See issue 25237.
			if h.count == 0 {
				h.hash0 = uint32(rand())
			}
			break search
		}
	}
	// 收尾环节,再次检验是否有其他协程并发写,如果有,则抛出 fatal error。
  // 将 hmap.flags 中的写标记抹去,然后退出方法
	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
	h.flags &^= hashWriting
}

# 8. 遍历流程 🔄

image-20250727001454920

Map 的遍历流程首先会走进 runtime/map.go 的 mapiterinit () 方法中,初始化用于遍历的迭代器 hiter;

接着会调用 runtime/map.go 的 mapiternext () 方法开启遍历流程

# 8.1 迭代器数据结构 🔍

type hiter struct {
  // 指向遍历得到的 key 的指针
	key         unsafe.Pointer // Must be in first position.  Write nil to indicate iteration end (see cmd/compile/internal/walk/range.go).
  // 指向遍历得到的 value 的指针
	elem        unsafe.Pointer // Must be in second position (see cmd/compile/internal/walk/range.go).
  //map 类型,包含了 key,value 类型大小等信息
	t           *maptype
  //map 的指针
	h           *hmap
  //map 的桶数组
	buckets     unsafe.Pointer // bucket ptr at hash_iter initialization time
  // 当前遍历到的桶
	bptr        *bmap          // current bucket
  // 新桶数组对应的溢出桶
	overflow    *[]*bmap       // keeps overflow buckets of hmap.buckets alive
  // 老桶数组对应的溢出桶
	oldoverflow *[]*bmap       // keeps overflow buckets of hmap.oldbuckets alive
  // 遍历起始位置的桶索引
	startBucket uintptr        // bucket iteration started at
  // 遍历起始位置的 key-value 对索引
	offset      uint8          // intra-bucket offset to start from during iteration (should be big enough to hold bucketCnt-1)
  // 遍历是否穿越桶数组尾端回到头部了
	wrapped     bool           // already wrapped around from end of bucket array to beginning
  // 桶数组的长度指数
	B           uint8
  // 当前遍历到的 key-value 对在桶中的索引
	i           uint8
  // 当前遍历到的桶
	bucket      uintptr
  // 因为扩容流程的存在,需要额外检查的桶
	checkBucket uintptr
}

# 8.2 mapiterinit 🚀

map 遍历流程开始时,首先会走进 runtime/map.go 的 mapiterinit () 方法当中,此时会对创建 map 迭代器 hiter,并且 通过取随机数的方式,决定遍历的起始桶号,以及起始 key-value 对索引号.

随机初始化
func mapiterinit(t *maptype, h *hmap, it *hiter) {
	
	it.t = t
	if h == nil || h.count == 0 {
		return
	}
	if unsafe.Sizeof(hiter{})/goarch.PtrSize != 12 {
		throw("hash_iter size incorrect") // see cmd/compile/internal/reflectdata/reflect.go
	}
	it.h = h
	// grab snapshot of bucket state
	it.B = h.B
	it.buckets = h.buckets
	if !t.Bucket.Pointers() {
		// Allocate the current slice and remember pointers to both current and old.
		// This preserves all relevant overflow buckets alive even if
		// the table grows and/or overflow buckets are added to the table
		// while we are iterating.
		h.createOverflow()
		it.overflow = h.extra.overflow
		it.oldoverflow = h.extra.oldoverflow
	}
	// decide where to start
  // 通过取随机数的方式,决定遍历时的起始桶,以及桶中起始 key-value 对的位置
	r := uintptr(rand())
	it.startBucket = r & bucketMask(h.B)
	it.offset = uint8(r >> h.B & (abi.MapBucketCount - 1))
	// iterator state
	it.bucket = it.startBucket
	// Remember we have an iterator.
	// Can run concurrently with another mapiterinit().
	if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
		atomic.Or8(&h.flags, iterator|oldIterator)
	}
	// 外层迭代器 hiter 中各项参数的初始化后,进入 mapiternext 方法开启遍历
	mapiternext(it)
}

# 8.3 mapiternext ⏭️

func mapiternext(it *hiter) {
	h := it.h
	// 遍历时发现其他 goroutine 在并发写,直接抛出 fatal error
	if h.flags&hashWriting != 0 {
		fatal("concurrent map iteration and map write")
	}
	t := it.t
	bucket := it.bucket
	b := it.bptr
	i := it.i
	checkBucket := it.checkBucket
// 开启最外圈的循环,依次遍历桶数组中的每个桶链表,通过 next 和 goto next 关键字实现循环代码块
next:
	if b == nil {
    // 如果已经遍历完所有的桶,重新回到初始桶位置,则直接结束方法
		if bucket == it.startBucket && it.wrapped {
			// end of iteration
			it.key = nil
			it.elem = nil
			return
		}
    // 如果 map 处于扩容流程,取桶时兼容新老桶数组的逻辑,如果桶处于旧桶数组且未完成迁移,需要将 checkBucket 置为当前的桶号
		if h.growing() && it.B == h.B {
			// Iterator was started in the middle of a grow, and the grow isn't done yet.
			// If the bucket we're looking at hasn't been filled in yet (i.e. the old
			// bucket hasn't been evacuated) then we need to iterate through the old
			// bucket and only return the ones that will be migrated to this bucket.
			oldbucket := bucket & it.h.oldbucketmask()
			b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.BucketSize)))
			if !evacuated(b) {
				checkBucket = bucket
			} else {
				b = (*bmap)(add(it.buckets, bucket*uintptr(t.BucketSize)))
				checkBucket = noCheck
			}
		} else {
			b = (*bmap)(add(it.buckets, bucket*uintptr(t.BucketSize)))
			checkBucket = noCheck
		}
    // 遍历的桶号加 1,倘若来到桶数组末尾,则将桶号置为 0。将 key-value 对的遍历索引 i 置为 0
		bucket++
		if bucket == bucketShift(it.B) {
			bucket = 0
			it.wrapped = true
		}
		i = 0
	}
  // 依次遍历各个桶中每个 key-value 对
	for ; i < abi.MapBucketCount; i++ {
		offi := (i + it.offset) & (abi.MapBucketCount - 1)
		if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
			// TODO: emptyRest is hard to use here, as we start iterating
			// in the middle of a bucket. It's feasible, just tricky.
			continue
		}
		k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.KeySize))
		if t.IndirectKey() {
			k = *((*unsafe.Pointer)(k))
		}
		e := add(unsafe.Pointer(b), dataOffset+abi.MapBucketCount*uintptr(t.KeySize)+uintptr(offi)*uintptr(t.ValueSize))
    // 倘若遍历到的桶属于旧桶数组未迁移完成的桶,需要按照其在新桶中的顺序完成遍历. 
	// 比如,增量扩容流程中,旧桶中的 key-value 对最终应该被分散迁移到新桶数组的 x、y 两个区域,则此时遍历时,哪怕 key-value 对仍存留在旧桶中未完成迁移,遍历时也应该严格按照其在新桶数组中的顺序来执行.
		if checkBucket != noCheck && !h.sameSizeGrow() {
			if t.ReflexiveKey() || t.Key.Equal(k, k) {
				hash := t.Hasher(k, uintptr(h.hash0))
				if hash&bucketMask(it.B) != checkBucket {
					continue
				}
			} else {
				if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
					continue
				}
			}
		}
		if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
			!(t.ReflexiveKey() || t.Key.Equal(k, k)) {
			it.key = k
			if t.IndirectElem() {
				e = *((*unsafe.Pointer)(e))
			}
			it.elem = e
		} else {
      // 执行 mapaccessK 方法,基于读流程方法获取 key-value 对,通过迭代的 hiter 的 key、value 指针进行接收,用于对用户的遍历操作进行响应
			rk, re := mapaccessK(t, h, k)
			if rk == nil {
				continue // key has been deleted
			}
			it.key = rk
			it.elem = re
		}
		it.bucket = bucket
		if it.bptr != b { // avoid unnecessary write barrier; see issue 14921
			it.bptr = b
		}
		it.i = i + 1
		it.checkBucket = checkBucket
		return
	}
	b = b.overflow(t)
	i = 0
	goto next
}

# 9. 扩容流程 📈

# 9.1 扩容类型 🔄

image-20250727114738298

Map 的扩容类型分为两类,一类叫做增量扩容,一类叫做等量扩容

  1. 增量扩容

    表现:扩容后,桶数组的长度增长为原长度的 2 倍

    目的:降低每个桶中 key-value 对的数量,优化 map 操作的时间复杂度

  2. 等量扩容

    表现:扩容后,桶数组的长度和之前保持一致,但溢出桶的数量会下降

    目的:提高桶主体结构的数据填充率,减少溢出桶数量,避免发生内存泄漏

# 9.2 何时扩容 ⏰

  1. 只有 map 的写流程可能开启扩容模式

  2. 写 map 新插入 key-value 对之前,会发起是否需要扩容的逻辑判断:

    func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    	// ......
    	if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
    		hashGrow(t, h)
    		goto again // Growing the table invalidates everything, so try again
    	}
    	// ......
    }
  3. 根据 hmap 的 oldbuckets 是否为空,可以判断 map 此前是否已开启扩容模式

    func (h *hmap) growing() bool {
    	return h.oldbuckets != nil
    }
  4. 倘若此前未进入扩容模式,且 map 中 key-value 对的数量超过 8 个,且大于桶数组长度的 6.5 倍,则进入增量扩容

    const (
     loadFactorNum = 13
     loadFactorDen = 2
    )
    func overLoadFactor(count int, B uint8) bool {
    	return count > abi.MapBucketCount && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
    }
  5. 倘若溢出桶的数量大于 2^B(即桶数组的长度;B 大于 15 时取 15),则进入等量扩容

    func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
        if B > 15 {
           B = 15
        }
        return noverflow >= uint16(1)<<(B&15)
    }

# 9.3 如何开启扩容模式 🚀

开启扩容模式的方法位于 runtime/map.go 的 hashGrow 方法中

const sameSizeGrow = 8
func hashGrow(t *maptype, h *hmap) {
  // 如果是增量扩容,bigger 值取 1;
  // 如果是等量扩容,bigger 值取 0,并将 hmap.flags 的第四个 bit 位置为 1,表示当前处于等量扩容流程
	bigger := uint8(1)
	if !overLoadFactor(h.count+1, h.B) {
		bigger = 0
		h.flags |= sameSizeGrow
	}
  // 将原桶数组赋值给 oldbuckets, 并创建新的桶数组和一批新的溢出桶
  // 此处会通过变量 bigger,实现不同扩容模式,新桶数组长度的区别处理
	oldbuckets := h.buckets
	newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
  // 更新 hmap 的桶数组长度指数 B,flag 标识,并将新、老桶数组赋值给 hmap.oldBuckets 和 hmap.buckets
  // 扩容迁移进度 hmap.nevacuate 标记为 0
  // 新桶数组的溢出桶数量 hmap.noveflow 置为 0
	flags := h.flags &^ (iterator | oldIterator)
	if h.flags&iterator != 0 {
		flags |= oldIterator
	}
	// commit the grow (atomic wrt gc)
	h.B += bigger
	h.flags = flags
	h.oldbuckets = oldbuckets
	h.buckets = newbuckets
	h.nevacuate = 0
	h.noverflow = 0
	// 将原本存量可用的溢出桶赋给 hmap.extra.oldoverflow
  // 倘若存在下一个可用的溢出桶,赋给 hmap.extra.nextOverflow
	if h.extra != nil && h.extra.overflow != nil {
		// Promote current overflow buckets to the old generation.
		if h.extra.oldoverflow != nil {
			throw("oldoverflow is not nil")
		}
		h.extra.oldoverflow = h.extra.overflow
		h.extra.overflow = nil
	}
	if nextOverflow != nil {
		if h.extra == nil {
			h.extra = new(mapextra)
		}
		h.extra.nextOverflow = nextOverflow
	}
}

# 9.4 扩容迁移规则 📋

增量扩容流程

image-20250727161533038

  • 在等量扩容中,新桶数组长度与原桶数组相同, key-value 对在新桶数组和老桶数组的索引号保持一致
  • 在增量扩容中,新桶数组长度为原桶数组的两倍,把新桶数组中桶号对应于老桶数组的区域称为 x 区域,新扩展的区域称为 y 区域
  • 实际上,一个 key 属于哪个桶,取决于其 hash 值对桶数组长度取模得到的结果,因此依赖于其低位的 hash 值结果
  • 在增量扩容流程中,新桶数组的长度会扩展一位,假定 key 原本从属的桶号为 i,则在新桶数组中从属的桶号只可能是 i(x 区域)或者 i + 老桶数组长度(y 区域)
  • 当 key 低位 hash 值向左扩展一位的 bit 位为 0,则应该迁往 x 区域的 i 位置;如果该 bit 位为 1,应该迁往 y 区域对应的 i + 老桶数组长度的位置

# 9.5 渐进式扩容 🐌

Map 采用的是渐进扩容的方式,避免因为一次性的全量数据迁移引发性能抖动

当每次触发写、删操作时,会为处于扩容流程中的 map 完成两组桶的数据迁移:

  1. 一组桶时当前写、删除操作所命中的桶
  2. 另一组桶时,当前未迁移的桶中,索引最小的哪个桶
func growWork(t *maptype, h *hmap, bucket uintptr) {
	// make sure we evacuate the oldbucket corresponding
	// to the bucket we're about to use
	evacuate(t, h, bucket&h.oldbucketmask())
	// evacuate one more oldbucket to make progress on growing
	if h.growing() {
		evacuate(t, h, h.nevacuate)
	}
}

image-20250727163621609

数据迁移的逻辑位于 runtime/map.go 的 evacuate 方法中:

// 入参中,oldbucket 为当前要迁移的桶在旧桶数组中的索引
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
  // 获取到待迁移桶的内存地址 b
	b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.BucketSize)))
  // 获取到旧桶数组的容量 newbit
	newbit := h.noldbuckets()
  //evacuated 方法判断出桶 b 是否已经迁移过了,未迁移过,才进入此 if 分支进行迁移处理
	if !evacuated(b) {
		// 通过一个二元数组 xy 指向当前同可能迁移到的目的桶
    	//x = xy [0], 代表新桶数组中索引和旧桶数组一致的桶
    	//y = xy [1], 代表新桶数组中,索引为原索引加上旧桶容量的桶,只在增量扩容中会使用到
		var xy [2]evacDst
		x := &xy[0]
		x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.BucketSize)))
		x.k = add(unsafe.Pointer(x.b), dataOffset)
		x.e = add(x.k, abi.MapBucketCount*uintptr(t.KeySize))
		// 只有进入增量扩容的分支,才需要对 y 进行初始化
		if !h.sameSizeGrow() {
			// Only calculate y pointers if we're growing bigger.
			// Otherwise GC can see bad pointers.
			y := &xy[1]
			y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.BucketSize)))
			y.k = add(unsafe.Pointer(y.b), dataOffset)
			y.e = add(y.k, abi.MapBucketCount*uintptr(t.KeySize))
		}
		// 外层 for 循环,遍历桶 b 和对应的溢出桶
		for ; b != nil; b = b.overflow(t) {
      //k,e 分别记录遍历桶时,当前的 key 和 value 的指针
			k := add(unsafe.Pointer(b), dataOffset)
			e := add(k, abi.MapBucketCount*uintptr(t.KeySize))
      // 遍历桶内的 key-value 对
			for i := 0; i < abi.MapBucketCount; i, k, e = i+1, add(k, uintptr(t.KeySize)), add(e, uintptr(t.ValueSize)) {
        // 取每个位置的 tophash 值进行判断,倘若当前是个空位,则将当前位置 tophash 值置为 evacuatedEmpty, 开始遍历下一个位置
				top := b.tophash[i]
				if isEmpty(top) {
					b.tophash[i] = evacuatedEmpty
					continue
				}
				if top < minTopHash {
					throw("bad map state")
				}
				k2 := k
				if t.IndirectKey() {
					k2 = *((*unsafe.Pointer)(k2))
				}
        // 寻找到迁移的目的桶
        // const evacuatedX = 2
  			// const evacuatedY = 3  
				var useY uint8
				if !h.sameSizeGrow() {
					hash := t.Hasher(k2, uintptr(h.hash0))
					if h.flags&iterator != 0 && !t.ReflexiveKey() && !t.Key.Equal(k2, k2) {
						useY = top & 1
						top = tophash(hash)
					} else {
						if hash&newbit != 0 {
							useY = 1
						}
					}
				}
				if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
					throw("bad evacuatedN")
				}
				b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
				dst := &xy[useY]                 // evacuation destination
				// 将 key-value 对迁移到目的桶中,并且更新目的桶结构内几个指针的指向
				if dst.i == abi.MapBucketCount {
					dst.b = h.newoverflow(t, dst.b)
					dst.i = 0
					dst.k = add(unsafe.Pointer(dst.b), dataOffset)
					dst.e = add(dst.k, abi.MapBucketCount*uintptr(t.KeySize))
				}
				dst.b.tophash[dst.i&(abi.MapBucketCount-1)] = top // mask dst.i as an optimization, to avoid a bounds check
				if t.IndirectKey() {
					*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
				} else {
					typedmemmove(t.Key, dst.k, k) // copy elem
				}
				if t.IndirectElem() {
					*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
				} else {
					typedmemmove(t.Elem, dst.e, e)
				}
				dst.i++
				dst.k = add(dst.k, uintptr(t.KeySize))
				dst.e = add(dst.e, uintptr(t.ValueSize))
			}
		}
		// Unlink the overflow buckets & clear key/elem to help GC.
		if h.flags&oldIterator == 0 && t.Bucket.Pointers() {
			b := add(h.oldbuckets, oldbucket*uintptr(t.BucketSize))
			// Preserve b.tophash because the evacuation
			// state is maintained there.
			ptr := add(b, dataOffset)
			n := uintptr(t.BucketSize) - dataOffset
			memclrHasPointers(ptr, n)
		}
	}
	// 倘若当前迁移的桶是旧桶数组未迁移的桶中索引最小的一个,则 hmap.nevacuate 累加 1
  // 倘若已经迁移完所有的旧桶,则会确保 hmap.flags 中,等量扩容的标识被置为 0
	if oldbucket == h.nevacuate {
		advanceEvacuationMark(h, t, newbit)
	}
}
type evacDst struct {
	b *bmap          // current destination bucket
	i int            // key/elem index into b
	k unsafe.Pointer // pointer to current key storage
	e unsafe.Pointer // pointer to current elem storage
}
  • evacDst.b: 目的地所在桶
  • evacDst.i: 即将入桶的 key-value 对在桶中的索引
  • evacDst.k: 入桶 key 的存储指针
  • evacDst.e: 入桶 value 的存储指针
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
	h.nevacuate++
	stop := h.nevacuate + 1024
	if stop > newbit {
		stop = newbit
	}
	for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
		h.nevacuate++
	}
	if h.nevacuate == newbit { 
		h.oldbuckets = nil
		if h.extra != nil {
			h.extra.oldoverflow = nil
		}
		h.flags &^= sameSizeGrow
	}
}

# sync.map 🔒

# 1. 核心数据结构 🏗️

# 1.1 sync.Map

image-20250723233336309

type Map struct {
	// 互斥锁
	mu Mutex
	// 高性能只读 map
	// 多数情况下,读取都命中它(因为它原子可读)
	read atomic.Pointer[readOnly] 
    // 用来保存新增、更新的 key
	// 只有当 key 不在 read 时,才写入 dirty
	// 在读取 miss 达到阈值时,dirty 会合并到 read 中
	dirty map[any]*entry
	// 每次从 read 未命中但在 dirty 找到时,misses++
	// 如果 misses >= len (dirty),说明 read 太旧了,就同步 dirty->read
	misses int //cache miss 次数
}

sync.Map 的特点是 冗余了两份 map:read map 和 dirty map,通过互斥锁保证并发安全,通过原子指针实现无锁读写,通过 misses 计数触发 read 和 dirty 的合并。

# 1.2 entry 及对应的几种状态 📝

type entry struct {
	// 每个 key 对应的值是一个 entry
	// 使用原子指针包装实际值,实现无所读写(更新时加锁,读取时无锁)
    p atomic.Pointer[any]
}

kv 对中的 value,统一采用 unsafe.Pointer 的形式进行存储,通过 entry.p 的指针进行链接

entry.p 的指向分为三种情况:

  1. 存活态:正常指向元素

    即 key-entry 对仍未删除

  2. 软删除态:指向 nil

    read map 和 dirty map 底层的 map 结构仍存在 key-entry 对,但在逻辑上该 key-entry 已经被删除,因此无法被用户查询到

  3. 硬删除态:指向固定的全局变量 expunged

    ditry map 中已不存在该 key-entry 对

    var expunged = unsafe.Pointer(new(any))

# 1.3 readOnly 📖

type readOnly struct {
    m       map[any]*entry // 真正意义上的 read map,实现从 key 到 entry 的映射
    amended bool // 表示 dirty map 是否包含 read map 没有的 key
}

sync.Map 中只读 map:read 内部包含两个成员属性

  • m:真正意义上的 read map,实现从 key 到 entry 的映射
  • amended:表示 read map 中的 key-entry 对是否存在缺失,需要通过 dirty map 兜底

# 2. 读流程 📖

image-20250724004339991

# 2.1 sync.Map.Load() 📖

func (m *Map) Load(key any) (value any, ok bool) {
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}
  • 查看 read map 中是否存在 key-entry 对,若存在,则直接读取 entry 返回;
  • 倘若第一轮 read map 查询 miss,且 read map 不全,则需要加锁 double check;
  • 第二轮 read map 查询仍 miss(加锁后),且 read map 不全,则查询 dirty map 兜底;
  • 查询操作涉及到与 dirty map 的交互,misses 加一;
  • 解锁,返回查得的结果.

# 2.2 entry.load() 📖

func (e *entry) load() (value any, ok bool) {
	p := e.p.Load()
	if p == nil || p == expunged {
		return nil, false
	}
	return *p, true
}
  • sync.Map 中,kv 对的 value 是基于 entry 指针封装的形式;
  • 从 map 取得 entry 后,最终需要调用 entry.load 方法读取指针指向的内容;
  • 倘若 entry 的指针状态为 nil 或者 expunged,说明 key-entry 对已被删除,则返回 nil;
  • 倘若 entry 未被删除,则读取指针内容,并且转为 any 的形式进行返回.

# 2.3 sync.Map.missLocked() ⚡

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(&readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}
  • 在读流程中,倘若未命中 read map,且由于 read map 内容存在缺失需要和 dirty map 交互时,会走进 missLocked 流程;
  • 在 missLocked 流程中,首先 misses 计数器累加 1;
  • 倘若 miss 次数小于 dirty map 中存在的 key-entry 对数量,直接返回即可;
  • 倘若 miss 次数大于等于 dirty map 中存在的 key-entry 对数量,则使用 dirty map 覆盖 read map,并将 read map 的 amended flag 置为 false;
  • 新的 dirty map 置为 nil,misses 计数器清零.

# 3. 写流程 ✍️

image-20250724101516414

# 3.1 sync.Map.Store() ✍️

// Store 方法是对 Swap 的封装,用于存储键值对
// 它忽略了 Swap 返回的之前的值和是否存在的标志
func (m *Map) Store(key, value any) {
    _, _ = m.Swap(key, value)
}
// Swap 方法用于原子地替换键对应的值
// 返回之前的值(如果存在)和一个表示键是否存在的布尔值
//previous: 之前的值(如果键不存在则为 nil)
//loaded: 如果键存在为 true,否则为 false
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
    // 首先尝试从只读 map 中读取
    read := m.loadReadOnly()
    if e, ok := read.m[key]; ok {
        // 如果键存在于只读 map 中,尝试直接替换值
        //trySwap 是无锁操作,成功则直接返回
        if v, ok := e.trySwap(&value); ok {
            if v == nil {
                return nil, false
            }
            return *v, true
        }
    }
    // 如果无锁操作失败,需要加锁进行处理
    m.mu.Lock()
    read = m.loadReadOnly() // 重新获取只读 map,因为可能在加锁期间被其他 goroutine 修改
    if e, ok := read.m[key]; ok {
        // 键存在于只读 map 中
        if e.unexpungeLocked() {
            // 如果元素被标记为删除,则恢复它并添加到 dirty map
            m.dirty[key] = e
        }
        // 替换值并更新返回状态
        if v := e.swapLocked(&value); v != nil {
            loaded = true
            previous = *v
        }
    } else if e, ok := m.dirty[key]; ok {
        // 键存在于 dirty map 中
        // 替换值并更新返回状态
        if v := e.swapLocked(&value); v != nil {
            loaded = true
            previous = *v
        }
    } else {
        // 键既不在只读 map 也不在 dirty map 中
        if !read.amended {
            // 如果这是第一次向 dirty map 添加新键
            // 需要创建 dirty map 的副本并标记 read map 为不完整
            m.dirtyLocked()
            m.read.Store(&readOnly{m: read.m, amended: true})
        }
        // 在 dirty map 中创建新条目
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
    return previous, loaded
}

# 3.2 trySwap 🔄

func (m *Map) Swap(key, value any) (previous any, loaded bool) {
	read := m.loadReadOnly()
	if e, ok := read.m[key]; ok {
		if v, ok := e.trySwap(&value); ok {
			if v == nil {
				return nil, false
			}
			return *v, true
		}
	}
  
  m.mu.Lock()
  // ... 
  
}
func (e *entry) trySwap(i *any) (*any, bool) {
	for {
		p := e.p.Load()
		if p == expunged {
			return nil, false
		}
		if e.p.CompareAndSwap(p, i) {
			return p, true
		}
	}
}
  • 在写流程中,让若发现 read map 中已存在对应的 key-entry 对,则会对调用 trySwap 方法尝试进行更新
  • 让若 entry 为 expunged 态,说明已被硬删除,dirty 中缺失该项数据,因此 trySwap 执行失败
  • 倘若 entry 为非 expunged 态,则直接执行 CAS 操作完成值的更新即可

# 3.3 entry.unexpungeLocked() 🔓

func (m *Map) Swap(key, value any) (previous any, loaded bool) {
	// ......
	m.mu.Lock()
	read = m.loadReadOnly()
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			m.dirty[key] = e
		}
		if v := e.swapLocked(&value); v != nil {
			loaded = true
			previous = *v
		}
	}
  // ......
}
func (e *entry) unexpungeLocked() (wasExpunged bool) {
	return e.p.CompareAndSwap(expunged, nil)
}
  • 在写流程加锁 double check 的过程中,倘若发现 read map 中存在对应的 key-entry 对,会执行该方法
  • 倘若 key-entry 为硬删除 expunged 态,该方法 hi 基于 CAS 操作将其更新为软删除 nil 态,然后进一步 dirty map 中对齐该 key-entry 对,实现从硬删除到软删除的恢复

# 3.4 entry.swapLocked() 🔄

func (m *Map) Swap(key, value any) (previous any, loaded bool) {
	// ...... 
	m.mu.Lock()
	read = m.loadReadOnly()
	if e, ok := read.m[key]; ok {
		//......
	} else if e, ok := m.dirty[key]; ok {
		if v := e.swapLocked(&value); v != nil {
			loaded = true
			previous = *v
		}
	}
  // ......
}
func (e *entry) swapLocked(i *any) *any {
    return e.p.Swap(i)
}

写流程中,倘若 read map 或者 dirty map 存在对应 key-entry,最终会通过原子操作,将新值的指针存储到 entry.p 当中

# 3.5 sync.Map.dirtyLocked() 🧹

// 与 missLocked () 方法对应
func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}
	// 将 read map 中的数据拷贝回 dirty map
	read := m.loadReadOnly()
	m.dirty = make(map[any]*entry, len(read.m))
	for k, e := range read.m {
    // 过滤掉删除态的数据
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
	p := e.p.Load()
	for p == nil {
		if e.p.CompareAndSwap(nil, expunged) {
			return true
		}
		p = e.p.Load()
	}
	return p == expunged
}
  • 在写流程中,倘若需要将 key-entry 插入到兜底的 dirty map 中,并且此时 dirty map 为空(从未写入过数据或者刚发生过 missLocked),进入 dirtyLocked 流程
  • 此时会遍历一轮 read map,将未删除的 key-entry 对拷贝到 dirty map 当中
  • 在遍历时,还会将 read map 中软删除 nil 态的 entry 更新为硬删除 expunged 态,因此在此流程中,不会将其拷贝到 dirty map

# 4. 删流程 🗑️

image-20250724135308911

# 4.1 sync.Map.Delete() 🗑️

func (m *Map) Delete(key any) {
	m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			delete(m.dirty, key)
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if ok {
		return e.delete()
	}
	return nil, false
}
  1. 倘若 read map 中存在 key,则直接基于 cas 操作将其删除
  2. 倘若 read map 不存在 key,且 read map 有缺失 (amended flag 为 true),则加锁 double check;
  3. 倘若加锁 double check 时,read map 仍不存在 key 且 read map 有缺失,则从 dirty map 中取元素,并且将 key-entry 对从 dirty map 中物理删除
  4. 走入步骤 3,删操作需要和 dirty map 交互,需要走进 missLocked 流程
  5. 解锁
  6. 倘若从 read map 或者 dirty map 中获取到了 key 对应的 entry, 则走入 entry.delete () 方法逻辑删除 entry
  7. 倘若 read map 和 dirty map 中均不存在 key,返回 false 标识删除失败

# 4.2 entry.Delete() 🗑️

func (e *entry) delete() (value any, ok bool) {
	for {
		p := e.p.Load()
		if p == nil || p == expunged {
			return nil, false
		}
		if e.p.CompareAndSwap(p, nil) {
			return *p, true
		}
	}
}
  • entry 的逻辑删除流程
  • 倘若 entry 此前已被删除,则直接返回 false 标识删除失败
  • 倘若 entry 当前仍存在,则通过 CAS 将 entry.p 指向 nil,标识其已进入软删除状态

# 5. 遍历流程 🔄

image-20250724141718292

func (m *Map) Range(f func(key, value any) bool) {
	read := m.loadReadOnly()
	if read.amended {
		m.mu.Lock()
		read = m.loadReadOnly()
		if read.amended {
			read = readOnly{m: m.dirty}
			copyRead := read
			m.read.Store(&copyRead)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}
	for k, e := range read.m {
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}
  1. 在遍历过程中,倘若发现 read map 数据不全(amended flag 为 true),会额外加一次锁,并使用 dirty map 覆盖 read map
  2. 遍历 read map(通过步骤 1 保证了 read map 有全量数据),执行用户传入的回调函数,倘若某次回调时返回值为 false,则会终止全流程

# 6. 总结 📝

# 6.1 entry 的 expunged 态 🔍

image-20250724153712447

为什么使用 expunged 态来区分软硬删除呢?仅用 nil 一种状态来标识删除不可以吗?

无论是软删除 (nil) 还是硬删除 (expunged),都表示在逻辑意义上 key-entry 对已经从 sync.Map 中删除,nil 和 expunged 的区别在于:

  • 软删除态(nil):read map 和 dirty map 在物理上仍保有该 key-entry 对,因此倘若此时需要对该 entry 执行写操作,可以直接 CAS 操作
  • 硬删除态(expunged):dirty map 中已经没有该 key-entry 对,倘若执行写操作,必须加锁 (dirty map 必须含有全量 key-entry 对数据)

设计 expunged 和 nil 两种状态的原因,就是为了优化在 dirtyLocked 前,针对同一个 key 先删后写的场景,通过 expunged 态额外标识出 dirty map 种仍具有指向该 entry 的能力,这样能够实现对一部分 nil 态 key-entry 对的解放,能够基于 CAS 完成这部分内容写入操作而无需加锁。

# 6.2 read map 和 dirty map 的数据流转 🔄

  • read map: 访问全程无锁 🔓
  • dirty map: 是兜底的读写 map,访问时需要加锁 🔒

之所以这样处理,是希望能根据对读、删、更新、写操作频次的探测,来实时动态地调整操作方式,希望在读、更新、删频次较高时,更多地采用 CAS 的方式无锁化地完成操作;在写操作频次较高时,则直接了当地采用加锁操作完成.

因此, sync.Map 本质上采取了一种以空间换时间 + 动态调整策略的设计思路,下面对两个 map 间的数据流转过程进行详细介绍:

# 6.2.1 两个 map 🗺️

image-20250724144239134

  • 总体思想,希望能多用 read map,少用 dirty map,因为操作前者无锁,后者需要加锁
  • 除了 expunged 态的 entry 之外,read map 的内容为 dirty map 的子集

# 6.2.2 dirty map -> read map 📈

image-20250724150532506

记录读 / 删流程中,通过 misses 记录访问 read map miss 由 dirty 兜底处理的次数,当 miss 次数达到阈值,则通过 missLocked 流程,进行新老 read/dirty 替换流程;此时将老 dirty 作为新 read,新 dirty 则暂时为空,直到 dirtyLocked 流程完成对 dirty 的初始化

# 6.2.3 read map -> dirty map 📉

image-20250724150506915

  • 发生 dirtyLocked 的前置条件:
      1. dirty 暂时为空(此前没有写操作或者近期进行过 missLocked 流程)
      1. 接下来一次写操作访问 read 时 miss,需要 dirty 兜底
  • 在 dirtyLocked 流程中,需要对 read 内的元素进行状态更新,因此需要遍历,是一个线性时间复杂度的过程,可能存在性能抖动
  • dirtyLocked 遍历中,会将 read 中未被删除的元素(非 nil 非 expunged)拷贝到 dirty 中;会将 read 中所有此前被删除的元素统一置为 expunged 态

# 6.3 适用场景与注意问题 ⚠️

  • sync.Map 适用于读多、更新多、删多、写少的场景 ✅
  • 倘若写操作过多,sync.Map 基本等价于互斥锁 + map ⚠️
  • sync.Map 可能存在性能抖动问题,主要发生在读 / 删流程 miss 只读 map 次数过多时(触发 missLocked 流程),下一次插入操作的过程当中(dirtyLocked 流程)💥
更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

ZJM 微信支付

微信支付

ZJM 支付宝

支付宝