基于锁的并发数据结构 线程安全栈——使用锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #include <exception> struct empty_stack : std::exception { const char * what () const throw () ; }; template <typename T>class threadsafe_stack { private : std::stack<T> data; mutable std::mutex m; public : threadsafe_stack () {} threadsafe_stack (const threadsafe_stack& other) { std::lock_guard<std::mutex> lock (other.m) ; data = other.data; } threadsafe_stack& operator =(const threadsafe_stack&) = delete ; void push (T new_value) { std::lock_guard<std::mutex> lock (m) ; data.push (std::move (new_value)); } std::shared_ptr<T> pop () { std::lock_guard<std::mutex> lock (m) ; if (data.empty ()) { throw empty_stack (); } std::shared_ptr<T> const res ( std::make_shared<T>(std::move(data.top()))) ; data.pop (); return res; } void pop (T& value) { std::lock_guard<std::mutex> lock (m) ; if (data.empty ()) throw empty_stack (); value = std::move (data.top ()); data.pop (); } bool empty () const { std::lock_guard<std::mutex> lock (m) ; return data.empty (); } };
互斥量 m 可保证线程安全,对每个成员函数进行加锁保护。保证在同一时间内,只有一个线程可以访问到数据
虽然 empty() 和 pop() 之间会存在竞争,但是代码会在 pop() 上锁时,显式的查询栈是否为空,所以不是恶性竞争
pop() 具备弹出值和返回弹出值的功能,避免 std::stack<> 中 top() 和 pop() 之间的竞争
类中也有一些异常源
上锁操作是每个成员函数所做的第一个操作,所以对互斥量上锁可能会抛出异常,因无数据修改,所以安全
data.push() 的调用 1 可能会抛出一个异常,原因是拷贝/移动数据或者内存不足,但是不管哪种情况,std::stack<> 都能保证其安全性,所以也没有问题
第一个重载 pop() 中,代码可能会抛出 empty_stack 异常 2,数据没有修改,是安全的;创建 res 3 时,也可能会抛出异常,两个原因:std::make_shared 无法分配出足够的内存去创建新对象,并且内部数据需要引用新对象,或者在拷贝或移动构造到新分配的内存中时抛出异常。两种情况下,C++ 运行时库和标准库能确保不会出现内存泄露,并且新创建的对象(如果有的话)都能正确销毁;当调用data.pop() 4 时能保证不抛出异常并返回结果(这个成员函数保证不会抛出异常),所以这个重载的 pop() 是“异常安全”的
第二个重载 pop() 除了在拷贝赋值或移动赋值时会抛出异常 5,当构造新对象和 std::shared_ptr 实例时都不会抛出异常。同样,调用data.pop() 6 之前,没有对数据结构进行修改,所以这个函数也是“异常安全”的
empty() 不会修改任何数据,所以也是“异常安全”函数
所有成员函数都使用 std::lock_guard<> 保护数据,所以栈成员函数才是“线程安全”的。当然,构造与析构函数不是“线程安全”的,但构造与析构只有一次。调用不完全构造对象或是已销毁对象的成员函数,无论在哪种编程方式下都不可取。所以,用户就要保证在栈对象完成构建前,其他线程无法对其进行访问。并且,要保证在栈对象销毁后,停止所有线程的访问操作
线程安全队列——使用锁和条件变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 template <typename T>class threadsafe_queue { private : mutable std::mutex mut; std::queue<T> data_queue; std::condition_variable data_cond; public : threadsafe_queue () {} void push (T data) { std::lock_guard<std::mutex> lk (mut) ; data_queue.push (std::move (data)); data_cond.notify_one (); } void wait_and_pop (T& value) { std::unique_lock<std::mutex> lk (mut) ; data_cond.wait (lk, [this ] { return !data_queue.empty (); }); value = std::move (data_queue.front ()); data_queue.pop (); } std::shared_ptr<T> wait_and_pop () { std::unique_lock<std::mutex> lk (mut) ; data_cond.wait (lk, [this ] { return !data_queue.empty (); }); std::shared_ptr<T> res (std::make_shared<T>(std::move(data_queue.front()))) ; data_queue.pop (); return res; } bool try_pop (T& value) { std::lock_guard<std::mutex> lk (mut) ; if (data_queue.empty ()) return false ; value = std::move (data_queue.front ()); data_queue.pop (); return true ; } std::shared_ptr<T> try_pop () { std::lock_guard<std::mutex> lk (mut) ; if (data_queue.empty ()) return std::shared_ptr <T>(); std::shared_ptr<T> res (std::make_shared<T>(std::move(data_queue.front()))) ; data_queue.pop (); return res; } bool empty () const { std::lock_guard<std::mutex> lk (mut) ; return data_queue.empty (); } };
和前面的代码相比,异常安全会有一些变化。不止一个线程等待对队列进行推送操作时,只会有一个线程因 data_cond.notify_one() 而继续工作。但是,如果工作线程在 wait_and_pop() 中抛出一个异常,例如,构造新的 std::shared_ptr<> 对象时 4 抛出异常,那么其他线程则会永远休眠。对于这种情况,有两种解决方案:
调用函数需要改成 data_cond.notify_all(),这个函数将唤醒所有的工作线程,不过当大多线程发现队列依旧是空时,又会耗费资源让线程重新进入睡眠
当有异常抛出时,让 wait_and_pop() 函数调用 notify_one(),从而让个另一个线程去索引存储的值
将 std::shared_ptr<> 的初始化过程移到 push() 中,并且存储 std::shared_ptr<> 实例,而不是直接使用数据值,将 std::shared_ptr<> 拷贝到内部 std::queue<> 中就不会抛出异常了,这样wait_and_pop() 又是安全的了
下面的代码,就是根据第三种方案修改的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 template <typename T>class threadsafe_queue { private : mutable std::mutex mut; std::queue<std::shared_ptr<T>> data_queue; std::condition_variable data_cond; public : threadsafe_queue () {} void wait_and_pop (T& value) { std::unique_lock<std::mutex> lk (mut) ; data_cond.wait (lk, [this ] { return !data_queue.empty (); }); value = std::move (*data_queue.front ()); data_queue.pop (); } bool try_pop (T& value) { std::lock_guard<std::mutex> lk (mut) ; if (data_queue.empty ()) return false ; value = std::move (*data_queue.front ()); data_queue.pop (); return true ; } std::shared_ptr<T> wait_and_pop () { std::unique_lock<std::mutex> lk (mut) ; data_cond.wait (lk, [this ] { return !data_queue.empty (); }); std::shared_ptr<T> res = data_queue.front (); data_queue.pop (); return res; } std::shared_ptr<T> try_pop () { std::lock_guard<std::mutex> lk (mut) ; if (data_queue.empty ()) return std::shared_ptr <T>(); std::shared_ptr<T> res = data_queue.front (); data_queue.pop (); return res; } void push (T new_value) { std::shared_ptr<T> data (std::make_shared<T>(std::move(new_value))) ; std::lock_guard<std::mutex> lk (mut) ; data_queue.push (data); data_cond.notify_one (); } bool empty () const { std::lock_guard<std::mutex> lk (mut) ; return data_queue.empty (); } };
线程安全队列——使用细粒度锁和条件变量 最简单的队列就是单链表了,如下图所示。
从队列中删除数据,其实就是将头指针指向下一个元素,并将之前头指针指向的值进行返回。向队列中添加元素是要从队尾进行的。为了做到这点,队列里还有一个尾指针,其指向链表中的最后一个元素。新节点的加入将会改变尾指针的 next 指针,之前队列的最后一个元素的 next 将会指向新添加进来的元素,新的尾指针的 next 也会指向新添加进来的元素。当链表为空时,头/尾指针皆为 nullptr。
下面的代码是一个简单队列的实现,这个队列仅供单线程使用,所以实现中只有一个 try_pop() 函数,没有 wait_and_pop() 函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 template <typename T>class queue { private : struct node { T data; std::unique_ptr<node> next; node (T data_) : data (std::move (data_)) {} }; std::unique_ptr<node> head; node* tail; public : queue () {} queue (const queue& other) = delete ; queue& operator =(const queue& other) = delete ; std::shared_ptr<T> try_pop () { if (!head) { return std::shared_ptr <T>(); } std::shared_ptr<T> const res (std::make_shared<T>(std::move(head->data))) ; std::unique_ptr<node> const old_head = std::move (head); head = std::move (old_head->next); return res; } void push (T new_value) { std::unique_ptr<node> p (new node(std::move(new_value))) ; node* const new_tail = p.get (); if (tail) { tail->next = std::move (p); } else { head = std::move (p); } tail = new_tail; } };
虽然,这种实现对于单线程来说没什么问题,但当在多线程下尝试使用细粒度锁时,就会出现问题:
在给定的实现中有两个数据项 head 和 tail,即便是使用两个互斥量来保护头指针和尾指针,也会出现问题:push() 可以同时修改头指针和尾指针,所以 push() 函数会同时获取两个互斥量
push() 和 pop() 都能访问 next 指针指向的节点:push() 可更新 tail->next,随后 try_pop() 读取 head->next。当队列中只有一个元素时,head==tail,所以此时 head->next 和 tail->next 是同一个对象,在这种情况下,push() 和 try_pop() 锁住的是同一个锁
通过分离数据实现并发 可以使用“预分配无数据虚拟节点,确保这个节点永远在队列的最后,用来分离头尾指针能访问的节点”的办法。对于一个空队列来说,head 和 tail 都属于虚拟指针,而非空指针。当添加一个节点入队列时,head 和 tail 现在指向不同的节点,所以就不会在 head->next 和 tail->next 上产生竞争。缺点是,必须额外添加一个间接层次的指针数据来做虚拟节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 template <typename T>class queue { private : struct node { std::shared_ptr<T> data; std::unique_ptr<node> next; }; std::unique_ptr<node> head; node* tail; public : queue () : head (new node), tail (head.get ()) {} queue (const queue& other) = delete ; queue& operator =(const queue& other) = delete ; std::shared_ptr<T> try_pop () { if (head.get () == tail) { return std::shared_ptr <T>(); } std::shared_ptr<T> const res (head->data) ; std::unique_ptr<node> old_head = std::move (head); head = std::move (old_head->next); return res; } void push (T new_value) { std::shared_ptr<T> new_data (std::make_shared<T>(std::move(new_value))) ; std::unique_ptr<node> p (new node) ; tail->data = new_data; node* const new_tail = p.get (); tail->next = std::move (p); tail = new_tail; } };
现在的 push() 只能访问 tail,而不能访问 head,try_pop() 可以访问 head 和 tail,但是 tail 只需在最开始进行比较,所以所存在的时间很短。重大的提升在于虚拟节点意味着 try_pop() 和 push() 不能对同一节点进行操作,所以不再需要互斥了。现在,只需要使用一个互斥量来保护 head 和 tail 就够了。那么,现在应该锁哪里?
为了最大程度的并发化,所以需要上锁的时间尽可能的少。push() 很简单:互斥量需要对 tail 的访问上锁,锁需要持续到函数结束时才能解开。try_pop() 就不简单了。首先,需要使用互斥量锁住 head,一直到 head 弹出。实际上,一旦改变完成对 head 的改变,就能解锁互斥量,当返回结果时互斥量就不需要上锁了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 template <typename T>class threadsafe_queue { private : struct node { std::shared_ptr<T> data; std::unique_ptr<node> next; }; std::mutex head_mutex; std::unique_ptr<node> head; std::mutex tail_mutex; node* tail; node* get_tail () { std::lock_guard<std::mutex> tail_lock (tail_mutex) ; return tail; } std::unique_ptr<node> pop_head () { std::lock_guard<std::mutex> head_lock (head_mutex) ; if (head.get () == get_tail ()) { return nullptr ; } std::unique_ptr<node> old_head = std::move (head); head = std::move (old_head->next); return old_head; } public : threadsafe_queue () : head (new node), tail (head.get ()) {} threadsafe_queue (const threadsafe_queue& other) = delete ; threadsafe_queue& operator =(const threadsafe_queue& other) = delete ; std::shared_ptr<T> try_pop () { std::unique_ptr<node> old_head = pop_head (); return old_head ? old_head->data : std::shared_ptr <T>(); } void push (T new_value) { std::shared_ptr<T> new_data (std::make_shared<T>(std::move(new_value))) ; std::unique_ptr<node> p (new node) ; node* const new_tail = p.get (); std::lock_guard<std::mutex> tail_lock (tail_mutex) ; tail->data = new_data; tail->next = std::move (p); tail = new_tail; } };
get_tail() 中的 tail_mutex 解决了数据竞争的问题。因为调用 get_tail() 将会锁住同名锁,就像 push() 一样,这就为两个操作规定好了顺序。要不就是 get_tail() 在 push() 之前被调用,线程可以看到旧的尾节点,要不就是在 push() 之后完成,线程就能看到 tail 的新值。
当 get_tail() 调用前 head_mutex 已经上锁,这一步也是很重要。如果不这样,调用 pop_head() 时就会被 get_tail() 和 head_mutex 所卡住,因为其他线程调用 try_pop() 以及 pop_head() 时,都需要先获取锁:
1 2 3 4 5 6 7 8 9 10 std::unique_ptr<node> pop_head () { node* const old_tail = get_tail (); std::lock_guard<std::mutex> head_lock (head_mutex) ; if (head.get () == old_tail) { return nullptr ; } std::unique_ptr<node> old_head = std::move (head); head = std::move (old_head->next); return old_head; }
这是一个有缺陷的实现,在锁的范围之外调用 get_tail() 后再获取 head_mutex,在这两个操作之间数据结构可能已经发生了变化。
等待数据弹出 1 2 3 4 5 6 7 8 9 10 11 12 13 template <typename T>void threadsafe_queue<T>::push (T new_value) { std::shared_ptr<T> new_data (std::make_shared<T>(std::move(new_value))) ; std::unique_ptr<node> p (new node) ; { std::lock_guard<std::mutex> tail_lock (tail_mutex) ; tail->data = new_data; node* const new_tail = p.get (); tail->next = std::move (p); tail = new_tail; } data_cond.notify_one (); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 template <typename T>class threadsafe_queue { private : node* get_tail () { std::lock_guard<std::mutex> tail_lock (tail_mutex) ; return tail; } std::unique_ptr<node> pop_head () { std::unique_ptr<node> old_head = std::move (head); head = std::move (old_head->next); return old_head; } std::unique_lock<std::mutex> wait_for_data () { std::unique_lock<std::mutex> head_lock (head_mutex) ; data_cond.wait (head_lock, [&] { return head.get () != get_tail (); }); return std::move (head_lock); } std::unique_ptr<node> wait_pop_head () { std::unique_lock<std::mutex> head_lock (wait_for_data()) ; return pop_head (); } std::unique_ptr<node> wait_pop_head (T& value) { std::unique_lock<std::mutex> head_lock (wait_for_data()) ; value = std::move (*head->data); return pop_head (); } public : std::shared_ptr<T> wait_and_pop () { std::unique_ptr<node> const old_head = wait_pop_head (); return old_head->data; } void wait_and_pop (T& value) { std::unique_ptr<node> const old_head = wait_pop_head (value); } };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 template <typename T>class threadsafe_queue { private : std::unique_ptr<node> try_pop_head () { std::lock_guard<std::mutex> head_lock (head_mutex) ; if (head.get () == get_tail ()) { return std::unique_ptr <node>(); } return pop_head (); } std::unique_ptr<node> try_pop_head (T& value) { std::lock_guard<std::mutex> head_lock (head_mutex) ; if (head.get () == get_tail ()) { return std::unique_ptr <node>(); } value = std::move (*head->data); return pop_head (); } public : std::shared_ptr<T> try_pop () { std::unique_ptr<node> old_head = try_pop_head (); return old_head ? old_head->data : std::shared_ptr <T>(); } bool try_pop (T& value) { std::unique_ptr<node> const old_head = try_pop_head (value); return old_head; } bool empty () { std::lock_guard<std::mutex> head_lock (head_mutex) ; return (head.get () == get_tail ()); } };
设计更加复杂的数据结构 使用锁的线程安全字典 和队列和栈一样,std::map<> 的接口不适合多线程进行并发访问,因为这些接口都存在固有的条件竞争。
如果坚持线程安全指导意见,例如:不要返回一个引用,并且用一个简单的互斥锁对每一个成员函数进行上锁,以确保每一个函数线程安全。最有可能的条件竞争在于,当一对键值对加入时,当两个线程都添加一个数据,那么肯定一先一后。一种解决方式是合并“添加”和“修改”操作为一个成员函数。
当在键值没有对应值的时候进行返回时,允许用户提供一个默认值:
1 mapped_type get_value (key_type const & key, mapped_type default_value) ;
这种情况下,当 default_value 没有明确的给出时,默认构造出的 mapped_type 实例将使用,可以扩展成返回一个 std::pair<mapped_type, bool> 来代替 mapped_type 实例,其中 bool 代表返回值是否是当前键对应的值。另一个选择是返回指向数据的智能指针,当指针的值是 nullptr 时,这个键值就没有对应的数据。
当接口确定时(假设此时没有接口间的条件竞争),可以通过对每一个成员函数使用互斥量和锁来保护底层数据的线程安全性。不过,当独立函数对数据结构进行读取和修改时,就会降低并发的可能性。一个选择是使用一个互斥量去面对多个读线程或一个写线程。虽然会提高并发访问,但是同时只有一个线程能对数据结构进行修改。
为了允许细粒度锁能正常工作,需要对数据结构的细节进行仔细考究,而非直接使用已知容器。列出三种常见的实现字典的方式:二叉树(如红黑树)、有序数组、哈希表。
二叉树的方式,不会提高并发访问的能力。每一个查找或者修改操作都需要访问根节点,所以根节点需要上锁。虽然访问线程在向下移动时,锁可以进行释放,但相比横跨整个数据结构的单锁,并没有什么优势。
有序数组是最坏的选择,因为无法提前言明数组中哪段是有序的,所以需要用一个锁将整个数组锁起来。
最后就剩哈希表了。假设有固定数量的桶,每个桶都有一个键值,以及哈希函数。这就意味着你可以安全的对每个桶上锁。当再次使用互斥量(支持多读单写)时,就能将并发访问的可能性增加 N 倍,这里 N 是桶的数量。当然,缺点也是有的:对于键值的操作,需要有合适的函数。C++ 标准库提供 std::hash<> 模板,可以直接使用,用户还可以简单的对键值类型进行特化。如果去效仿标准无序容器,并且获取函数对象的类型作为哈希表的模板参数,用户可以选择特化 std::hash<> 的键值类型,或者直接提供哈希函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 class bucket_type { private : typedef std::pair<Key, Value> bucket_value; typedef std::list<bucket_value> bucket_data; typedef typename bucket_data::iterator bucket_iterator; bucket_data data; mutable std::shared_mutex mutex; bucket_iterator find_entry_for (Key const & key) const { return std::find_if ( data.begin (), data.end (), [&](bucket_value const & item) { return item.first == key; }); } public : Value value_for (Key const & key, Value const & default_value) const { std::shared_lock<std::shared_mutex> lock (mutex) ; bucket_iterator const found_entry = find_entry_for (key); return (found_entry == data.end ()) ? default_value : found_entry->second; } void add_or_update_mapping (Key const & key, Value const & value) { std::unique_lock<std::shared_mutex> lock (mutex) ; bucket_iterator const found_entry = find_entry_for (key); if (found_entry == data.end ()) { data.push_back (bucket_value (key, value)); } else { found_entry->second = value; } } void remove_mapping (Key const & key) { std::unique_lock<std::shared_mutex> lock (mutex) ; bucket_iterator const found_entry = find_entry_for (key); if (found_entry != data.end ()) { data.erase (found_entry); } } }; template <typename Key, typename Value, typename Hash = std::hash<Key>>class threadsafe_lookup_table { private : std::vector<std::unique_ptr<bucket_type>> buckets; Hash hasher; bucket_type& get_bucket (Key const & key) const { std::size_t const bucket_index = hasher (key) % buckets.size (); return *buckets[bucket_index]; } public : typedef Key key_type; typedef Value mapped_type; typedef Hash hash_type; threadsafe_lookup_table (unsigned num_buckets = 19 , Hash const & hasher_ = Hash ()) : buckets (num_buckets), hasher (hasher_) { for (unsigned i = 0 ; i < num_buckets; ++i) { buckets[i].reset (new bucket_type); } } threadsafe_lookup_table (threadsafe_lookup_table const & other) = delete ; threadsafe_lookup_table& operator =(threadsafe_lookup_table const & other) = delete ; Value value_for (Key const & key, Value const & default_value = Value()) const { return get_bucket (key).value_for (key, default_value); } void add_or_update_mapping (Key const & key, Value const & value) { get_bucket (key).add_or_update_mapping (key, value); } void remove_mapping (Key const & key) { get_bucket (key).remove_mapping (key); } };
实现中使用了 std::vector<std::unique_ptr> 来保存桶,其允许在构造函数中指定构造桶的数量。默认为 19 个,这个值可以是一个任意的质数。哈希表在有质数个桶时,工作效率最高。每一个桶都会被一个 std::shared_mutex 实例锁保护,对于每一个桶只有一个线程能对其进行修改。这三个函数都使用到了find_entry_for()成员函数,用来确定数据是否在桶中。每一个桶本质上是一个键值对的链表,所以添加和删除数据就会很简单。
从并发的角度考虑,互斥锁保护所有成员,这样的实现是“异常安全”的吗?value_for 是不能修改任何值的,所以其不会有问题。如果 value_for 抛出异常,也不会对影响任何数据结构。remove_mapping 修改链表时,会调用 erase,这个函数能保证没有异常抛出。那么就剩 add_or_update_mapping 了,可能会在其两个 if 分支上抛出异常。push_back 是异常安全的,如果有异常抛出,也会将链表恢复成原始状态。唯一的问题就在赋值阶段(替换已有的数据),当赋值阶段抛出异常,用于依赖的原始状态没有改变,所以不会影响数据结构的整体,以及用户提供类型的属性,这样就可以放心的将问题交给用户处理。
字典容器还有一些非常有用的操作,比如获取所有键值对的完整快照。这要求锁住整个容器,保证拷贝副本的状态是可以索引的,这将需要在同一时间将所有桶都锁住。因此,只要每次以相同的顺序进行上锁(例如递增桶的索引值),就不会产生死锁。实现如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 std::map<Key, Value> threadsafe_lookup_table::get_map () const { std::vector<std::unique_lock<std::shared_mutex> > locks; for (unsigned i = 0 ; i < buckets.size (); ++i) { locks.push_back (std::unique_lock <std::shared_mutex>(buckets[i].mutex)); } std::map<Key, Value> res; for (unsigned i = 0 ; i < buckets.size (); ++i) { for (bucket_iterator it = buckets[i].data.begin (); it != buckets[i].data.end (); ++it) { res.insert (*it); } } return res; }
使用锁的线程安全链表 链表类型是数据结构中的基本类型,但是并不容易修改成线程安全的,因为需要提供迭代器的支持。迭代器的问题在于其需要持有容器内部引用。当容器可被其他线程修改时,这个引用还是有效的。实际上就需要迭代器持有锁,对指定的结构中的部分进行上锁。在迭代器的生命周期中,让其完全脱离容器的控制是很糟糕的做法。
替代方案就是提供迭代函数,例如:将 for_each 作为容器本身的一部分。这就能让容器对迭代的部分进行负责和锁定,不过这将违反之前提到的的指导意见。为了让 for_each 在任何情况下都有效,其必须调用用户提供的代码,而在这期间 for_each 会持有内部锁,这可能会造成死锁。不仅如此,需要传递一个对容器中元素的引用到用户代码中,就是让用户代码对容器中的元素进行操作。为了避免传递引用,需要传出一个拷贝到用户代码中。不过当数据很大时,拷贝要付出的代价也很大。
所以,可以将避免死锁的工作和避免对引用进行存储时的条件竞争交给用户去做。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 struct node { std::mutex m; std::shared_ptr<T> data; std::unique_ptr<node> next; node () : next () {} node (T const & value) : data (std::make_shared <T>(value)) {} }; template <typename T>class threadsafe_list { public : threadsafe_list () {} ~threadsafe_list () { remove_if ([](node const &) { return true ; }); } threadsafe_list (threadsafe_list const & other) = delete ; threadsafe_list& operator =(threadsafe_list const & other) = delete ; void push_front (T const & value) { std::unique_ptr<node> new_node (new node(value)) ; std::lock_guard<std::mutex> lk (head.m) ; new_node->next = std::move (head.next); head.next = std::move (new_node); } template <typename Function> void for_each (Function f) { node* current = &head; std::unique_lock<std::mutex> lk (head.m) ; while (node* const next = current->next.get ()) { std::unique_lock<std::mutex> next_lk (next->m) ; lk.unlock (); f (*next->data); current = next; lk = std::move (next_lk); } } template <typename Predicate> std::shared_ptr<T> find_first_if (Predicate p) { node* current = &head; std::unique_lock<std::mutex> lk (head.m) ; while (node* const next = current->next.get ()) { std::unique_lock<std::mutex> next_lk (next->m) ; lk.unlock (); if (p (*next->data)) { return next->data; } current = next; lk = std::move (next_lk); } return std::shared_ptr <T>(); } template <typename Predicate> void remove_if (Predicate p) { node* current = &head; std::unique_lock<std::mutex> lk (head.m) ; while (node* const next = current->next.get ()) { std::unique_lock<std::mutex> next_lk (next->m) ; if (p (*next->data)) { std::unique_ptr<node> old_next = std::move (current->next); current->next = std::move (next->next); next_lk.unlock (); } else { lk.unlock (); current = next; lk = std::move (next_lk); } } } private : node head; };
for_each() 这个操作对队列中的每个元素执行 Function。链表中会有“手递手”的上锁过程,这个过程开始时,需要锁住 head 及节点的互斥量。然后,安全的获取指向下一个节点的指针。当指针不为空时,为了继续对数据进行处理,就需要对指向的节点进行上锁。当锁住了那个节点,就可以对上一个节点进行释放了,并调用指定函数。当函数执行完成时,就可以更新当前指针所指向的节点(刚刚处理过的节点),并将所有权从 next_lk 移动移动到 lk。
find_first_if() 的功能是对链表中每个元素执行 Predicate 函数,如果是 true 则返回当前节点值。可以使用 for_each() 来做这件事,不过在找到后,继续做查找就没意义了。
remove_if()就有些不同了,因为函数会改变链表结构。所以就不能使用 for_each() 实现这个功能。当 Predicate 函数返回 true,对应元素将会移除,并且更新 current->next。当这些都做完,就可以释放 next 指向节点的锁。当 Predicate 函数返回 false,移动的操作就和之前一样了。
那么,所有的互斥量中会有死锁或条件竞争吗?答案无疑是“否”,要看提供的函数是否有良好的行为。迭代通常都使用一种方式,从 head 节点开始,并且在释放当前节点锁之前,将下一个节点的互斥量锁住,所以就不可能会有不同线程有不同的上锁顺序。唯一可能出现条件竞争的地方就在 remove_if() 中删除已有节点的时候,但是可以确定这是安全的,因为现在还持有前一个节点的互斥锁,所以不会有新的线程尝试去获取正在删除节点的互斥锁。
细粒度锁要比单锁的并发概率大很多:同一时间内,不同线程在不同节点上工作,无论是使用 for_each() 对每一个节点进行处理,还是使用 find_first_if() 对数据进行查找,或是使用 remove_if() 删除一些元素。不过,因为互斥量必须按顺序上锁,线程就不能交叉进行工作。当线程耗费大量的时间对一个特殊节点进行处理,其他线程就必须等待这个处理完成。完成后,其他线程才能到达这个节点。