07 设计无锁的并发数据结构 | 《C++ Concurrency In Action》笔记

本系列笔记参考《C++ Concurrency in Action, 2nd Edition》及其中文翻译

定义和意义

使用互斥量、条件变量,以及 future 可以用来同步算法和数据结构。调用库函数将会挂起执行线程,直到其他线程完成某个特定动作。库函数将调用阻塞操作来对线程进行阻塞,在阻塞解除前线程无法继续自己的任务。通常,操作系统会完全挂起一个阻塞线程(并将其时间片交给其他线程),直到解阻塞。“解阻塞”的方式很多,比如互斥锁解锁、通知条件变量达成,或让“future 状态”就绪。

不使用阻塞库的数据结构和算法称为“非阻塞数据结构”。不过,非阻塞的数据结构并非都是无锁的。

不变量

所谓不变量,指的是对于某个数据结构,当其合法可用时,总是成立的一个命题。当编写队列时,你需要为每一个队列指定一个队列头指针,指向队列的第一个元素。每一个数据元素也包含指向下一个元素的指针。重要的并不完全是数据,程序还要依赖于数据之间的关系。例如,队列或者为空,或者包含一个指向队首元素的指针。数据元素包含的指针或者指向下一个队列元素,或者为空(此时该元素为队尾元素)上述关系就是队列中的不变量。

即使不变量有时是不明显的,也很难遇到一个完全没有不变量的程序。当程序遇到被破坏的不变量时,系统可能会返回错误结果甚至立即失败。例如,当程序试图解除对指向无效数据元素的队列头指针的引用时。

阻塞与饥饿

在上图中,线程 2 获得了数据结构上的锁。当线程 1 也尝试获取锁时,它需要等待直到线程 2 释放锁为止。它不会在获得锁定之前继续进行。如果在线程 2 持有锁的同时将其挂起,线程 1 将不得不永远等待。线程 1 的等待被称为线程阻塞。阻塞是指一个线程进入临界区后,其它线程就必须在临界区外等待,待进去的线程执行完任务离开临界区后,其它线程才能再进去。

线程 2 访问数据结构,但不获取锁。线程 1 尝试同时访问数据结构,检测并发访问,然后立即返回,通知线程它无法完成(红色)操作,因为有别的线程也在访问该数据结构。然后线程 1 将再次尝试,直到成功完成操作为止(绿色)。这种方法的优点是不需要锁。但是可能发生的情况是:如果线程 2(或其他线程)以高频率访问数据结构,那么线程 1 需要大量尝试,直到最终成功。线程 1 的不停尝试被称为线程饥饿。

非阻塞数据结构

一般认为并发可以分为阻塞与非阻塞,对于非阻塞可以进一步细分为无障碍、无锁、无等待:

  • 无障碍:如果其他线程都暂停了,任何给定的线程都将在一定时间内完成操作
  • 无锁:如果多个线程对一个数据结构进行操作,经过一定时间后,其中一个线程将完成其操作
  • 无等待:即使有其他线程也在对该数据结构进行操作,每个线程都将在一定的时间内完成操作

无障碍数据结构

无障碍是一种最弱的非阻塞调度。两个线程如果是无障碍的执行,那么他们不会因为临界区的问题导致一方被挂起。换言之,大家都进入临界区了。那么如果一起修改共享数据,把数据改坏了可怎么办呢?对于无障碍的线程来说,一旦检测到这种情况,它就会立即对自己所做的修改进行回滚,确保数据安全。从这个策略中也可以看到,当临界区中存在严重的冲突时,所有的线程可能都会不断地回滚自己的操作,而没有一个线程可以走出临界区。这种情况会影响系统的正常执行。

一种可行的无障碍实现可以依赖一个“一致性标记”来实现。线程在操作之前,先读取并保存这个标记,在操作完成后,再次读取,检查这个标记是否被更改过,如果两者是一致的,则说明资源访问没有冲突。如果不一致,则说明资源可能在操作过程中与其他写线程冲突,需要重试操作。而任何对资源有修改操作的线程,在修改数据前,都需要更新这个一致性标记,表示数据不再安全。

跟非阻塞调度比较,阻塞调度可以认为是一种悲观的策略,它会认为多个线程一起修改数据会使数据损坏,所以阻塞调度每次只能允许一个线程去修改数据。而非阻塞调度相对来说比较乐观,它认为如果多个线程一起修改也未必会把造成数据损坏,所以它允许多个线程同时进入临界区,但无障碍是一种宽进严出的策略,进的时候不作限制,所有的线程都能进入临界区做其想做的事情,包括读与写,但是出来的时候就不那么宽松了,如果一个线程在临界区中的操作遇到了数据竞争,跟其它线程产生了冲突,它就会回滚这条数据,然后重试自己的操作。比如读取 x 与 y 的值,这个操作是分步进行的先读 x,再读 y,当读完 x,发现别的线程修改了 x,再读 y 就已经没有意义了,因为可能会读到一个错误的数据,所以该线程会重试,再去读取一次,直到自己读到的 x、y 没有问题为止,所以无障碍是一种会不断重试的调度策略,但它会保证没有数据竞争时,线程必然能在有限的步骤内执行完任务(即:如果其他线程都暂停了,任何给定的线程都将在一定时间内完成操作)。

在无障碍的调度方式当中,所有的线程都相当于在拿取一个系统当前的快照,它们会一直重试,直到拿到的快照有效为止。

无锁数据结构

前面说的无障碍是指所有的线程都能进入临界区,但如果发生了竞争,无障碍并不保证临界区的线程能够顺利的出来,因为如果线程发现自己的数据每次去读取或者去操作,总是跟其它线程产生冲突,它就会不停地重试,如果在临界区当中有 10 个线程,线程 1 修改了部分数据,结果它被线程 2 干扰了,线程 2 又被线程 3 干扰,依此类推,最后线程 1 它又可能去干扰线程 10,如果它们之间是彼此干扰的,最终会导致所有的线程都卡死在里面,系统的性能会受到比较严重的影响,因此,无锁必须在无障碍的基础上加一个约束,保证在竞争当中有一个线程是必然能够胜出的(即:如果多个线程对一个数据结构进行操作,经过一定时间后,其中一个线程将完成其操作)。这样就能保证在临界区的线程当中至少有一个是能顺利走出去的,而不至于全部在里面阵亡掉,如果至少有一个线程能够出去,那么就有第二个线程能够出去,假设里面有一百个线程,第一个线程竞争胜利,走出了临界区,剩下 99 个再竞争又必然能胜利一个,因为每次竞争它必然保证能有一个胜利,使得系统至少是能够顺畅的执行下去的。

在高并发多线程中,CAS(Compare And Swap)技术就是一种无锁实现。在它的实现中,使用了一个无限循环,当要修改的内容和期望内容一致时,才去做修改。因此 CAS 对死锁是免疫的。

另外,使用无锁方式,省去了线程之间竞争临界区资源锁而产生的性能损耗,也没有线程之间频繁调度带来的开销。

无等待数据结构

前面说了无锁是能保证至少有一个线程能够在有限步当中完成它的操作,所有的线程在不停地竞争直到有一个胜出为止。无等待相比于无锁更进一步,它首先要求是无锁的,保证所有线程能进并且至少有一个线程能出来,同时无等待它在提高要求,它要求所有进入临界区的线程都能够在有限步当中完成其操作(即:即使有其他线程也在对该数据结构进行操作,每个线程都将在一定的时间内完成操作)。这个要求很高,因为任何线程都能够无障碍进入临界区,并且任何线程都能够在有限步当中完成操作后离开临界区,这就会使得整个系统的运行变得非常顺畅,无等待可以说是并行最高级别了,它基本上能使整个系统发挥到最好佳效率。

无等待必须然也是无饥饿的,因为所有的线程都能在有限步当中完成,因此必然不会有线程永久地呆在临界区内出不去,所以它一定是无饥饿的。

无等待的一个典型案例是,有读写两个线程,如果说只有读线程没有写线程,那么所有的读线程之间必然是无等待的,因为读不会修改数据,如果有一个写线程在里面,由于会修改数据,写线程必然会导致读线程不是无等待。因此可以提出一种算法去作一点改进,比如说有一种算法它会这样做,因为写可能会影响到读,所以每次写之前先把数据拷贝一份副本,线程修改的是这个副本而非原始数据,修改数据的过程可能需要一点时间,因为修改的是副本数据而不是原始数据,所以这个修改的过程也不影响线程读,因此在这个过程当中所有的读线程一样是无等待的,它们都能够在有限的步骤当中完成自己的操作,而所有的写线程相对来讲,因为每个写线程它都是写自己的副本,因此它们的写也是无等待的,所以它们都不需要去跟彼此作同步,最后需要同步的只是将写完之后的数据覆盖原始数据,而这个覆盖原始数据的动作是非常快的,因为我们并不需要作大量的写操作,只不过是一个指针或引用作一个替换而已,不管哪个写线程胜出,总是能够保证替换上去的数据是一致的,并不像其它的算法一样,可能会把数据写坏,因为大家都写的是副本,最后是一个指针指向谁的问题,这样数据必然是安全的,这种方式它就是无等待的一个典型的实现。

无锁数据结构的利与弊

使用无锁结构的主要原因:最大化并发。使用基于锁的容器,会让线程阻塞或等待,并且互斥锁削弱了结构的并发性。无锁数据结构中,某些线程可以逐步执行。无等待数据结构中,每一个线程都可以独自向前运行,这种理想的方式实现起来很难。

使用无锁数据结构的第二个原因就是鲁棒性。当一个线程在持有锁时被终止,那么数据结构将会永久性的破坏。不过,当线程在无锁数据结构上执行操作,在执行到一半终止时,数据结构上的数据没有丢失(除了线程本身的数据),其他线程依旧可以正常执行。

另一方面,当不能限制访问数据结构的线程数量时,就需要注意不变量的状态,或选择替代品来保持不变量的状态。同时,还需要注意操作的顺序。为了避免未定义行为,及相关的数据竞争,必须使用原子操作对修改操作进行限制。不过,仅使用原子操作是不够的,需要确定其他线程看到的修改,是否遵循正确的顺序。

因为,没有任何锁(有可能存在活锁),死锁问题不会困扰无锁数据结构。活锁的产生是两个线程同时尝试修改数据结构,但每个线程所做的修改操作都会让另一个线程重启,所以两个线程就会陷入循环,多次的尝试完成自己的操作。试想有两个人要过独木桥,当两个人从两头向中间走的时候,他们会在中间碰到,然后需要再走回出发的地方,再次尝试过独木桥。要打破僵局,除非有人先到独木桥的另一端(或是商量好了,或是走的快,或纯粹是运气),要不这个循环将一直重复下去。不过活锁的存在时间并不久,因为其依赖于线程调度。所以只是对性能有所消耗,而不是一个长期问题,但这个问题仍需要关注。根据定义,因其操作执行步骤有上限,无等待的代码不会受活锁所困扰。换句话说,无等待的算法要比等待算法的复杂度高,即使没有其他线程访问数据结构,也可能需要更多步骤来完成相应操作。

“无锁-无等待”代码的缺点:虽然提高了并发访问的能力,减少了单个线程的等待时间,但是其可能会将整体性能拉低。首先,原子操作的无锁代码要慢于无原子操作的代码,原子操作就相当于无锁数据结构中的锁。不仅如此,硬件必须通过同一个原子变量对线程间的数据进行同步。提交代码之前,无论是基于锁的数据结构,还是无锁的数据结构,对性能的检查都很重要(最坏的等待时间,平均等待时间,整体执行时间或者其他指标)。

无锁数据结构的例子

实现一个无锁的线程安全栈

push

栈的要求很简单:查询顺序是添加顺序的逆序——先入后出。所以,确保值能安全的入栈就十分重要,因为可能在入栈后,会马上被其他线程索引,同时确保只有一个线程能索引到指定数据。最简单的栈就是链表,head 指针指向第一个节点(可能是下一个被索引到的节点),并且每个节点依次指向下一个节点。这样的情况下,添加一个节点很简单:

  1. 创建新节点 node
  2. node->next 指向 head->next
  3. head->next 指向 node

单线程中这种方式没有问题,不过使用多线程对栈进行修改时,这几步就不够用了。有两个线程同时添加节点时,第 2 步和第 3 步会产生条件竞争。就会使之前那个线程的结果丢弃,亦或是造成更加糟糕的后果。了解了如何解决这个条件竞争前,还要注意当 head 更新并指向了新节点时,另一个线程就能读取到这个节点,因此 head 设置在指向新节点前。因为在这之后就不能对节点进行修改了,所以新节点准备就绪就很重要。

那如何应对条件竞争呢?答案就是:第 3 步的时候使用原子“比较/交换”操作,来保证步骤 2 对 head 进行读取时,不会对 head 进行修改,有修改时可以循环“比较/交换”操作。下面的代码中,就不用锁来实现线程安全的 push() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template<typename T>
class lock_free_stack {
private:
struct node {
T data;
node* next;
node(T const& data_) : data(data_) {}
};
std::atomic<node*> head;

public:
void push(T const& data) {
// 1. 创建新节点 new_node
node* const new_node = new node(data);
// 2. new_node->next 指向 head
new_node->next = head.load();
// 3. head 与 new_node->next 相等时,表明没人修改,此时将 head 指向 new_node;否则 new_node->next 指向 head,回退到上一步
while(!head.compare_exchange_weak(new_node->next, new_node));
}
};

代码的亮点是使用“比较/交换”操作:返回 false 时,因为比较失败(比如 head 被其他线程锁修改),会使用 head 中的内容更新 new_node->next 的内容。因为编译器会重新加载 head 指针,所以循环中不需要每次都重新加载 head 指针。这里也可以直接将整个循环更换为 head.compare_exchange_strong(new_node->next, new_node),只是性能可能会差些。

pop

pop 的步骤如下:

  1. 获取 head
  2. 读取 head->next 指向的结点 node
  3. 设置 head->next 指向 node->next
  4. 通过 node 返回携带的数据 data
  5. 删除 node 节点

多线程环境下,就不那么简单了。有两个线程要从栈中移除数据时,两个线程可能在步骤 1 中读取到同一个 head。其中一个线程处理到步骤 5 时,另一个线程还在处理步骤 2,这个还在处理步骤 2 的线程将会解引用一个悬空指针。这是书写无锁代码的问题之一,所以现在只能跳过步骤 5,让节点泄露(只是不删除节点,但是节点不会被读到)。

另一个问题:两个线程读取到同一个 head 值时,同一个节点将会被返回到两个线程,这就违反了栈结构的意图,所以需要避免这样的情况发生。可以像在 push() 函数中解决条件竞争那样来解决这个问题:使用“比较/交换”操作更新 head。当“比较/交换”操作失败时,不是一个新节点已被推入,就是其他线程已经弹出了节点。无论是哪种情况,都得返回步骤 1(“比较/交换”操作将会重新读取 head)。当“比较/交换”成功,就可以确定当前线程是弹出指定节点的唯一线程,之后就可以放心的执行步骤 4 了。这里先看一下 pop() 的雏形:

1
2
3
4
5
6
7
8
9
template<typename T>
class lock_free_stack {
public:
void pop(T& result) {
node* old_head = head.load();
while(!head.compare_exchange_weak(old_head, old_head->next));
result = old_head->data;
}
};

“比较/交换”操作返回 true 时,说明 head 和 old_head 相等,此时认为该线程是唯一获取 head 的线程,此时就可以将 old_head->next 的值赋给 head,即 head->next 指向 node->next。“比较/交换”操作返回 false 时,说明 head 和 old_head 不相等,head 已被更改,此时就可以将 head 的值赋给 old_head,进入下一次循环。

这段代码很优雅,但有两个节点泄露的问题。首先,这段代码在空链表时不工作:当 head 指针是空指针时,要访问 next 指针时,将引起未定义行为。很容易通过对 nullptr 的检查进行修复(在 while 循环中),要不对空栈抛出一个异常,要不返回一个 bool 值来表明成功与否。

第二个问题就是异常安全。第 3 章中介绍栈结构时,了解了在返回值时会出现异常安全问题:有异常抛出时,复制的值将丢失。这种情况下,传入引用是一种可以接受的解决方案,这样就能保证当有异常抛出时,栈上的数据不会丢失。不幸的是,不能这样做,只能在单一线程对值进行返回时,才进行拷贝以确保拷贝操作的安全性,这就意味着在拷贝结束后这个节点就会删除。因此,通过引用获取返回值的方式不可取。若想要安全的返回,必须使用第 3 章中的其他方法:返回指向数据值的(智能)指针。返回的是智能指针时,返回 nullptr 表明没有值可返回,但是要求在堆上对智能指针进行内存分配。将分配过程做为 pop() 的一部分时(也没有更好的选择了),堆分配内存时可能会抛出一个异常。与此相反,在 push() 操作中对内存进行分配——无论怎样,都需要对 node 进行内存分配。返回一个 std::shared_ptr<> 不会抛出异常,所以在 pop() 中进行内存分配是安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template<typename T>
class lock_free_stack {
private:
struct node {
std::shared_ptr<T> data; // 智能指针指向当前数据
node* next;
node(T const& data_) : data(std::make_shared<T>(data_)) {} // 让 std::shared_ptr 指向新分配出来的 T,必须在堆上为数据分配内存
};

std::atomic<node*> head;
public:
void push(T const& data) {
node* const new_node = new node(data);
new_node->next = head.load();
while(!head.compare_exchange_weak(new_node->next, new_node));
}

std::shared_ptr<T> pop() {
node* old_head = head.load();
while(old_head /*在解引用前检查 old_head 是否为空指针*/ && !head.compare_exchange_weak(old_head, old_head->next));
return old_head ? old_head->data : std::shared_ptr<T>(); // 不存在时,将返回一个空指针
}
};

注意,结构是无锁的,不是无等待的,因为在 push() 和 pop() 函数中都有 while 循环,当 compare_exchange_weak() 失败的时候,循环将会持续下去。

如果语言有垃圾收集机制(比如 C# 或 Java),那到这就结束了,节点一旦它不再被任何线程访问,将被收集和循环利用。然而,并不是所有 C++ 编译器都有垃圾回收机制。

终止内存泄露:使用无锁数据结构管理内存

当释放一个节点时,需要确认其他线程没有持有这个节点。只有一个线程调用 pop()时,可以放心的释放。push() 和栈中的节点没什么关系,因为 push() 只会修改 head 节点,所以调用 pop() 函数的线程只和栈中的节点有关,并且能够安全的将节点删除。

栈处理多线程对 pop() 的调用时,就知道节点什么时候删除,这需要一个专用的垃圾回收器:检查 pop() 可访问哪些节点。多线程只能通过 pop() 访问同一节点,没有线程调用 pop() 时,就可以删除栈上的节点。添加节点到“可删除”列表中时,就能从结点中提取数据了。没有线程通过 pop() 访问节点时,可以安全的删除这些节点了。

通过计数即可知道没有线程调用 pop():当计数器数值增加时,就有节点推入。当减少时,就有节点删除。从“可删除”列表中删除节点就很安全,直到计数器的值为 0 为止。计数器必须是原子的,这样才能在多线程的情况下正确的进行计数。下面的代码展示了修改后的 pop() 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
template<typename T>
class lock_free_stack {
private:
std::atomic<unsigned> threads_in_pop; // 原子变量
void try_reclaim(node* old_head);

public:
std::shared_ptr<T> pop() {
++threads_in_pop; // 在做事之前,计数值加 1
node* old_head = head.load();
while(old_head && !head.compare_exchange_weak(old_head, old_head->next));
std::shared_ptr<T> res;
if (old_head) {
res.swap(old_head->data); // 从节点中直接提取数据,而非拷贝指针
}
try_reclaim(old_head); // 回收删除的节点
return res;
}
};

threads_in_pop 原子变量用来记录有多少线程试图弹出栈中的元素。调用 pop() 函数时,计数器加 1。调用 try_reclaim() 时,计数器减 1。节点调用函数时,说明节点已删除。因为暂时不需要将节点删除,可以通过 swap() 函数来删除节点上的数据(而非只是拷贝指针),当不再需要这些数据时,数据会自动删除,而不是持续存在。接下来看一下 try_reclaim() 是如何实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
template <typename T>
class lock_free_stack {
private:
std::atomic<node*> to_be_deleted_;

static void delete_nodes(node* nodes) {
while (nodes) {
node* next = nodes->next;
delete nodes;
nodes = next;
}
}

void try_reclaim(node* old_head) {
if (threads_in_pop == 1) {
node* nodes_to_delete = to_be_deleted_.exchange(nullptr); // 声明可删除列表,exchange() 允许使用新选的值替换已存储的值,并且返回原始值
if (--threads_in_pop == 0) { // 计数器减 1,并且判断是否只有一个线程调用 pop()
delete_nodes(nodes_to_delete);
} else if (nodes_to_delete) {
chain_pending_nodes(nodes_to_delete); // 将其挂在等待删除链表后
}
delete old_head;
} else {
chain_pending_node(old_head);
--threads_in_pop;
}
}

void chain_pending_nodes(node* nodes) {
node* last = nodes;
while (node* const next = last->next) { // 让 next 指针指向链表的末尾
last = next;
}
chain_pending_nodes(nodes, last);
}

void chain_pending_nodes(node* first, node* last) {
last->next = to_be_deleted_; // 将最后一个节点的 next 指针替换为当前 to_be_deleted_ 指针
while (!to_be_deleted_.compare_exchange_weak(last->next, first)); // 将链表的第一个节点作为新的 to_be_deleted_ 指针进行存储
}

void chain_pending_node(node* n) {
chain_pending_nodes(n, n); // 添加单个节点是一种特殊情况,因为需要将这个节点作为第一个节点进行添加(同时作为最后一个节点)
}
};

回收节点时,threads_in_pop 是 1,说明只有一个线程试图弹出栈中的元素。当数值不是 1 时,删除任何节点都不安全,所以需要向等待列表中添加该节点。

假设某一时刻,threads_in_pop 值为 1。就可以尝试回收等待列表,如果不回收,节点就会继续等待,直到整个栈被销毁。要做到回收,首先要通过原子 exchange 操作声明可删除列表,并将计数器减 1。如果之后计数的值为 0,意味着没有其他线程访问等待节点链表。不必为出现新的等待节点而烦恼,因为它们会安全的回收。而后,可以使用 delete_nodes 对链表进行迭代,并将其删除。

计数值在减后不为 0 时,回收节点就不安全,此时需要将其挂在等待删除链表后,这种情况会发生在多个线程同时访问数据结构的时候。下图展示了一种可能的情况:

图中线程 C 添加节点 Y 到 to_be_deleted_ 链表中,即使线程 B 仍将其引用作为 old_head,之后会尝试访问其 nex t指针。线程 A 删除节点时,会造成线程 B 发生未定义行为。所以要在 try_reclaim() 对声明节点进行删除前对 threads_in_pop 进行检查。

当栈处于高负荷状态时,to_be_deleted_ 链表将会无限增加,因为找不到机会进行节点回收,会再次泄露。这样的话就得为回收节点寻找替代机制。关键是要确定无线程访问给定节点,这样给定节点就能回收,所以最简单的替换机制就是使用风险指针(hazard pointer)。

使用风险指针检测不可回收的节点

使用引用计数

无锁栈上的内存模型

实现一个无锁的线程安全队列

设计无锁数据结构的指导建议

使用 std::memory_order_seq_cst

std::memory_order_seq_cst 比起其他内存序要简单的多。本章的所有例子,都是从 std::memory_order_seq_cst 开始,只有当基本操作正常工作的时候,才放宽内存序的选择。这种情况下,使用其他内存序就是优化(早期可以不用这样做)。通常,了解整套代码对数据结构的操作后,才能决定是否要放宽内存序的选择。

对无锁内存的回收

与无锁代码最大的区别就是内存管理。当线程对节点进行访问的时候,线程无法删除节点。为避免内存的过多使用,还是希望这个节点能在删除的时候尽快删除。本章中介绍了三种技术来保证内存可以安全回收:

  • 等待无线程对数据结构进行访问时,删除所有等待删除的对象
  • 使用风险指针来标识正在访问的对象
  • 对对象进行引用计数,当没有线程对对象进行引用时将其删除

所有例子的想法都是使用一种方式去跟踪指定对象上的线程访问数量。无锁数据结构中,还有很多方式可以用来回收内存,例如使用一个垃圾收集器,只需要让回收器知道,当节点没引用的时就回收节点,这比起算法来说更容易实现一些。

其他替代方案就是循环使用节点,只在数据结构销毁时才将节点完全删除。因为节点能复用,这样就不会有非法的内存,所以就能避免未定义行为的发生。这种方式的缺点,就是会产生“ABA问题”。

小心 ABA 问题

基于“比较/交换”的算法中要格外小心“ABA 问题”。其流程是:

  1. 线程 1 读取原子变量 x,并且发现其值是 A
  2. 线程 1 对这个值进行一些操作,比如解引用或做查询,或其他操作
  3. 操作系统将线程 1 挂起。
  4. 其他线程对 x 执行一些操作,并且将其值改为 B
  5. 另一个线程对 A 相关的数据进行修改(线程 1 持有),让其不再合法。可能会在释放指针指向的内存时,代码产生剧烈的反应,或者只是修改了相关值而已
  6. 再来一个线程将 x 的值改回为 A。如果 A 是一个指针,那么其可能指向一个新的对象,只是与旧对象共享同一个地址而已
  7. 线程 1 继续运行,并且对 x 执行“比较/交换”操作,将 A 进行对比。这里,“比较/交换”成功,不过这是一个错误的 A。从第 2 步中读取的数据不再合法,但是线程 1 无法言明这个问题,并且之后的操作将会损坏数据结构。

本章提到的算法不存在这个问题,不过在无锁的算法中,这个问题很常见。解决这个问题的一般方法是:让变量 x 中包含一个 ABA 计数器。“比较/交换”会对加入计数器的 x 进行操作,每次的值都不一样,计数随之增长。所以,x 还是原值的前提下,即使有线程对 x 进行修改,“比较/交换”还是会失败。“ABA 问题”在使用释放链表和循环使用节点的算法中很是普遍,而将节点返回给分配器,则不会引起这个问题。

识别忙等待循环和帮助其他线程

在本章最后的队列的例子中,线程在执行 push 操作时,必须等待另一个 push 操作流程的完成。这样等待线程就会陷入到忙等待循环中,当线程尝试失败时会继续循环,这会浪费 CPU 的计算周期。通过对算法的修改,当之前的线程还没有完成操作前,让等待线程执行未完成的步骤,就能让忙等待的线程不再阻塞。

参考

无锁数据结构简介

并发级别:阻塞、无障碍、无锁、无等待

程序员的自我修养(六):保护线程间的共享数据

一起来学POSIX thread 之 不变量、临界区、谓词

07 设计无锁的并发数据结构 | 《C++ Concurrency In Action》笔记

http://www.zh0ngtian.tech/posts/5b14525a.html

作者

zhongtian

发布于

2021-05-01

更新于

2023-12-16

许可协议

评论