前置理论:原子操作

设计

从前文出发,读写锁所需要只需要一个 uint32_t 类型的成员变量来记录写者的状态与读者数量,但由于是原子版本,所以记录状态的成员变量将会用 std::atomic_uint32_t 进行实现,同时运用编译期得到写者占用的标志静态数据便于后续的运算:

classDiagram
    class AtomicSharedMutex {
        -std::atomic_uint32_t state_
        -uint32_t writer_enterd_*
    }

接口

接口命名从标准库的 shared_mutex 中借鉴:

classDiagram
    class AtomicSharedMutex {
        +LockShared()
        +TryLockShared() bool
        +UnlockShared()
        +Lock()
        +TryLock() bool
        +Unlock()
        -std::atomic_uint32_t state_
        -uint32_t writer_enterd_*
    }

接口对应的作用:

  • LockShared():读者加锁,如果出现写者占用会进行阻塞等待
  • TryLockShared():尝试进行读者加锁,返回加锁是否成功
  • UnlockShared():读者解锁
  • Lock():写者加锁,如果出现其他写者占用以及读者占用时将会阻塞等待
  • TryLock():尝试进行写者加锁,返回加锁是否成功
  • Unlock():写者解锁

毕竟使用的是 C++ ,那么实现对应的锁管理器也是所需要的,取名同样源自标准库:

classDiagram
    class AtomicSharedMutex {
        +LockShared()
        +TryLockShared() bool
        +UnlockShared()
        +Lock()
        +TryLock() bool
        +Unlock()
        -std::atomic_uint32_t state_
        -uint32_t writer_enterd_*
    }

    class SharedLock {
        +SharedLock(AtomicSharedMutex&)
        +~SharedLock()
        -AtomicSharedMutex* lock_ 
    }

    class UniqueLock {
        +UniqueLock(AtomicSharedMutex&)
        -~UniqueLock()
        -AtomicSharedMutex* lock_ 
    }

    AtomicSharedMutex --o SharedLock
    AtomicSharedMutex --o UniqueLock

class SharedLock 用于读者加锁,class UniqueLock 用于写者加锁

实现

根据上面的设计思路,AtomicSharedMutex 实现基础定义如下:

/**
* 自旋最大尝试值
*/
#define ATOMIC_SHARED_MUTEX_TRY_COUNT_MAX 5

class AtomicSharedMutex {
/**
* 写者占用标志
*/
static constexpr uint32_t writer_enterd_ = 1U << (sizeof(uint32_t) * __CHAR_BIT__ - 1);
public:
AtomicSharedMutex() = default;
AtomicSharedMutex(const AtomicSharedMutex&) = delete;
~AtomicSharedMutex() = default;

/**
* 读者加锁
*/
void LockShared();
/**
* 尝试进行读者加锁, 如果出现写者占用会阻塞执行
* @return 返回是否成功加锁
*/
bool TryLockShared();
/**
* 读者解锁
*/
void UnlockShared();
/**
* 写者加锁
*/
void Lock();
/**
* 尝试进行写者加锁, 如果出现其他写者占用以及读者占用时将会阻塞执行
* @return 返回是否成功加锁
*/
bool TryLock();
/**
* 写者解锁
*/
void Unlock();

AtomicSharedMutex& operator=(const AtomicSharedMutex&) = delete;
private:
/**
* 保存写者状态与读者数量
* | unsinged int |
* | occupied flag | reader count |
* | 1 | 000..(31)..000 |
*/
std::atomic_uint32_t state_{0};
};

/**
* 用于读者加锁
*/
class SharedLock {
public:
SharedLock(AtomicSharedMutex& lock);
~SharedLock();
private:
AtomicSharedMutex* lock_;
};

/**
* 用于写者加锁
*/
class UniqueLock {
public:
UniqueLock(AtomicSharedMutex& lock);
~UniqueLock();
private:
AtomicSharedMutex* lock_;
};

这里将最大尝试次数(ATOMIC_SHARED_MUTEX_TRY_COUNT_MAX)暂定为 5 次,需要根据平台进行合适的调整

实现 SpinLoopSleep

从前文的 SpinLoopSleep 逻辑对不同平台进行对应的实现:

---
title: SpinLoopSleep
---
stateDiagram-v2
    direction LR
    ThreadSleep : Sleep
    ThreadDoze : Doze
    state IsMaxTryCount <>
    [*] --> IsMaxTryCount
    IsMaxTryCount --> ThreadSleep : 到达最大尝试次数
    IsMaxTryCount --> ThreadDoze : 未到达尝试最大次数
    ThreadDoze --> [*]
    ThreadSleep --> [*]

这里将 SpinLoopSleep 作为静态私有函数实现,将 Doze 功能分开作为一个独立的函数,便于后续适配不同的平台以及可以用于其他自旋场景中,因此 AtomicSharedMutex 需要做出一定的修改:

classDiagram
    class AtomicSharedMutex {
        +LockShared()
        +TryLockShared() bool
        +UnlockShared()
        +Lock()
        +TryLock() bool
        +Unlock()
        -SpinLoopSleep(int32_t& try_count)*
        -std::atomic_uint32_t state_
        -uint32_t writer_enterd_*
    }

对应实现的代码如下:

/**
* 自旋最大尝试值
*/
#define ATOMIC_SHARED_MUTEX_TRY_COUNT_MAX 5

/**
* 用于自旋优化
*/
inline static void SpinLoop() {
#if defined(__x86_64__)
__asm__ volatile("pause":::"memory");
#elif defined(__arm__) || defined(__aarch64__)
__asm__ volatile("isb":::"memory");
#else
__asm__ volatile("nop":::"memory");
#endif
}

...

/**
* 用于到达一定次数后自旋优化的实现
* @param try_count 尝试次数
*/
inline static void SpinLoopSleep(int32_t& try_count) {
if (try_count >= ATOMIC_SHARED_MUTEX_TRY_COUNT_MAX) {
static struct timespec ts { 0, 1000 };
nanosleep(&ts, nullptr);
try_count = 0;
} else {
SpinLoop();
}
}

...

实现读者接口

从读者加锁的流程出发实现对应的三个接口:

---
title: 读者加锁
---
stateDiagram-v2
    direction LR
    ReaderIncreaed: 读者计数增加
    state IsWriterOccupied <>
    [*] --> IsWriterOccupied : 开始自旋
    IsWriterOccupied --> SpinLoopSleep : 写者占用
    SpinLoopSleep --> IsWriterOccupied
    IsWriterOccupied --> ReaderIncreaed: 写者未占用
    ReaderIncreaed --> [*] : 结束自旋 成功加锁

代码实现:

  void LockShared() {
uint32_t expected, desired;
int32_t try_count{};

for (;;) {
expected = state_.load(std::memory_order_acquire) & ~writer_enterd_;
desired = expected + 1;

if (
#if defined(__x86_64__)
state_.compare_exchange_strong(expected, desired, std::memory_order_release)
#else
state_.compare_exchange_weak(expected, desired, std::memory_order_release)
#endif
) {
break;
}
SpinLoopSleep(try_count);
}
}
/**
* 尝试进行读者加锁, 如果出现写者占用会阻塞执行
* @return 返回是否成功加锁
*/
bool TryLockShared() {
uint32_t expected, desired;

expected = state_.load(std::memory_order_acquire) & ~writer_enterd_;
desired = expected + 1;

#if defined(__x86_64__) || defined(_M_X64)
return state_.compare_exchange_strong(expected, desired, std::memory_order_release);
#else
return state_.compare_exchange_weak(expected, desired, std::memory_order_release);
#endif
}
/**
* 读者解锁
*/
void UnlockShared() {
state_.fetch_sub(1);
}

实现写者接口

同样写者也根据之前的流程进行对应实现写者的加解锁:

---
title: 写者加锁
---
stateDiagram-v2
    direction LR
    IsWriterOccupied : 写者占用判断
    state IsWriterOccupied {
        direction LR
        SetWriterOccupied: 设置写者占用
        state IsOtherWriterOccupied <>

        [*] --> IsOtherWriterOccupied
        IsOtherWriterOccupied --> SetWriterOccupied : 不存在其他写者占用
        IsOtherWriterOccupied --> SpinLoop : 存在其他写者占用
        SpinLoop --> IsOtherWriterOccupied
        SetWriterOccupied --> [*]
        
        SpinLoop : SpinLoopSleep
    }
    IsReaderOccupied : 读者占用判断
    state IsReaderOccupied {
        direction LR
        state IsReadersEmpty <>

        [*] --> IsReadersEmpty
        IsReadersEmpty --> SpinLoop_1 : 读者数量不为0
        SpinLoop_1 --> IsReadersEmpty
        IsReadersEmpty --> [*] : 读者数量为0
        SpinLoop_1 : SpinLoopSleep
    }

    [*] --> IsWriterOccupied : 开始自旋
    IsWriterOccupied --> IsReaderOccupied
    IsReaderOccupied --> [*] : 结束自旋 成功加锁

代码实现:

  /**
* 写者加锁
*/
void Lock() {
uint32_t expected, desired;
int32_t try_count{};

for(;;) {
expected = state_.load(std::memory_order_acquire) & writer_enterd_;
desired = expected | writer_enterd_;

if (
#if defined(__x86_64__)
state_.compare_exchange_strong(expected, desired, std::memory_order_release)
#else
state_.compare_exchange_weak(expected, desired, std::memory_order_release)
#endif
) {
try_count = 0;

for (;;) {
expected = state_.load(std::memory_order_acquire) & writer_enterd_;
if (
#if defined(__x86_64__)
state_.compare_exchange_strong(expected, expected, std::memory_order_release)
#else
state_.compare_exchange_weak(expected, expected, std::memory_order_release)
#endif
) {
return;
}
SpinLoopSleep(try_count);
}
}
SpinLoopSleep(try_count);
}
}
/**
* 尝试进行写者加锁, 如果出现其他写者占用以及读者占用时将会阻塞执行
* @return 返回是否成功加锁
*/
bool TryLock() {
uint32_t expected, desired;

expected = state_.load(std::memory_order_acquire) & writer_enterd_;
desired = expected | writer_enterd_;

if (
#if defined(__x86_64__)
state_.compare_exchange_strong(expected, desired, std::memory_order_release)
#else
state_.compare_exchange_weak(expected, desired, std::memory_order_release)
#endif
) {
expected = state_.load(std::memory_order_acquire) & writer_enterd_;

if (
#if defined(__x86_64__)
state_.compare_exchange_strong(expected, expected, std::memory_order_release)
#else
state_.compare_exchange_weak(expected, expected, std::memory_order_release)
#endif
) {
return true;
}
}
return false;
}
/**
* 写者解锁
*/
void Unlock() {
state_.fetch_and(~writer_enterd_);
}

实现锁管理器

锁管理器这里只是做简单的封装简单实现,所以实现并不复杂:

/**
* 用于读者加锁
*/
class SharedLock {
public:
SharedLock(AtomicSharedMutex& lock)
: lock_(std::addressof(lock)) {
lock_->LockShared();
}
~SharedLock() {
lock_->UnlockShared();
}
private:
AtomicSharedMutex* lock_;
};

/**
* 用于写者加锁
*/
class UniqueLock {
public:
UniqueLock(AtomicSharedMutex& lock)
: lock_(std::addressof(lock)) {
lock_->Lock();
}
~UniqueLock() {
lock_->Unlock();
}
private:
AtomicSharedMutex* lock_;
};

源码

至此所有逻辑实现完成,源码与测试路径:
实现:tools/include/rwlock.hpp at main · zyxeeker/tools · GitHub
测试:tools/tests/test_rwlock.cpp at main · zyxeeker/tools · GitHub