当前位置: 代码迷 >> 综合 >> Linux内核同步原语之原子操作(Atomic)
  详细解决方案

Linux内核同步原语之原子操作(Atomic)

热度:94   发布时间:2023-12-17 01:15:57.0

一、原子操作的命名规则

Linux内核中提供了各种各样的原子操作函数。除了最原始的读取和设置之外,还包括各种运算,以及位操作等等。而且有的原子操作还要返回操作过后变量的值,有的要返回操作之前变量的值,如果再牵涉到内存屏障的问题,将这些因素组合起来,有非常多的原子操作函数。这些原子操作函数看似非常杂乱,其实函数命名是有规律的。

1)基本型

基本型包括非RMW(Read Modifiy Write)的读写操作,以及RMW的算术和位操作等。

非RMW的操作很简单,只有两个,即用来读取的atomic_read()和用来写入的atomic_set()。一般对单独变量的读取或写入操作本身都是原子的,如果代码中只对单个变量进行读写操作,而从来没对它使用RMW操作,那一般说明你用错了,没必要使用内核提供的原子操作。

RMW操作就很多了,包括下面几种:

  1. 算术操作:包括加、减、递增和递减,命名形式是atomic_{add,sub,inc,dec}();
  2. Bit位操作:包括与、或、异或和与非,命名形式是atomic_{and,or,xor,andnot}();
  3. 交换操作:包括交换(atomic_xchg())、比较交换(atomic_cmpxchg())和添加了返回成功与否的比较交换(atomic_try_cmpxchg())。

除了atomic_t类型外,Linux内核还支持对64位和长整形的atomic64_t和atomic_long_t类型,它们也都有对应的基本原子操作函数,只不过函数前缀名分别是atomic64_和atomic_long_,具体就不在赘述了。

2)要返回修改过后的值

如果一个原子操作函数要返回修改过后的原子变量的值,那么该函数名会含有“_return”字符串,并且是在表示具体操作字符串的后面。例如,atomic_add_return、atomic_dec_return_relaxed等。

3)要返回修改之前的值

如果一个原子操作函数要返回修改之前的原子变量的值,那么该函数名会含有“_fetch”字符串,并且是在表示具体操作字符串的前面。例如,atomic_fetch_and、atomic_fetch_or_acquire等。

4)有Acquire和Release单向屏障语义

 如果一个原子操作函数还需要包括Acquire或者Release单向屏障语义,那么该函数名会有“_acquire”或者“_release”后缀。例如,atomic_xchg_acquire、atomic_cmpxchg_release等。

相反的,如果一个原子操作函数名有“_relaxed”后缀,表示这个函数没有被任何内存屏障保护,可以被任意重排序。例如,atomic_add_return_relaxed、atomic_xchg_relaxed等。

只有当一个原子操作函数要返回值的时候才有可能添加_acquire、_release和_relaxed后缀。

二、原子操作的重排序规则

Linux内核的原子操作只保证对单一变量的某个操作是原子的,多个CPU同时操作时,不会出现中间的错误状态。但是,对这个原子操作本身,并不一定保证其执行的顺序,在SMP系统下,有可能会出现重排序的问题。因此,前面也提到过,有些原子操作函数自己就带了一定的内存屏障的语义。具体有没有带,带了多少,可以通过函数名看出来,具体规则如下:

  • 非RMW的原子操作可以被任意重排序;
  • RMW的原子操作,如果没有返回值可以被任意重排序;
  • RMW的原子操作,如果有返回值,并且没有_relaxed、_acquire和_release后缀,是有内存屏障保护的,可以保证不被重排序;
  • RMW的原子操作,如果有返回值,且有_relaxed后缀,没有任何内存屏障保护,可以被任意重排序;
  • RMW的原子操作,如果有返回值,且有_acquire后缀,表示读(RMW中的R)操作是有Acquire单向屏障保护的;
  • RMW的原子操作,如果有返回值,且有_release后缀,表示写(RMW中的W)操作是有Release单向屏障保护的;
  • RMW的原子操作,如果有条件判断,那么条件是否的那部分会被任意重排序。

对于那些不提供内存屏障语义的原子操作来说,为了保证其本身不被重排序,还需要显式的在其前面或后面使用内存屏障。Linux内核提供了两个函数,分别是smp_mb__before_atomic()用于原子操作函数的前面,和smp_mb__after_atomic()用于原子操作函数的后面。注意,函数的中间是“__”而不是“_”,而且说起来是两个不同的函数,但是内核实际上都把它们映射成了普通的通用内存屏障:

#ifndef __smp_mb__before_atomic
#define __smp_mb__before_atomic()	__smp_mb()
#endif#ifndef __smp_mb__after_atomic
#define __smp_mb__after_atomic()	__smp_mb()
#endif......#ifndef smp_mb__before_atomic
#define smp_mb__before_atomic()	__smp_mb__before_atomic()
#endif#ifndef smp_mb__after_atomic
#define smp_mb__after_atomic()	__smp_mb__after_atomic()
#endif

三、ARMv8架构下原子操作的实现

下面我们来看看上面提到的所有原子操作函数在Linux内核中是如何实现的,由于原子操作函数实在太多了,我们挑出几个有代表性的来分析,分别是atomic_add_return_acquire()、atomic_fetch_or_release()和atomic_cmpxchg()。它们的定义如下(代码位于include/asm-generic/atomic-instrumented.h中):

#if defined(arch_atomic_add_return_acquire)
static inline int
atomic_add_return_acquire(int i, atomic_t *v)
{kasan_check_write(v, sizeof(*v));return arch_atomic_add_return_acquire(i, v);
}
#define atomic_add_return_acquire atomic_add_return_acquire
#endif......#if defined(arch_atomic_fetch_or_release)
static inline int
atomic_fetch_or_release(int i, atomic_t *v)
{kasan_check_write(v, sizeof(*v));return arch_atomic_fetch_or_release(i, v);
}
#define atomic_fetch_or_release atomic_fetch_or_release
#endif......#if !defined(arch_atomic_cmpxchg_relaxed) || defined(arch_atomic_cmpxchg)
static inline int
atomic_cmpxchg(atomic_t *v, int old, int new)
{kasan_check_write(v, sizeof(*v));return arch_atomic_cmpxchg(v, old, new);
}
#define atomic_cmpxchg atomic_cmpxchg
#endif

可以看出来,具体的原子操作实现都是和架构相关的。那万一哪个原子操作在当前平台下没有被定义怎么办呢?例如,在ARMv8下,arch_atomic_try_cmpxchg就没有被定义,那么atomic_try_cmpxchg也不会被定义:

#if defined(arch_atomic_try_cmpxchg)
static inline bool
atomic_try_cmpxchg(atomic_t *v, int *old, int new)
{kasan_check_write(v, sizeof(*v));kasan_check_write(old, sizeof(*old));return arch_atomic_try_cmpxchg(v, old, new);
}
#define atomic_try_cmpxchg atomic_try_cmpxchg
#endif

对于那些没有平台架构没有定义的原子操作,还有一次补救的机会,例如在ARMv8下,atomic_try_cmpxchg函数真正的定义如下(代码位于include/linux/atomic-fallback.h中):

#ifndef atomic_try_cmpxchg
static inline bool
atomic_try_cmpxchg(atomic_t *v, int *old, int new)
{int r, o = *old;r = atomic_cmpxchg(v, o, new);if (unlikely(r != o))*old = r;return likely(r == o);
}
#define atomic_try_cmpxchg atomic_try_cmpxchg
#endif

所以,atomic_try_cmpxchg函数其实是基于已有的atomic_cmpxchg函数来实现的。

接着上面说,所以atomic_add_return_acquire函数实际调用的是arch_atomic_add_return_acquire函数,atomic_fetch_or_release函数实际调用的是arch_atomic_fetch_or_release函数,atomic_cmpxchg函数实际调用的是arch_cmpxchg_relaxed函数,它们分别定义如下(代码位于arch/arm64/include/asm/atomic.h中):

#define arch_atomic_add_return_acquire		arch_atomic_add_return_acquire
......
#define arch_atomic_fetch_or_release		arch_atomic_fetch_or_release
......
#define arch_atomic_cmpxchg(v, old, new) \arch_cmpxchg(&((v)->counter), (old), (new))

arch_cmpxchg函数先放一下,后面说。arch_atomic_add_return_acquire函数和atomic_fetch_or_release函数都是由下面的宏定义的:

#define ATOMIC_FETCH_OP(name, op)					\
static inline int arch_##op##name(int i, atomic_t *v)			\
{									\return __lse_ll_sc_body(op##name, i, v);			\
}#define ATOMIC_FETCH_OPS(op)						\ATOMIC_FETCH_OP(_relaxed, op)					\ATOMIC_FETCH_OP(_acquire, op)					\ATOMIC_FETCH_OP(_release, op)					\ATOMIC_FETCH_OP(        , op)ATOMIC_FETCH_OPS(atomic_fetch_andnot)
ATOMIC_FETCH_OPS(atomic_fetch_or)
ATOMIC_FETCH_OPS(atomic_fetch_xor)
ATOMIC_FETCH_OPS(atomic_fetch_add)
ATOMIC_FETCH_OPS(atomic_fetch_and)
ATOMIC_FETCH_OPS(atomic_fetch_sub)
ATOMIC_FETCH_OPS(atomic_add_return)
ATOMIC_FETCH_OPS(atomic_sub_return)#undef ATOMIC_FETCH_OP
#undef ATOMIC_FETCH_OPS

所以,这里定义了一组以arch_打头的原子操作函数,且它们都是内联(inline)的。但是最终都是被映射成了__lse_ll_sc_body宏(代码位于arch/arm64/include/asm/lse.h中):

static inline bool system_uses_lse_atomics(void)
{return (static_branch_likely(&arm64_const_caps_ready)) &&static_branch_likely(&cpu_hwcap_keys[ARM64_HAS_LSE_ATOMICS]);
}#define __lse_ll_sc_body(op, ...)					\
({									\system_uses_lse_atomics() ?					\__lse_##op(__VA_ARGS__) :				\__ll_sc_##op(__VA_ARGS__);				\
})

system_uses_lse_atomics函数用来判断当前系统是否支持ARMv8.1及之后才新增的所谓LSE(Large System Extention)指令,其中有很多原子操作指令,方便许多,不用再用LL/SC操作了。但是,如果当前Arm平台不支持的话,只能继续通过老的LL/SC操作来实现原子操作了。

所以,最终arch_atomic_add_return_acquire被映射成内联函数__lse_atomic_add_return_acquire或__ll_sc_atomic_add_return_acquire,arch_atomic_fetch_or_release被映射成内联函数__lse_atomic_fetch_or_release或__ll_sc_atomic_fetch_or_release。

我们先来开老的使用LL/SC操作的实现(代码位于arch/arm64/include/asm/atomic_ll_sc.h):

#define ATOMIC_OPS(...)							\ATOMIC_OP(__VA_ARGS__)						\ATOMIC_OP_RETURN(        , dmb ish,  , l, "memory", __VA_ARGS__)\ATOMIC_OP_RETURN(_relaxed,        ,  ,  ,         , __VA_ARGS__)\ATOMIC_OP_RETURN(_acquire,        , a,  , "memory", __VA_ARGS__)\ATOMIC_OP_RETURN(_release,        ,  , l, "memory", __VA_ARGS__)\ATOMIC_FETCH_OP (        , dmb ish,  , l, "memory", __VA_ARGS__)\ATOMIC_FETCH_OP (_relaxed,        ,  ,  ,         , __VA_ARGS__)\ATOMIC_FETCH_OP (_acquire,        , a,  , "memory", __VA_ARGS__)\ATOMIC_FETCH_OP (_release,        ,  , l, "memory", __VA_ARGS__)ATOMIC_OPS(add, add, I)
ATOMIC_OPS(sub, sub, J)#undef ATOMIC_OPS
#define ATOMIC_OPS(...)							\ATOMIC_OP(__VA_ARGS__)						\ATOMIC_FETCH_OP (        , dmb ish,  , l, "memory", __VA_ARGS__)\ATOMIC_FETCH_OP (_relaxed,        ,  ,  ,         , __VA_ARGS__)\ATOMIC_FETCH_OP (_acquire,        , a,  , "memory", __VA_ARGS__)\ATOMIC_FETCH_OP (_release,        ,  , l, "memory", __VA_ARGS__)ATOMIC_OPS(and, and, K)
ATOMIC_OPS(or, orr, K)
ATOMIC_OPS(xor, eor, K)
ATOMIC_OPS(andnot, bic, )#undef ATOMIC_OPS
#undef ATOMIC_FETCH_OP
#undef ATOMIC_OP_RETURN
#undef ATOMIC_OP

又是一堆宏定义,__ll_sc_atomic_add_return_acquire函数是通过宏ATOMIC_OPS(add, add, I)定义出的一堆关于add原子操作函数其中的一个,也就是接着通过宏ATOMIC_OP_RETURN(_acquire,        , a,  , "memory", __VA_ARGS__)定义出来的函数:

#define ATOMIC_OP_RETURN(name, mb, acq, rel, cl, op, asm_op, constraint)\
static inline int							\
__ll_sc_atomic_##op##_return##name(int i, atomic_t *v)			\
{									\unsigned long tmp;						\int result;							\\asm volatile("// atomic_" #op "_return" #name "\n"		\__LL_SC_FALLBACK(						\
"	prfm	pstl1strm, %2\n"					\
"1:	ld" #acq "xr	%w0, %2\n"					\
"	" #asm_op "	%w0, %w0, %w3\n"				\
"	st" #rel "xr	%w1, %w0, %2\n"					\
"	cbnz	%w1, 1b\n"						\
"	" #mb )								\: "=&r" (result), "=&r" (tmp), "+Q" (v->counter)		\: __stringify(constraint) "r" (i)				\: cl);								\\return result;							\
}

这是一段嵌入C语言中的汇编代码,ATOMIC_OP_RETURN宏的参数name对应于_acquire,参数mb对应的是空,参数acq对应的是a,参数rel对应的是空,参数cl对应的是"memory",参数op对应的是add,参数asm_op对应的也是add,参数constrait对应的是I。

这段汇编代码被包含在了__LL_SC_FALLBACK宏中:

#if IS_ENABLED(CONFIG_ARM64_LSE_ATOMICS) && IS_ENABLED(CONFIG_AS_LSE)
#define __LL_SC_FALLBACK(asm_ops)					\
"	b	3f\n"							\
"	.subsection	1\n"						\
"3:\n"									\
asm_ops "\n"								\
"	b	4f\n"							\
"	.previous\n"							\
"4:\n"
#else
#define __LL_SC_FALLBACK(asm_ops) asm_ops
#endif

如果内核配置了CONFIG_ARM64_LSE_ATOMICS和CONFIG_AS_LSE,则将包含的代码放到当前段中的名字为“1”的一个子段中,在函数的入口处跳入进来,当执行完之后在跳回去;如果内核没有配置的话,则是空的,什么都没做。

这里假设__LL_SC_FALLBACK的定义是空的,那么__ll_sc_atomic_add_return_acquire函数扩展后为:

static inline int
__ll_sc_atomic_add_return_acquire(int i, atomic_t *v)
{unsigned long tmp;int result;asm volatile(
"       // 将v->counter预取到CPU缓存\n"                                   \
"	prfm    pstl1strm, %2\n"					\
"       // 将v->counter独占的读入result中\n"                              \
"1:	ldaxr    %w0, %2\n"					        \
"       // result = result + i\n"                                       \
"	add    %w0, %w0, %w3\n"				                \
"       // 将result的值独占的存入v->counter中\n"                           \
"       // 如果存入时独占标记被清除则将tmp置1\n"                             \
"	stxr	%w1, %w0, %2\n"					        \
"       // 如果tmp被置1则从头再次执行一遍\n"                                \
"	cbnz	%w1, 1b\n"						\)								\: "=&r" (result), "=&r" (tmp), "+Q" (v->counter)		\: __stringify(I) "r" (i)				        \: "memory");					                \\return result;							\
}

ldxr和stxr是ARMv8指令集下的实现LL/SC操作的独占访问指令,而ldaxr多了一个a,表示比ldxr指令多了一个Load-Acquire语义,这也就是__ll_sc_atomic_add_return_acquire函数后面_acquire后缀的由来。操作数中加上w(比如%w0,而不是%0)表示是要操作32位的数,使用wn寄存器,否则在64位下,默认使用xn寄存器。所以,这段函数的原理就是独占读取count的值,然后加上i,最后试着独占存入count。如果发现独占存入失败就重新执行一次上述操作,直到成功为止。

我们接着来看__ll_sc_atomic_fetch_or_release函数,其是通过宏ATOMIC_OPS(or, orr, K)定义出的一堆关于or原子操作函数其中的一个,也就是接着通过宏ATOMIC_FETCH_OP (_release, , , l, "memory", __VA_ARGS__)定义出来的函数:

#define ATOMIC_FETCH_OP(name, mb, acq, rel, cl, op, asm_op, constraint) \
static inline int							\
__ll_sc_atomic_fetch_##op##name(int i, atomic_t *v)			\
{									\unsigned long tmp;						\int val, result;						\\asm volatile("// atomic_fetch_" #op #name "\n"			\__LL_SC_FALLBACK(						\
"	prfm	pstl1strm, %3\n"					\
"1:	ld" #acq "xr	%w0, %3\n"					\
"	" #asm_op "	%w1, %w0, %w4\n"				\
"	st" #rel "xr	%w2, %w1, %3\n"					\
"	cbnz	%w2, 1b\n"						\
"	" #mb )								\: "=&r" (result), "=&r" (val), "=&r" (tmp), "+Q" (v->counter)	\: __stringify(constraint) "r" (i)				\: cl);								\\return result;							\
}

ATOMIC_FETCH_OP宏的参数name对应于_release,参数mb对应的是空,参数acq对应的是空,参数rel对应的是l,参数cl对应的是"memory",参数op对应的是or,参数asm_open对应的是orr,参数constraint对应的是K。同样,将__ll_sc_atomic_fetch_or_release函数扩展后为:

static inline int							\
__ll_sc_atomic_fetch_or_release(int i, atomic_t *v)			\
{									\unsigned long tmp;						\int val, result;						\\asm volatile(			                                \
"       // 将v->counter预取到CPU缓存\n"                                   \
"	prfm	pstl1strm, %3\n"					\
"       // 将v->counter独占的读入result中\n"                              \
"1:	ldxr	%w0, %3\n"					        \
"       // val = result | i\n"                                          \
"	orr	%w1, %w0, %w4\n"				        \
"       // 将val的值独占的存入v->counter中\n"                              \
"       // 如果存入时独占标记被清除则将tmp置1\n"                             \
"	stlxr	%w2, %w1, %3\n"					        \
"       // 如果tmp被置1则从头再次执行一遍\n"                                \
"	cbnz	%w2, 1b\n"						\: "=&r" (result), "=&r" (val), "=&r" (tmp), "+Q" (v->counter)	\: __stringify(K) "r" (i)				        \: "memory");							\\return result;							\
}

这里用到了stlxr指令,比stxr指令多了一个l,表示比stxr指令多了一个Store-Release语义,这也就是__ll_sc_atomic_fetch_or_release函数后面_release后缀的由来。由于要返回计算之前的值,因此还需要一个临时变量val用来存放计算之后的值,而result用来存放原来的值。

如果当前平台支持LSE指令,那就会调用函数__lse_atomic_add_return_acquire和__lse_atomic_fetch_or_release。我们先来看__lse_atomic_add_return_acquire函数(代码位于arch/arm64/include/asm/atomic_lse.h中):

#define ATOMIC_OP_ADD_RETURN(name, mb, cl...)				\
static inline int __lse_atomic_add_return##name(int i, atomic_t *v)	\
{									\u32 tmp;							\\asm volatile(							\__LSE_PREAMBLE							\"	ldadd" #mb "	%w[i], %w[tmp], %[v]\n"			\"	add	%w[i], %w[i], %w[tmp]"				\: [i] "+r" (i), [v] "+Q" (v->counter), [tmp] "=&r" (tmp)	\: "r" (v)							\: cl);								\\return i;							\
}ATOMIC_OP_ADD_RETURN(_relaxed,   )
ATOMIC_OP_ADD_RETURN(_acquire,  a, "memory")
ATOMIC_OP_ADD_RETURN(_release,  l, "memory")
ATOMIC_OP_ADD_RETURN(        , al, "memory")#undef ATOMIC_OP_ADD_RETURN

所以,__lse_atomic_add_return_acquire函数由宏ATOMIC_OP_ADD_RETURN(_acquire,  a, "memory")进行定义。参数name对应于_acquire,参数mb对应域a,参数cl对应于"memory"。宏__LSE_PREAMBLE定义成了:

#define __LSE_PREAMBLE	".arch_extension lse\n"

因此,__lse_atomic_add_return_acquire函数最终被扩展成:

static inline int __lse_atomic_add_return_acquire(int i, atomic_t *v)	\
{									\u32 tmp;							\\asm volatile(							\".arch_extension lse\n"                                         \"// tmp = v->count; v->count = v->count + i;\n"                 \"ldadda	%w[i], %w[tmp], %[v]\n"			                \"// i = i + tmp;\n"                                             \"add	%w[i], %w[i], %w[tmp]"				        \: [i] "+r" (i), [v] "+Q" (v->counter), [tmp] "=&r" (tmp)	\: "r" (v)							\: "memory");							\\return i;							\
}

ldadd本身就是一个原子操作,但是它只会返回加操作之前内存的值,但是函数要返回的是加之后的值,所以后面还要再执行一次加的操作。不过这并不会影响原子性,因为内存中的值真的已经被原子的改变过了。ldadda指令相对于ldadd指令多了一个a,表示比ldadd指令多了一个Load-Acquire语义,这也就是__lse_atomic_add_return_acquire函数后面_acquire后缀的由来。

我们接着看__lse_atomic_fetch_or_release函数

#define ATOMIC_FETCH_OP(name, mb, op, asm_op, cl...)			\
static inline int __lse_atomic_fetch_##op##name(int i, atomic_t *v)	\
{									\asm volatile(							\__LSE_PREAMBLE							\
"	" #asm_op #mb "	%w[i], %w[i], %[v]"				\: [i] "+r" (i), [v] "+Q" (v->counter)				\: "r" (v)							\: cl);								\\return i;							\
}#define ATOMIC_FETCH_OPS(op, asm_op)					\ATOMIC_FETCH_OP(_relaxed,   , op, asm_op)			\ATOMIC_FETCH_OP(_acquire,  a, op, asm_op, "memory")		\ATOMIC_FETCH_OP(_release,  l, op, asm_op, "memory")		\ATOMIC_FETCH_OP(        , al, op, asm_op, "memory")ATOMIC_FETCH_OPS(andnot, ldclr)
ATOMIC_FETCH_OPS(or, ldset)
ATOMIC_FETCH_OPS(xor, ldeor)
ATOMIC_FETCH_OPS(add, ldadd)#undef ATOMIC_FETCH_OP
#undef ATOMIC_FETCH_OPS

所以,__lse_atomic_fetch_or_release函数由ATOMIC_FETCH_OPS(or, ldset)宏和ATOMIC_FETCH_OP(_release,  l, op, asm_op, "memory")宏共同定义。ATOMIC_FETCH_OP宏的name参数对应于_release,mb参数对应于l,op参数对应于or,asm_op参数对应于ldset,cl参数对应于"memory"。扩展过后,__lse_atomic_fetch_or_release被定义为:

static inline int __lse_atomic_fetch_or_release(int i, atomic_t *v)	\
{									\asm volatile(							\".arch_extension lse\n"                                         \"ldsetl	%w[i], %w[i], %[v]"				        \: [i] "+r" (i), [v] "+Q" (v->counter)				\: "r" (v)							\: cl);								\\return i;							\
}

ldset指令本身就可以保证原子的或操作,而且会返回没修改之前的值,因此一条指令就搞定了。ldsetl指令相对于ldset指令多了一个l,表示比ldset指令多了一个Store-Release语义,这也就是__lse_atomic_fetch_or_release函数后面_release后缀的由来。

最后,我们再回过头来看看arch_cmpxchg函数的实现:

#define __cmpxchg_wrapper(sfx, ptr, o, n)				\
({									\__typeof__(*(ptr)) __ret;					\__ret = (__typeof__(*(ptr)))					\__cmpxchg##sfx((ptr), (unsigned long)(o),		\(unsigned long)(n), sizeof(*(ptr)));	\__ret;								\
})
......
#define arch_cmpxchg(...)		__cmpxchg_wrapper( _mb, __VA_ARGS__)

所以,arch_cmpxchg最终被宏定义成了__cmpxchg_wrapper,而它又调用了__cmpxchg_mb函数:

#define __CMPXCHG_GEN(sfx)						\
static __always_inline unsigned long __cmpxchg##sfx(volatile void *ptr,	\unsigned long old,		\unsigned long new,		\int size)			\
{									\switch (size) {							\case 1:								\return __cmpxchg_case##sfx##_8(ptr, old, new);		\case 2:								\return __cmpxchg_case##sfx##_16(ptr, old, new);		\case 4:								\return __cmpxchg_case##sfx##_32(ptr, old, new);		\case 8:								\return __cmpxchg_case##sfx##_64(ptr, old, new);		\default:							\BUILD_BUG();						\}								\\unreachable();							\
}__CMPXCHG_GEN()
__CMPXCHG_GEN(_acq)
__CMPXCHG_GEN(_rel)
__CMPXCHG_GEN(_mb)#undef __CMPXCHG_GEN

__cmpxchg_mb函数是靠宏__CMPXCHG_GEN(_mb)定义的,按照要交换数据的大小,分别调用了__cmpxchg_case_mb_8、__cmpxchg_case_mb_16、__cmpxchg_case_mb_32或__cmpxchg_case_mb_64:

#define __CMPXCHG_CASE(name, sz)			\
static inline u##sz __cmpxchg_case_##name##sz(volatile void *ptr,	\u##sz old,		\u##sz new)		\
{									\return __lse_ll_sc_body(_cmpxchg_case_##name##sz,		\ptr, old, new);				\
}__CMPXCHG_CASE(    ,  8)
__CMPXCHG_CASE(    , 16)
__CMPXCHG_CASE(    , 32)
__CMPXCHG_CASE(    , 64)
__CMPXCHG_CASE(acq_,  8)
__CMPXCHG_CASE(acq_, 16)
__CMPXCHG_CASE(acq_, 32)
__CMPXCHG_CASE(acq_, 64)
__CMPXCHG_CASE(rel_,  8)
__CMPXCHG_CASE(rel_, 16)
__CMPXCHG_CASE(rel_, 32)
__CMPXCHG_CASE(rel_, 64)
__CMPXCHG_CASE(mb_,  8)
__CMPXCHG_CASE(mb_, 16)
__CMPXCHG_CASE(mb_, 32)
__CMPXCHG_CASE(mb_, 64)

最后又回到了宏__lse_ll_sc_body,我们这里只分析一下大小为16的情况。所以,如果当前平台支持LSE,那么将调用__lse_cmpxchg_case_mb_16函数,否则将调用__ll_sc_cmpxchg_case_mb_16函数。

我们先来看LL/SC方式的实现__ll_sc_cmpxchg_case_mb_16函数:

#define __CMPXCHG_CASE(w, sfx, name, sz, mb, acq, rel, cl, constraint)	\
static inline u##sz							\
__ll_sc__cmpxchg_case_##name##sz(volatile void *ptr,			\unsigned long old,		\u##sz new)			\
{									\unsigned long tmp;						\u##sz oldval;							\\if (sz < 32)							\old = (u##sz)old;					\\asm volatile(							\__LL_SC_FALLBACK(						\"	prfm	pstl1strm, %[v]\n"				\"1:	ld" #acq "xr" #sfx "\t%" #w "[oldval], %[v]\n"		\"	eor	%" #w "[tmp], %" #w "[oldval], %" #w "[old]\n"	\"	cbnz	%" #w "[tmp], 2f\n"				\"	st" #rel "xr" #sfx "\t%w[tmp], %" #w "[new], %[v]\n"	\"	cbnz	%w[tmp], 1b\n"					\"	" #mb "\n"						\"2:")								\: [tmp] "=&r" (tmp), [oldval] "=&r" (oldval),			\[v] "+Q" (*(u##sz *)ptr)					\: [old] __stringify(constraint) "r" (old), [new] "r" (new)	\: cl);								\\return oldval;							\
}__CMPXCHG_CASE(w, b,     ,  8,        ,  ,  ,         , K)
__CMPXCHG_CASE(w, h,     , 16,        ,  ,  ,         , K)
__CMPXCHG_CASE(w,  ,     , 32,        ,  ,  ,         , K)
__CMPXCHG_CASE( ,  ,     , 64,        ,  ,  ,         , L)
__CMPXCHG_CASE(w, b, acq_,  8,        , a,  , "memory", K)
__CMPXCHG_CASE(w, h, acq_, 16,        , a,  , "memory", K)
__CMPXCHG_CASE(w,  , acq_, 32,        , a,  , "memory", K)
__CMPXCHG_CASE( ,  , acq_, 64,        , a,  , "memory", L)
__CMPXCHG_CASE(w, b, rel_,  8,        ,  , l, "memory", K)
__CMPXCHG_CASE(w, h, rel_, 16,        ,  , l, "memory", K)
__CMPXCHG_CASE(w,  , rel_, 32,        ,  , l, "memory", K)
__CMPXCHG_CASE( ,  , rel_, 64,        ,  , l, "memory", L)
__CMPXCHG_CASE(w, b,  mb_,  8, dmb ish,  , l, "memory", K)
__CMPXCHG_CASE(w, h,  mb_, 16, dmb ish,  , l, "memory", K)
__CMPXCHG_CASE(w,  ,  mb_, 32, dmb ish,  , l, "memory", K)
__CMPXCHG_CASE( ,  ,  mb_, 64, dmb ish,  , l, "memory", L)#undef __CMPXCHG_CASE

定义__ll_sc_cmpxchg_case_mb_16函数,__CMPXCHG_CASE宏的w参数对应于w,sfx参数对应于空,name参数对应于mb_,sz参数对应于16,mb参数对应于dmb ish,acq参数对应于空,rel参数对应于l,cl参数对应于"memory",constraint参数对应于K。所以,__ll_sc_cmpxchg_case_mb_16函数扩展开来的实现是:

static inline u16							\
__ll_sc__cmpxchg_case_mb_16(volatile void *ptr,			        \unsigned long old,		\u16 new)			\
{									\unsigned long tmp;						\u16 oldval;							\\old = (u16)old;					                \\asm volatile(							\"       // 将*ptr的值预取到CPU缓存\n"                              \"	prfm	pstl1strm, %[v]\n"				\"       // oldval = *ptr;\n"                                    \"1:	ldxr    %w[oldval], %[v]\n"		                \"       // tmp = oldval ^ old;\n"                               \"	eor     %w[tmp], %w[oldval], %w[old]\n"	                \"	cbnz	%w[tmp], 2f\n"				        \"       // *ptr = new;\n"                                       \"	stlxr   %w[tmp], %w[new], %[v]\n"	                \"	cbnz	%w[tmp], 1b\n"					\"	dmb ish\n"						\"2:"								\: [tmp] "=&r" (tmp), [oldval] "=&r" (oldval),			\[v] "+Q" (*(u16 *)ptr)					\: [old] __stringify(K) "r" (old), [new] "r" (new)	        \: "memory");							\\return oldval;							\
}

使用了eor异或指令判断新老值是否相等,如果不等则直接退出,如果相等则尝试独占写入新的数据。这里使用了stlxr指令而不是普通的stxr指令,添加了Store-Release语义,这是为了保证写入内存的数据能立即被系统中其它模块感知到。最后添加了一个作用于内部共享域的数据内存屏障,这也印证了前面提到的命名规则,即RMW的原子操作,如果有返回值,并且没有_relaxed、_acquire和_release后缀,是有内存屏障保护的,可以保证不被重排序。

再来看一下__lse_cmpxchg_case_mb_16函数的实现:

#define __CMPXCHG_CASE(w, sfx, name, sz, mb, cl...)			\
static __always_inline u##sz						\
__lse__cmpxchg_case_##name##sz(volatile void *ptr,			\u##sz old,		\u##sz new)		\
{									\register unsigned long x0 asm ("x0") = (unsigned long)ptr;	\register u##sz x1 asm ("x1") = old;				\register u##sz x2 asm ("x2") = new;				\unsigned long tmp;						\\asm volatile(							\__LSE_PREAMBLE							\"	mov	%" #w "[tmp], %" #w "[old]\n"			\"	cas" #mb #sfx "\t%" #w "[tmp], %" #w "[new], %[v]\n"	\"	mov	%" #w "[ret], %" #w "[tmp]"			\: [ret] "+r" (x0), [v] "+Q" (*(unsigned long *)ptr),		\[tmp] "=&r" (tmp)						\: [old] "r" (x1), [new] "r" (x2)				\: cl);								\\return x0;							\
}__CMPXCHG_CASE(w, b,     ,  8,   )
__CMPXCHG_CASE(w, h,     , 16,   )
__CMPXCHG_CASE(w,  ,     , 32,   )
__CMPXCHG_CASE(x,  ,     , 64,   )
__CMPXCHG_CASE(w, b, acq_,  8,  a, "memory")
__CMPXCHG_CASE(w, h, acq_, 16,  a, "memory")
__CMPXCHG_CASE(w,  , acq_, 32,  a, "memory")
__CMPXCHG_CASE(x,  , acq_, 64,  a, "memory")
__CMPXCHG_CASE(w, b, rel_,  8,  l, "memory")
__CMPXCHG_CASE(w, h, rel_, 16,  l, "memory")
__CMPXCHG_CASE(w,  , rel_, 32,  l, "memory")
__CMPXCHG_CASE(x,  , rel_, 64,  l, "memory")
__CMPXCHG_CASE(w, b,  mb_,  8, al, "memory")
__CMPXCHG_CASE(w, h,  mb_, 16, al, "memory")
__CMPXCHG_CASE(w,  ,  mb_, 32, al, "memory")
__CMPXCHG_CASE(x,  ,  mb_, 64, al, "memory")#undef __CMPXCHG_CASE

定义__lse_cmpxchg_case_mb_16函数,__CMPXCHG_CASE宏的w参数对应于w,sfx参数对应于空,name参数对应于mb_,sz参数对应于16,mb参数对应于al,cl参数对应于"memory"。所以,__ll_sc_cmpxchg_case_mb_16函数扩展开来的实现是:

static __always_inline u16						\
__lse__cmpxchg_case_mb_16(volatile void *ptr,			        \u16 old,		                        \u16 new)		                        \
{									\register unsigned long x0 asm ("x0") = (unsigned long)ptr;	\register u16 x1 asm ("x1") = old;				\register u16 x2 asm ("x2") = new;				\unsigned long tmp;						\\asm volatile(							\"       .arch_extension lse\n"                                  \"       // tmp = old;\n"                                        \"	mov	%w[tmp], %w[old]\n"			        \"	casal   %w[tmp], %w[new], %[v]\n"	                \"       // ret = tmp;\n"                                        \"	mov	%[ret], %w[tmp]"			        \: [ret] "+r" (x0), [v] "+Q" (*(unsigned long *)ptr),		\[tmp] "=&r" (tmp)						\: [old] "r" (x1), [new] "r" (x2)				\: "memory");							\\return x0;							\
}

cas指令是LSE中原生提供的一个比较交换指令,会比较第一个寄存器中的值是否和内存中的值相同,如果相同的话就将第二个寄存器的值写入内存,并且会在第一个寄存器中放入修改内存之前的值。casal指令相对于cas指令多了al,表示比cas指令多了一个Load-Acquire和Store-Release语义,其实就是一个完整的内存屏障,保证不被重排序。