前言

在 C++ 17 之前的标准中并不存在读写锁的对应封装,因此对于不同平台下需要分别进行实现,当然也可以采取 N2406 - Mutex, Lock, Condition Variable Rationale 的提案中的 shared_mutex 进行实现。这里采用 N2406 提案的方式使用 CAS 进行实现,记录下原子锁与内核管理的性能对比

实现流程

标准库提案

N2406shared_mutex 是通过成员变量 unsigned state_ 记录读者的数量与写者的状态:

unsigned state_;

static const unsigned write_entered_ = 1U << (sizeof(unsigned)*CHAR_BIT - 1);
static const unsigned n_readers_ = ~write_entered_;

这里将 state_ 的最高位作为了写者的标志,剩余 31 位则用于记录读者的数量,也就是说最大的读者数量为 $2^{31}-1$ ,虽然从现在的架构来说基本不会达到这个限制,但不过谁又能想到 IPV4 会耗尽呢?
N2406 中采用的是一个互斥锁+两个条件变量进行实现,这是写优先的实现策略,POSIX 中的实现和提案中的实现基本一致,同样都是避免写者饥饿的情况。因此原子的实现方式也会和提案中一致
原子锁的本质其实一种自旋锁,避免线程陷入内核态从而进行快速响应,因此读写两者加锁的流程其实十分的简单:

---
title: 读者加锁
---
stateDiagram-v2
    direction LR
    ReaderIncreaed: 读者计数增加
    state IsWriterOccupied <>
    [*] --> IsWriterOccupied : 开始自旋
    IsWriterOccupied --> ReaderIncreaed: 未有写者占用
    IsWriterOccupied --> IsWriterOccupied : 存在写者占用
    ReaderIncreaed --> [*] : 结束自旋 成功加锁
---
title: 写者加锁
---
stateDiagram-v2
    direction LR
    SetWriterOccupied: 设置写者占用
    state IsOtherWriterOccupied <>
    state IsReadersEmpty <>
    [*] --> IsOtherWriterOccupied : 开始自旋
    IsOtherWriterOccupied --> IsOtherWriterOccupied : 存在其他写者占用
    IsOtherWriterOccupied --> SetWriterOccupied : 不存在其他写者占用
    SetWriterOccupied --> IsReadersEmpty
    IsReadersEmpty --> IsReadersEmpty : 读者数量不为0
    IsReadersEmpty --> [*] : 读者数量为0 结束自旋

对自旋的优化

由于自旋带来的会是一直占用 CPU 进行消耗,导致利用率过高,因此需要在合适的时候让出线程的控制权,将控制权交予内核让其他等待的线程继续执行
那如何让出控制权呢?
一般来说调用与线程相关的函数就可以,比如 yieldsleep,但两者又有区别,这里大致总结下区别(这里不做过多的分析,毕竟内核中的调度不是短短的一句话能够概括的)

usleep

usleep 的流程如下(方框为内核函数)

---
title: usleep 流程
---
flowchart LR
    usleep(usleep) --> __nanosleep
    __nanosleep(__nanosleep) -->  __clock_nanosleep
    __clock_nanosleep(__clock_nanosleep) --> __clock_nanosleep_time64
    __clock_nanosleep_time64(__clock_nanosleep_time64) --> do_nanosleep

内核中的 do_nanosleep 会创建一个计时器交于对应的时间片管理器进行管理,这样线程就会暂停执行,也就让出了线程的空位交给其他线程继续执行

pthread_yield

pthread_yield 流程如下(方框为内核函数)

---
title: pthread_yield 流程
---
flowchart LR
      pthread_yield(pthread_yield) --> sched_yield(sched_yield)
    sched_yield --> do_sched_yield

内核中的 do_sched_yield 会将当前的 CPU 让步给其他任务,倘若没有其他任务等待则会退出阻塞并继续进行当前任务,但同时 man 手册中也有对应的提示:

NOTES
If the calling thread is the only thread in the highest priority list at that time, it will continue to run after a call to sched_yield().

POSIX systems on which sched_yield() is available define _POSIX_PRIORITY_SCHEDULING in <unistd.h>.

Strategic calls to sched_yield() can improve performance by giving other threads or processes a chance to run when (heavily) contended resources (e.g., mutexes) have been released by the caller. Avoid calling sched_yield() unnecessarily or inappropriately (e.g., when resources needed by other schedulable threads are still held by the caller), since doing so will result in unnecessary context switches, which will degrade system performance.

sched_yield() is intended for use with real-time scheduling policies (i.e., SCHED_FIFO or SCHED_RR). Use of sched_yield() with nondeterministic scheduling policies such as SCHED_OTHER is unspecified and very likely means your application design is broken.

sched_yield 和线程优先级有关,倘若那个时间段内只有当前线程优先级处于最高位,那么调用后不会让步给其他线程进行运行,也就是说明 yield 会让步给同优先等级的线程,同时也提到需要合适的调用,如果频繁进行调用反而会带来性能的下降

增加让出线程控制权逻辑

usleeppthread_yield 结论上可以看出,在通常情况下在自旋锁中让出线程控制权最好的选择是 sleep ,因此对两者的逻辑增加对应的机制:

---
title: 读者加锁
---
stateDiagram-v2
    direction LR
    ReaderIncreaed: 读者计数增加
    state IsWriterOccupied <>
    [*] --> IsWriterOccupied : 开始自旋
    IsWriterOccupied --> SpinLoopSleep : 写者占用
    SpinLoopSleep --> IsWriterOccupied
    IsWriterOccupied --> ReaderIncreaed: 写者未占用
    ReaderIncreaed --> [*] : 结束自旋 成功加锁

    state SpinLoopSleep {
        direction LR
        ThreadSleep : Sleep
        state IsMaxTryCount <>
        [*] --> IsMaxTryCount
        IsMaxTryCount --> ThreadSleep : 到达最大尝试次数
        IsMaxTryCount --> [*] : 未到达尝试最大次数
        ThreadSleep --> [*]
    }
---
title: 写者加锁
---
stateDiagram-v2
    direction LR
    IsWriterOccupied : 写者占用判断
    state IsWriterOccupied {
        
        SetWriterOccupied: 设置写者占用
        state IsOtherWriterOccupied <>

        [*] --> IsOtherWriterOccupied
        IsOtherWriterOccupied --> SetWriterOccupied : 不存在其他写者占用
        IsOtherWriterOccupied --> SpinLoop : 存在其他写者占用
        SpinLoop --> IsOtherWriterOccupied
        SetWriterOccupied --> [*]
        
        SpinLoop : SpinLoopSleep
        state SpinLoop {
            ThreadSleep : Sleep
            state IsMaxTryCount <>
            [*] --> IsMaxTryCount
            IsMaxTryCount --> [*] : 未到达最大尝试次数
            IsMaxTryCount --> ThreadSleep : 到达最大尝试次数
            ThreadSleep --> [*]
        }
    }
    IsReaderOccupied : 读者占用判断
    state IsReaderOccupied {
        
        state IsReadersEmpty <>

        [*] --> IsReadersEmpty
        IsReadersEmpty --> SpinLoop_1 : 读者数量不为0
        SpinLoop_1 --> IsReadersEmpty
        IsReadersEmpty --> [*] : 读者数量为0
        SpinLoop_1 : SpinLoopSleep
        state SpinLoop_1 {
            ThreadSleep_1 : Sleep
            state IsMaxTryCount_1 <>
            [*] --> IsMaxTryCount_1
            IsMaxTryCount_1 --> [*] : 未到达最大尝试次数
            IsMaxTryCount_1 --> ThreadSleep_1 : 到达最大尝试次数
            ThreadSleep_1 --> [*]
        }
    }

    [*] --> IsWriterOccupied : 开始自旋
    IsWriterOccupied --> IsReaderOccupied
    IsReaderOccupied --> [*] : 结束自旋 成功加锁

更进一步的优化

Intel 中的 PAUSE

除了让线程进行休眠,还有什么其他的方法呢?后面阅读 Nginx读写锁自旋锁发现了一个有趣的函数 ngx_cpu_pause(),这个函数在其他平台下为空实现,其中在 x86 以及 amd64 平台均 使用了一个指令 pause

// src/os/unix/ngx_gcc_atomic_amd64.h
#define ngx_cpu_pause() __asm__ ("pause")

// src/os/unix/ngx_gcc_atomic_x86.h
/* old "as" does not support "pause" opcode */
#define ngx_cpu_pause() __asm__ (".byte 0xf3, 0x90")

// src/os/win32/ngx_atomic.h
#define ngx_cpu_pause() __asm { pause }

检索了下 Intel 手册中的描述:

Improves the performance of spin-wait loops. When executing a“spin-wait loop,”processors will suffer a severe performance penalty when exiting the loop because it detects a possible memory order violation. The PAUSE instruction provides a hint to the processor that the code sequence is a spin-wait loop. The processor uses this hint to avoid the memory order violation in most situations, which greatly improves processor performance. For this reason, it is recommended that a PAUSE instruction be placed in all spin-wait loops.
An additional function of the PAUSE instruction is to reduce the power consumed by a processor while executing a spin loop. A processor can execute a spin-wait loop extremely quickly, causing the processor to consume a lot of power while it waits for the resource it is spinning on to become available. Inserting a pause instruction in a spin-wait loop greatly reduces the processor’s power consumption.

这是英特尔专门为自旋提供的优化指令,手册中解释了由于在退出循环时 CPU 会检测到一个可能的内存乱序从而导致严重的性能开销,因此 pause 指令会提示 CPU 目前处于自旋状态从而避免 CPU 进行指令重排而带来的内存乱序问题,在提升性能的同时也可以降低功耗消耗。同时也提到了在早期的处理器型号中并不支持对应的指令,转而其实是等价的 rep;nop 指令,现在的 pause 的指令其实是将两个指令合并成了一个指令
那么其他平台是否也具有类似的指令?

Arm 中的 PAUSE

指令 YIELD

查了下 Arm 的官方手册,在 The YIELD instruction 中有提到:

The YIELD instruction provides a hint that the task performed by a thread is of low importance so that it could yield, see YIELD. This mechanism can be used to improve overall performance in a Symmetric Multithreading (SMT) or Symmetric Multiprocessing (SMP) system.
Examples of when the YIELD instruction might be used include a thread that is sitting in a spin-lock, or where the arbitration priority of the snoop bit in an SMP system is modified. The YIELD instruction permits binary compatibility between SMT and SMP systems.
The YIELD instruction is a NOP hint instruction.
The YIELD instruction has no effect in a single-threaded system, but developers of such systems can use the instruction to flag its intended use for future migration to a multiprocessor or multithreading system. Operating systems can use YIELD in places where a yield hint is wanted, knowing that it will be treated as a NOP if there is no implementation benefit.

同样 Arm 也提供了对应的指令来优化自旋,yield 指令会提示当前任务线程可以降低优先级从而让出执行空间,同时也说明了该指令其实和 nop 指令相同,而且该指令只在多线程与多进程的系统中才能发挥作用

指令 ISB

看上去,似乎选用 yield 就可以了,但是从 ARMprogress64 中找到了不一样的实现:

// src/arch/armv7a.h
static inline void
doze(void)
{
__asm__ volatile("isb" : : : );//isb better than nop
}

// src/arch/aarch64.h
static inline void
doze(void)
{
//Each ISB takes ~30 cycles
__asm__ volatile("isb" : : : );
__asm__ volatile("isb" : : : );
}

发现官方的实现并没有使用 yield 指令,而是使用了 isb 指令,isb 的指令手册上的内容如下:

An ISB instruction ensures that all instructions that come after the ISB instruction in program order are fetched from the cache or memory after the ISB instruction has completed. Using an ISB ensures that the effects of context-changing operations executed before the ISB are visible to the instructions fetched after the ISB instruction. Examples of context-changing operations that require the insertion of an ISB instruction to ensure the effects of the operation are visible to instructions fetched after the ISB instruction are:
• Completed cache and TLB maintenance instructions.
• Changes to System registers.
Any context-changing operations appearing in program order after the ISB instruction take effect only after the ISB has been executed.

isb 指令全名叫做 Instruction Synchronization Barrier(指令同步屏障),它的作用是确保在程序顺序中位于 isb 指令之后的所有指令在 isb 指令完成之后才从缓存或内存中获取,同时也保证指令前后的上下文的可见性,对于 CPU 来说相当于刷新了流水线。官方只是解释了指令的作用并没有说明这样使用的原因,后面在 stackflow 上找到了关于 isb 的提问 assembly - Why does hint::spin_loop use ISB on aarch64?,根据回答中的 Rust 提交 [Arm64] use isb instruction instead of yield in spin loops · rust-lang/rust@c064b65 找到了对应的解释与说明:

[Arm64] use isb instruction instead of yield in spin loops
On arm64 we have seen on several databases that ISB (instruction synchronization
barrier) is better to use than yield in a spin loop. The yield instruction is a
nop. The isb instruction puts the processor to sleep for some short time. isb
is a good equivalent to the pause instruction on x86.

...

$ cat a.cc
static void BM_scalar_increment(benchmark::State& state) {
int i = 0;
for (auto _ : state)
benchmark::DoNotOptimize(i++);
}
BENCHMARK(BM_scalar_increment);
static void BM_yield(benchmark::State& state) {
for (auto _ : state)
asm volatile("yield"::);
}
BENCHMARK(BM_yield);
static void BM_isb(benchmark::State& state) {
for (auto _ : state)
asm volatile("isb"::);
}
BENCHMARK(BM_isb);
BENCHMARK_MAIN();

$ g++ -o run a.cc -O2 -lbenchmark -lpthread
$ ./run

--------------------------------------------------------------
Benchmark Time CPU Iterations
--------------------------------------------------------------

AWS Graviton2 (Neoverse-N1) processor:
BM_scalar_increment 0.485 ns 0.485 ns 1000000000
BM_yield 0.400 ns 0.400 ns 1000000000
BM_isb 13.2 ns 13.2 ns 52993304

AWS Graviton (A-72) processor:
BM_scalar_increment 0.897 ns 0.874 ns 801558633
BM_yield 0.877 ns 0.875 ns 800002377
BM_isb 13.0 ns 12.7 ns 55169412

Apple Arm64 M1 processor:
BM_scalar_increment 0.315 ns 0.315 ns 1000000000
BM_yield 0.313 ns 0.313 ns 1000000000
BM_isb 9.06 ns 9.06 ns 77259282

static void BM_pause(benchmark::State& state) {
for (auto _ : state)
asm volatile("pause"::);
}
BENCHMARK(BM_pause);

Intel Skylake processor:
BM_scalar_increment 0.295 ns 0.295 ns 1000000000
BM_pause 41.7 ns 41.7 ns 16780553

从提交来看确实使用 isb 起到了更好的效果,当然对于实际效果需要进一步验证

最终的逻辑

根据上面的几点,不难完善最终的逻辑:

---
title: 读者加锁
---
stateDiagram-v2
    direction LR
    ReaderIncreaed: 读者计数增加
    state IsWriterOccupied <>
    [*] --> IsWriterOccupied : 开始自旋
    IsWriterOccupied --> SpinLoopSleep : 写者占用
    SpinLoopSleep --> IsWriterOccupied
    IsWriterOccupied --> ReaderIncreaed: 写者未占用
    ReaderIncreaed --> [*] : 结束自旋 成功加锁

    state SpinLoopSleep {
        direction LR
        ThreadSleep : Sleep
        ThreadDoze : Doze
        state IsMaxTryCount <>
        
        [*] --> IsMaxTryCount
        IsMaxTryCount --> ThreadSleep : 到达最大尝试次数
        IsMaxTryCount --> ThreadDoze : 未到达尝试最大次数
        ThreadDoze --> [*]
        ThreadSleep --> [*]
    }
---
title: 写者加锁
---
stateDiagram-v2
    direction LR
    IsWriterOccupied : 写者占用判断
    state IsWriterOccupied {
        SetWriterOccupied: 设置写者占用
        state IsOtherWriterOccupied <>

        [*] --> IsOtherWriterOccupied
        IsOtherWriterOccupied --> SetWriterOccupied : 不存在其他写者占用
        IsOtherWriterOccupied --> SpinLoop : 存在其他写者占用
        SpinLoop --> IsOtherWriterOccupied
        SetWriterOccupied --> [*]
        
        SpinLoop : SpinLoopSleep
        state SpinLoop {
            ThreadSleep : Sleep
            ThreadDoze : Doze
            state IsMaxTryCount <>
            [*] --> IsMaxTryCount
            IsMaxTryCount --> ThreadDoze : 未到达最大尝试次数
            IsMaxTryCount --> ThreadSleep : 到达最大尝试次数
            ThreadDoze --> [*]
            ThreadSleep --> [*]
        }
    }
    IsReaderOccupied : 读者占用判断
    state IsReaderOccupied {
        state IsReadersEmpty <>

        [*] --> IsReadersEmpty
        IsReadersEmpty --> SpinLoop_1 : 读者数量不为0
        SpinLoop_1 --> IsReadersEmpty
        IsReadersEmpty --> [*] : 读者数量为0
        SpinLoop_1 : SpinLoopSleep
        state SpinLoop_1 {
            ThreadSleep_1 : Sleep
            ThreadDoze_1 : Doze
            state IsMaxTryCount_1 <>
            [*] --> IsMaxTryCount_1
            IsMaxTryCount_1 --> ThreadDoze_1 : 未到达最大尝试次数
            IsMaxTryCount_1 --> ThreadSleep_1 : 到达最大尝试次数
            ThreadDoze_1 --> [*]
            ThreadSleep_1 --> [*]
        }
    }

    [*] --> IsWriterOccupied : 开始自旋
    IsWriterOccupied --> IsReaderOccupied
    IsReaderOccupied --> [*] : 结束自旋 成功加锁