C++ folly库解读(三)Synchronized —— 比std::lock_guard/std::unique_lock更易用、功能更强大的同步机制
目录
- 传统同步方案的缺点
- folly/Synchronized.h 简单使用
- Synchronized
的模板参数 - withLock()/withRLock()/withWLock() —— 更易用的加锁方式
- 升级锁
- ulock()和 withULockPtr()
- Timed Locking
- Synchronized 与 std::condition_variable
- acquireLocked() —— 同时锁多个数据
- 使用一把锁,锁多个数据
- struct
- std::tuple
- Benchmark
folly/Synchronized.h 提供了一种更简单、更不容易出错的同步机制,可以用来替代传统 C++标准库中使用较复杂、较容易出错的同步机制。
传统同步方案的缺点
一般是将需要同步的数据和锁一一配对,即 —— associate mutexes with data, not code :
class RequestHandler {
...
std::mutex requestMutex_;
RequestQueue requestQueue_;
processRequest(const Request& request);
};
void RequestHandler::processRequest(const Request& request) {
std::lock_guard lg(requestMutex_);
requestQueue_.push_back(request);
}
然而,操作这些数据成员,开发人员必须注意,正确的获取锁、获取正确的锁。
一些常见的错误包括:
- 操作数据之前没有获取锁。
- 获取了不配对的锁,这个锁不是用来锁这个数据的。
- 获取了读锁,但是试图去修改数据。
- 获取了写锁,但是对数据只有 const access.
一般在使用时,需要提醒开发人员:“别忘了 xxxx”,那一般都会出错,比如 new 的对象别忘了 delete : )
folly/Synchronized.h 简单使用
上面的代码可以用 folly/Synchronized.h 重写为:
class RequestHandler {
folly::Synchronized requestQueue_;
processRequest(const Request& request);
};
void RequestHandler::processRequest(const Request& request) {
requestQueue_.wlock()->push_back(request);
}
为什么 folly/Synchronized.h 更加有效呢?
- 与传统使用方式不同,这里锁和数据是结合成了一个对象 —— requestQueue_。传统方案中,需要寻找锁和数据的配对关系。
- 几乎不可能在不获取锁的情况下,去操作数据,还是因为它们被封装成了一个对象。传统方案加不加锁全靠自觉。
- 在 push_back 后,锁立即被释放。
如果在临界区有多个操作,那么可以使用如下方法:
{
auto lockedQueue = requestQueue_.wlock();
lockedQueue->push_back(request1);
lockedQueue->push_back(request2);
}
wlock 返回一个 LockedPtr 对象,这个对象可以被理解为指向数据成员的指针。只有这个对象存在,那么锁就会被锁住,所以最好为这个对象显示定义一个 scope.
更好的方式,是使用 lambdas :
void RequestHandler::processRequest(const Request& request) {
requestQueue_.withWLock([&](auto& queue "&") {
// withWLock() automatically holds the lock for the
// duration of this lambda function
queue.push_back(request);
});
}
使用 withWLock 配合 lambdas 强制定义了一个 scope,更清晰。
Synchronized的模板参数
Synchronized 有两个模板参数,数据类型和锁类型:
template
如果不指定第二个模板参数,默认是 folly::SharedMutex。只要被 folly::LockTraits 支持的都可以使用,比如 std::mutex、std::recursive_mutex、std::timed_mutex,。std::recursive_timed_mutex、folly::SharedMutex、folly::RWSpinLock、folly::SpinLock.
根据锁类型的不同,Synchronized 会提供不同的 API:
- 共享锁和升级锁:如果存在 lock_shared()成员函数,Synchronized 会提供 wlock(),rlock(),ulock()三个方法来获取不同的锁类型。其中,rlock()只提供对数据成员 const access.
- 排他锁:lock()
withLock()/withRLock()/withWLock() —— 更易用的加锁方式
withLock()在上面提到过了,可以用来替代 lock()。在持有锁的期间,执行一个 lambda 或者 function. withRLock()/withWLock()同理可以替代 rlock()/wlock().
我们再详细说一下这种方式的好处。下面的函数将 vector 里的所有元素都 double:
auto locked = vec.lock();
for (int& n : *locked) {
n *= 2;
}
使用 lock()/wlock()/rlock()的一个重要注意事项:一个指向数据的指针或者引用,它的生命周期一定不要比 LockedPtr 对象长(lock()/wlock()/rlock()的返回值类型)。 如果我们将上面的例子这样写就会出问题:
// No. NO. NO!
for (int& n : *vec.wlock()) {
n *= 2;
}
vec.wlock()返回的 LockPtr 对象在 range iterators 建立后就销毁了(详细解释见 Range-based for loop Temporary range expression 小节),range iterators 指向了 vector data,但此时锁已经被释放。想想如果要 debug 这种问题,会用多少时间 ??
这时 withLock()/withRLock()/withWLock()的好处就体现出来了,锁会在 for loop 期间一直持有:
vec.withLock([](auto& data "") {
for (int& n : data) {
n *= 2;
}
});
withLock 定义为(withRLock/withWLock 类似):
/**
* Invoke a function while holding the lock.
*
* A reference to the datum will be passed into the function as its only
* argument.
*
* This can be used with a lambda argument for easily defining small critical
* sections in the code. For example:
*
* auto value = obj.withLock([](auto& data "") {
* data.doStuff();
* return data.getValue();
* });
*/
template
auto withLock(Function&& function) {
return function(*lock());
}
template
auto withLock(Function&& function) const {
return function(*lock());
}
升级锁
ulock()和 withULockPtr()
Synchronized 还支持升级锁。升级锁与共享锁可以共存,但是与排它锁互斥。
/**
* An enum to describe the "level" of a mutex. The supported levels are
* Unique - a normal mutex that supports only exclusive locking
* Shared - a shared mutex which has shared locking and unlocking functions;
* Upgrade - a mutex that has all the methods of the two above along with
* support for upgradable locking
*/
enum class MutexLevel { UNIQUE, SHARED, UPGRADE };
升级锁解决的问题是:先对数据进行读操作,然后根据一定的条件会进行写操作。
升级锁可以通过 uclock()或者 withULockPtr()获得:
{
// only const access allowed to the underlying object when an upgrade lock
// is acquired
auto ulock = vec.ulock();
auto newSize = ulock->size();
}
auto newSize = vec.withULockPtr([](auto ulock "") {
// only const access allowed to the underlying object when an upgrade lock
// is acquired
return ulock->size();
});
通过下面的函数可以进行升级或者降级:
- moveFromUpgradeToWrite()
- moveFromWriteToUpgrade()
- moveFromWriteToRead() // withWLockPtr()获得的 wlock 可以调用此函数降级为 rlock
- moveFromUpgradeToRead()
调用这些函数的 LockedPtr 会被设置为 invalid null state,并返回另一个锁住特定锁的 LockedPtr。这些操作都是原子性的,中间不会出现 unlocked 状态。
比如现在有一个 cache,数据结构为 unordered_map,需求是先检查对应的 key 是否在 unordered_map 中,如果在则返回对应的 value,不在则初始化 value 为 0:
folly::Synchronized> cache;
int64_t res = cache.withULockPtr([key,value](auto ulock "key,value") {
int64_t cache_value;
auto iter = ulock->find(key);
if (iter != ulock->end()) {
cache_value = iter->second;
} else {
cache_value = 0;
// ulock is now null
auto wlock = ulock.moveFromUpgradeToWrite();
(*wlock)[key] = cache_value;
}
return cache_value;
});
Timed Locking
如果初始化 Synchronized 的锁类型支持时间,lock()/wlock()/rlock()可以传入一个类型为 std::chrono::duration 的参数:
void fun(Synchronized>& vec) {
{
auto locked = vec.lock(10ms);
if (!locked) {
throw std::runtime_error("failed to acquire lock");
}
locked->push_back("hello");
locked->push_back("world");
}
LOG(INFO) << "successfully added greeting";
}
Synchronized 与 std::condition_variable
如果 Synchronized 的锁类型是 std::mutex,那么可以和 std::condition_variable 配合使用。
Synchronized, std::mutex> vec;
std::condition_variable emptySignal;
// Assuming some other thread will put data on vec and signal
// emptySignal, we can then wait on it as follows:
auto locked = vec.lock();
emptySignal.wait(locked.getUniqueLock(),
[&] { return !locked->empty(); });
getUniqueLock()返回一个 std::unique_lockSynchronizedBenchmark.cpp
下篇文章写一下 Synchronized 的基本实现 :)
参考资料:
- Synchronized.md
(完)
朋友们可以关注下我的公众号,获得最及时的更新: