c++ condition_variable 使用

学习来源主要来自于 基本功 | 一文讲清多线程和多线程同步

去年的时候写过一个服务的 cache 模块,启一个线程定期从数据库中拉全量的数据存到本地的 rocksdb 中,并在服务退出的时候,实现线程的优雅退出,其中使用到了 condition_variable ,代码样例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
struct MongoCacheConfig {
int64_t refresh_interval = 1800; // 30 min
std::string db_name = "aladdin";
std::string collection_name = "ping_dispatch";
};

class MongoCache {
MongoCache(
rocksdb::DBWithTTL* _rocksdb, mongocxx::pool* _pool, MongoCacheConfig& config)
: _rocksdb(_rocksdb), _pool(_pool), _config(config) {};

~MongoCache() {
// tell sync data thread to quit and wait
_stop.store(true);
_cv.notify_all();
if (_loop_thread.joinable()) {
_loop_thread.join();
}
}

base::Status init() {
// sync load cache first
base::Status s = _refresh_collection_cache();
if (!s.ok()) {
return s;
}

// start sync data thread
_stop.store(false);
_loop_thread = std::thread(std::bind(&MongoCache::_refresh_collection_cache_loop, this));

return base::Status::OK();
};
// 只从本地缓存中读取,不访问远程服务
base::Status get_downstream(const std::string& id, std::string& value);

private:
// 从 mongo 中获取全量数据
base::Status _refresh_collection_cache();

// 循环从 mongo 中拉取全量数据
void _refresh_collection_cache_loop() {
std::unique_lock<std::mutex> lck(_cv_mutex);
while (!_stop.load()) {
_refresh_collection_cache();
_cv.wait_for(lck,
std::chrono::seconds(_config.refresh_interval),
[&]{return _stop.load();});
}
}

// 使用 rocksdb 作为本地缓存
rocksdb::DBWithTTL* _rocksdb = nullptr;
mongocxx::pool* _pool = nullptr;
MongoCacheConfig _config;

// 线程控制
std::condition_variable _cv;
std::mutex _cv_mutex;
std::thread _loop_thread;
std::atomic<bool> _stop;
};

当时没太理解 condition_variable 的用法,是仿照项目代码中其它用到 condition_variable 的代码写的。今天又捋了一遍,大概搞明白了它的用法。单把调用的地方拎出来:

1
2
3
4
5
6
7
8
9
void _refresh_collection_cache_loop() {
std::unique_lock<std::mutex> lck(_cv_mutex);
while (!_stop.load()) {
_refresh_collection_cache();
_cv.wait_for(lck,
std::chrono::seconds(_config.refresh_interval),
[&]{return _stop.load();});
}
}

首先,先加个锁 lck,锁定 _stop 的状态,然后读取它的值,如果符合条件,则进入到循环中,执行完 _refresh_collection_cache 函数后,将 lck 传给 condition_variable ,它给先把这个锁 unlock 了,随后挂起这个线程,等待 notify_all 的调用。可以配置超时时间,如果超时了,则解除挂起,再继续执行 while 循环。

下面分析一下对象析构函数被调用时,同步数据线程的执行情况

  1. 如果是在执行 _refresh_collection_cache 时,则在执行完毕后,wait_for 的 predicate 满足条件,不阻塞,再执行 while 语句,退出

  2. 如果是在 wait_for 时,则由析构函数中的 notify_all 解除了 wait_for 的挂起,再执行 while 语句,退出

咨询 gpt 的时候,它给的样例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void worker_thread()
{
// 等待主线程发送数据
std::unique_lock<std::mutex> lck(mtx);
while (!ready) cv.wait(lck);
// 接收到数据,开始处理
std::cout << "Worker thread is processing data\n";
// 数据处理完成
ready = false;
std::cout << "Worker thread has processed data\n";
// 通知主线程数据处理完毕
cv.notify_one();
}

int main()
{
std::thread worker(worker_thread);
// 发送数据到工作线程
{
std::lock_guard<std::mutex> lck(mtx);
ready = true;
std::cout << "main thread is sending data\n";
}
cv.notify_one();
// 等待工作线程处理数据
{
std::unique_lock<std::mutex> lck(mtx);
while (ready) cv.wait(lck);
}
std::cout << "Back in main thread\n";
worker.join();
return 0;
}

上述代码中 worker_thread 函数的执行流程是,首先上锁(std::unique_lock<std::mutex> lck(mtx)),当 ready 为 false 时,执行 cv.wait(lck) 释放锁,并阻塞住,直到 main 函数中调用 cv.notify_one() 后,执行后续的逻辑。至于为什么不能将 while (!ready) 改成 if (!ready) ,gpt 给出的解释是,可能存在假唤醒(spurious wakeup)的情况,这里就不深究了。

golang 中也有类似的接口 sync.Cond ,样例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"sync"
"time"
)

var (
ready = false
)

func worker(cond *sync.Cond) {
cond.L.Lock()
for !ready {
cond.Wait()
}
cond.L.Unlock()

fmt.Println("Worker: 已收到通知,开始工作...")
// do some work
}

func main() {
cond := sync.NewCond(&sync.Mutex{})
go worker(cond)

time.Sleep(1 * time.Second)

fmt.Println("Main: 准备发出通知...")
cond.L.Lock()
ready = true
cond.L.Unlock()
cond.Signal()

time.Sleep(1 * time.Second)
fmt.Println("Main: 工作已完成")
}

// Main: 准备发出通知...
// Worker: 已收到通知,开始工作...
// Main: 工作已完成