Lockfree Queue

因为《并发编程重要概念及比较》文章过长,所以将其中无锁队列部分拆出来。

常见的用CAS实现的Lockfree算法例如并发缓冲队列,我们可以抽象成维护一个链表。

下面介绍几个经典实现,可以注意观察:

  1. 如何减少 CAS 操作数量
  2. 如何 GC

有锁链表——使用一把大锁

有锁链表——每个节点一把锁

Valois 无锁队列

首先对于一读一写的模型我们可以仅通过约束读指针和写指针的行为即可实现,并不需要接触并发模型。下面我们考虑多对多的模型,以 Valois 的论文Implementing Lock-Free Queues中的论述为例。

1
2
3
4
5
Initialize(){
head = new record();
head->next = NULL;
tail = head;
}

在 Enqueue 中,我们需要维护 p 和 tail 两个变量。

1
2
3
4
5
6
7
8
9
10
11
Enqueue(x) {
q = new record();
q->value = x;
q->next = NULL;

do {
p = tail; // 使用tail维护链表尾指针的位置
} while( CAS(p->next, NULL, q) != true); // 1

CAS(tail, p, q); // 2
}

【Q】这里有一个疑问,就是为什么2这句不使用循环保护起来,以确保成功呢?这是因为这个语句是始终能够成功的。我们考虑:

  1. 线程T1成功进行了1处的CAS,它就会使得tail->next不为NULL
  2. 线程T2执行到1,那么它的CAS一定是失败
    这是因为tail还是老的值,但是tail->next已经被线程T1更新过,不是0了。这个过程一直到语句2之后tail被成功更新成q,因此实际上可以tail->next看成一个锁一样的东西

不过,这样我们就发现了一个违背锁无关性质的问题,也就是当线程T1在执行语句2时挂掉了,那就会阻塞所有其他在循环中的线程。其实这里可以想到一个优化,就是如果其他线程执行语句1失败了,仍然可以帮助T1更新完tail。

深入思考一下,原因在于两个CAS操作1和2并不是原子的,所以可能出现某个线程执行了1,但没有执行2的中间状态。换句话说,这个版本中语句p = tail中的tail并不一定是结尾。这也导致了为了维护离开循环时p必须指向结尾这个特性,线程需要在循环内自旋,从而导致上述的死锁现象的产生。于是想到,tail和next真的都是必要的么?事实上,即使我不记录tail,那么这个链表也是正确的啊

为了解决问题,在这一版本中我们索性放宽假设,认为tail只是“接近”结尾,因此现在我们需要使用一个内层的while循环(指3处的while)来从tail开始尝试更新结尾,此时tail存在的目的是为了减少我们next的数量。因此我们提出下面的改良版,不过在此之前,需要再研究下Dequeue的实现。

下面是Dequeue的实现,需要额外考虑两种情况下和Enqueue会不会产生冲突:

  1. 链表中只有一个元素
    那么head和tail指向的就会是同一个节点。为了解决这个问题,增加一个dummy节点作为head,并且每次弹出head->next而不是head
    这样,Enqueue的时候不需要访问head,Dequeue的时候,不需要访问tail。MS 队列中指出,这样的好处是可以避免潜在的死锁问题。
  2. 链表为空
    此时,head和tail应当指向同一个节点。但这种实现有可能会破坏这个性质。考虑下面的图,我们假设T1在做Dequeue,正准备执行位于1处的判断。此时T2正在执行Enqueue插入节点A,并且刚执行完1处的CAS,此时p->next不为NULL了。回归到T1,此时1处的判断不成立,Dequeue就会把Enqueue新生成的节点取走。而等线程切换回来,T1还在傻傻地设置tail。Scott等人的论文中提到,这种方案会阻碍对Dequeued节点的释放。
1
2
3
4
5
6
7
8
9
10
DeQueue() {
do{
p = head;
// 判断是不是空节点
if (p->next == NULL){ // 1
return ERR_EMPTY_QUEUE;
}
} while( CAS(head, p, p->next) != TRUE );
return p->next->value;
}

改良版1

在改良版中,我们不再记录 tail 的值,而是用一个内部循环不断地 p->next 找 tail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
EnQueue(x)
{
q = new record();
q->value = x;
q->next = NULL;

p = tail;
oldp = tail;
do {
while (p->next != NULL) // 3
p = p->next;
} while( CAS(p->next, NULL, q) != TRUE); // 1

CAS(tail, oldp, q); // 2
}

观察改良版代码,即使T1线程挂在语句2,没能更新完tail指针,线程T2也可以自动跟踪到T1在1处的修改。

注意此时语句2可能失败,但这说明此时它应该失败。考虑下面的执行顺序:

  1. 原先链表中只有一个元素1
  2. 此时线程T1添加了一个元素2,并且成功执行语句1,将p指向了元素2的位置
  3. 此时发生了调度,线程T2获得处理器,它需要在队列中加一个元素3
    T2在刚进入循环时它发现自己的tail是指向1的,这是因为此时T1还没有更新tail指针。
    但T2在内层的while循环中根据pnext指针走到了刚被T1添加进去的元素2处。因此T2在元素2的末尾增加了元素3,并且更新自己的p指向元素3。
    T2继续执行语句2,此时tail == oldp指向元素1,所以CAS成功,tail指向了元素3。
    现在,T1重新获得了处理器,此时tail已经被T2修改到指向元素3了,于是不能匹配oldp,这个CAS就会失败,因为它试图更新一个较旧的值。

容易看到这个失败不会影响tail指向精确的队列结尾。但是如果稍稍修改下上面的运行顺序,按照

1
2
3
4
5
  T1          T2
添加元素2
添加元素3
修改tail
修改tail(失败)

来执行,那会发现tail被更新到指向元素2而不是元素3。所以看到先前放宽的假设是非常有必要的,在论文中作者指出这种情况下tail指针距离列表的准确结束位置最多相差2 * p - 1个节点。其实这个“最多”还是有点多的,所以在实践中我们常常结合两种方案来使用。

Michael和Scott队列

Michael和Scott在1996年提出了另一种无锁队列的实现方法。在论文里还介绍了有锁实现,我们暂时不讨论。

在指针 pointer_t 中引入了一个 count,也就是论文中提到的 modification count,其作用是解决 ABA 问题。
给定一个 p: pointer_t,则p.ptr->next表示后继,是个pointer_t对象。
和 Valois 的算法一样,MS 队列的 Head 指针同样是一个 dummy 节点。 Tail 指针指向的是倒数第一个或者倒数第二个节点。
为了允许出队函数释放被弹出的 node,我们需要保证 Tail 并不指向被弹出的节点,或者任何它的前驱 node。因此我们甚至可以安全地重新使用这些节点

1
2
3
4
5
6
7
8
structure pointer_t {ptr: pointer to node_t, count: unsigned integer}
structure node_t {value: data type, next: pointer_t}
structure queue_t {Head: pointer_t, Tail: pointer_t}

initialize(Q: pointer to queue_t)
node = new node() # Allocate a free node
node–>next.ptr = NULL # Make it the only node in the linked list
Q–>Head = Q–>Tail = node # Both Head and Tail point to it

插入的过程主要是:

  1. 不断重复,直到 Enqueue 成功
    1. 记录当前队尾 Q->Tail 到 tail
    2. 记录 tail 的后继对应的 pointer_t 到 next
    3. E7 判断此时记录的 tail 是否还是队尾
      如果不是,则下一轮循环。
    4. E8 判断此时 tail.ptr–>next 指向的 node_t 为 NULL
      1. 如果是,说明此时 tail 还是队尾
        E9 尝试将队尾改为 <node, next.count+1>
        E9的CAS要成功,则tail.ptr–>next==next==NULL
      2. 如果不是,说明 tail 不是队尾了
        此时 tail 指向倒数第二个 node。
        E13 我们更新一下 Q->tail,指向 tail.ptr–>next
  2. Enqueue 完成
    E17 将 Q->Tail 修改为 <node, tail.count+1>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
enqueue(Q: pointer to queue_t, value: data type)
E1: node = new node() # Allocate a new node from the free list
E2: node–>value = value # Copy enqueued value into node
E3: node–>next.ptr = NULL # Set next pointer of node to NULL
E4: loop # Keep trying until Enqueue is done
E5: tail = Q–>Tail # Read Tail.ptr and Tail.count together
E6: next = tail.ptr–>next # Read next ptr and count fields together
E7: if tail == Q–>Tail # Are tail and next consistent? 检查tail有没有被更新
E8: if next.ptr == NULL # Was Tail pointing to the last node? 检查tail.next有没有被更新
E9: if CAS(&tail.ptr–>next, next, <node, next.count+1>) # Try to link node at the end of the linked list
E10: break # Enqueue is done. Exit loop
E11: endif
E12: else # Tail was not pointing to the last node 此时Tail不再是最后一个节点了,我们需要重新更新一下Tail
E13: CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) # Try to swing Tail to the next node
E14: endif
E15: endif
E16: endloop
E17: CAS(&Q–>Tail, tail, <node, tail.count+1>) # Enqueue is done. Try to swing Tail to the inserted node

需要注意这里的free不是简单的delete,如果简单 delete,会 use after free。
事实上,无锁算法中,GC 是一个难点。它在于当线程释放了一块内存时,是无法获知是否有别的线程也同时持有该块内存的指针并需要访问。所以 MS 队列的作者之一就提出了 HazardPointer 的思路。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
dequeue(Q: pointer to queue_t, pvalue: pointer to data type): boolean
D1: loop # Keep trying until Dequeue is done
D2: head = Q–>Head # Read Head
D3: tail = Q–>Tail # Read Tail
D4: next = head–>next # Read Head.ptr–>next
D5: if head == Q–>Head # Are head, tail, and next consistent?如果Head不是最新的,说明有人Dequeue了
D6: if head.ptr == tail.ptr # Is queue empty or Tail falling behind?检查队列是不是空,或者是Tail落后了(Valois算法的一个问题)
D7: if next.ptr == NULL # Is queue empty?如果Head和Tail的next相等都为NULL,比如初始情况
D8: return FALSE # Queue is empty, couldn’t dequeue
D9: endif
D10: CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) # Tail is falling behind. Try to advance it。如果 Tail 落后了,就用next.ptr更新Q->Tail
D11: else # No need to deal with Tail
# Read value before CAS, otherwise another dequeue might free the next node
D12: *pvalue = next.ptr–>value
D13: if CAS(&Q–>Head, head, <next.ptr, head.count+1>) # Try to swing Head to the next node
D14: break # Dequeue is done. Exit loop
D15: endif
D16: endif
D17: endif
D18: endloop
D19: free(head.ptr) # It is safe now to free the old dummy node
D20: return TRUE # Queue was not empty, dequeue succeeded

Safety

我们要证明下面的性质始终成立:

  1. 链表始终是连接上的
  2. 新节点只插入到最后一个节点后面
  3. 节点只在链表头删除
  4. Head 始终指向链表头
  5. Tail 始终指向链表中的某个节点

证明:
首先,在一开始所有的性质都成立。那么假设 ABA 不发生,则:

  1. 一个节点不会被设置为 NULL,除非它被释放。而只有当节点从链表头被删除之后,才会被释放【性质3】。
  2. 通过 Tail 始终能找到链表中的某个节点【性质5】,新的节点只会插入到 next 为 NULL 的节点后面,也就是链表的最后一个节点【性质1】。
  3. 因为只通过 Head 删除,但 Head 始终指向链表头【性质4】。
  4. Head 的值只会在一种情况下变化,也就是在删除时,原子地切换到 next 上。Head 不可能为 NULL,因为如果队列中只有一个元素,dequeue 会直接返回。
  5. Tail 永远不会落后于 Head,所以它不会指向一个被删除了的节点。并且,Tail 只会在 next 不为 NULL 时,切换到 next。

Liveness

我们将证明上面这个算法是Non-Blocking的,主要思路是证明如果循环判断条件触发了超过一次,那么必然有另外一个进程完成了操作,那么整体来说,整个算法就是一直在往前运行的。首先考虑Enqueue情况:

  1. E7
    如果E7不满足,说明在E5后,Tail被另一个进程修改了。Tail永远指向最后一个,或者倒数第二个节点。所以如果E7失败超过1次,那么另一个进程一定成功完成了一次Enqueue。
  2. E8
    如果E8失败,说明Tail此时正指向倒数第二个节点。那么在E13的CAS后,Tail就会指向链表中最后一个节点,除非另一个进程又Enqueue了一个。因此,如果E8又失败了一次,说明另一个进程一定成功完成了一次Enqueue。
  3. E9
    E9 处的 CAS 失败,说明一个进程成功Enqueue了一个元素。

下面是Dequeue情况:

  1. D5/D13
    如果D5的判断不满足,或者D13的CAS失败了,说明Head被另一个线程修改过了,而Head只有在被成功Dequeue(E13)的时候才会被修改。
  2. D6
    如果D6满足,并且此时链表是不空的,说明此时Tail正指向倒数第二个节点,也就是正数第一个节点。
    那么在D10的CAS之后,Tail一定指向最后一个节点了,除非此时又有一个进程完成一次Enqueue操作。
    所以,如果D6的条件满足了多于1次,那么另一个进程一定Enqueue成功,并且同一个或者另一个进程又Dequeue了一个item

如何解决ABA问题