pthread_rwlock库的实现

pthread_rwlock系列函数是pthread库的读写锁函数。随着版本的不同,它的实现也不同。
本篇的组织是:

  1. 前置知识
    1. GCC的扩展内联汇编
    2. Futex
  2. 2.17和2.30两个版本的lowlevellock的实现
  3. 2.17和2.30两个版本的pthread_rwlock的实现
    我们的结论是:
    旧版GLIBC使用lowlevellock实现pthread_rwlock;
    新版GLIBC使用原子操作实现pthread_rwlock。
    因此具有不同的ABI

扩展形式的GCC汇编介绍

在扩展形式的GCC汇编中,可以访问C语言中定义的变量,或者跳转到C语言定义的标签处。此外,GCC扩展汇编中可以用:去delimit各个operand parameter。
对于要访问的寄存器,并不一定要要显式指明,也可以留给GCC自己去选择,这可能让GCC更好去优化代码。
GCC扩展内联汇编格式如下:

1
2
3
4
5
asm asm-qualifiers ( AssemblerTemplate 
: OutputOperands
: InputOperands
: Clobbers
: GotoLabels)

OutputOperands、InputOperands等列表里面的项目会从0开始标号,并且可以使用%0%1的方法去表示,例如对下面的代码而言,%0old%1*Base%2Offset

1
2
3
4
5
6
7
8
9
bool old;

__asm__ ("btsl %2,%1\n\t" // Turn on zero-based bit #Offset in Base.
"sbb %0,%0" // Use the CF to calculate old.
: "=r" (old), "+rm" (*Base)
: "Ir" (Offset)
: "cc");

return old;

我们看看这个语法中的几个组成部分:

  1. AssemblerTemplate
    是ASM代码的模板。
  2. OutputOperands
    一个逗号分隔的列表,表示所有被这段代码修改的变量。
    其中+rm=r等表示Constraints,将在稍后介绍。
    在括号中的应该是一个C语言左值。
  3. InputOperands
    表示所有被这段代码访问的表达式。
  4. Clobbers
    除了OutputOperands之外,还被这段代码修改的内容,例如
    1. cc:eflags寄存器
    2. memory:内存
      例如我们常见的asm volatile("mfence" ::: "memory")用法。
  5. GotoLabels
    这段代码可能跳转到的C语言的标签。

Constraints

Simple Constraints

Simple Constraints都是字母或数字,表示允许其中的某一种类的操作。

  1. r表示允许寄存器操作;m表示允许内存操作。这两个是很泛化的内存和寄存器操作。
  2. a表示地址寄存器,f表示浮点寄存器,d表示数据寄存器。这些都对特定的处理器适用。
  3. 数字表示matching constraint

Modifiers

Modifiers用来描述operands的读写性。

  1. +
    表示这个值在操作中会被读写
  2. =
    表示这个值只会被写。
  3. &
  4. %

Futex介绍

Futex是一个系统调用。在使用Futex时,大部分的工作是在用户态完成的,只有当程序很有可能阻塞较长时间时,才会真的去使用这个系统调用。
Futex值的加载、Futex值和传入的expected value的比较以及实际的阻塞都是原子的。并且,不同线程对同一个Futex值的并发操作是Total order的。

Futex调用如下所示

1
2
3
long futex(uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, /* or: uint32_t val2 */
uint32_t *uaddr2, uint32_t val3);

它的实现类似下面

1
2
3
4
5
6
7
static int
futex(uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, uint32_t *uaddr2, uint32_t val3)
{
return syscall(SYS_futex, uaddr, futex_op, val,
timeout, uaddr2, val3);
}

其中uaddr指向一个Futex值,它在32位或者64位系统上都是一个32位的值。
futex_op包含两部分,要执行的操作(operation),以及操作的option。
option包含下面几个常量:

  1. FUTEX_PRIVATE_FLAG:128
    【Since 2.6.22】
    这个option表示这个Futex是否是被跨进程共享的。指定这个flag能够允许内核做一系列优化。
    对于所有要执行的操作,提供了带_PRIVATE版本后缀的宏,其作用相当于or上了这个FUTEX_PRIVATE_FLAG
    因为这个宏在22版本之后才有,所以我们看到nptl的一些实现上会使用__ASSUME_PRIVATE_FUTEX判定Linux是否支持Private Futex这个功能。我们可以认为在22版本之后的Linux下这个宏始终是1

    1
    2
    3
    4
    5
    // kernel-features.h
    /* Support for private futexes was added in 2.6.22. */
    #if __LINUX_KERNEL_VERSION >= 0x020616
    # define __ASSUME_PRIVATE_FUTEX 1
    #endif
  2. FUTEX_CLOCK_REALTIME:256
    【Since 2.6.28】
    这个option指定timeout的计算方式

操作部分包含下面几个常量:

  1. FUTEX_WAIT:0
    这个操作检查uaddr指向的值是否等于val
    如果是,那么就会睡在这个Futex上面,直到另一个FUTEXT_WAKE被调用。这个读取、比较和开始睡眠的过程是原子的。
    如果不是,那么就会立即返回EAGAIN
    之所以要在这个操作里面再比较一次val,而不是直接加锁,其原因是为了防止丢失wake up事件。例如,如果在本线程准备阻塞之后,另一个线程修改了Futex的值,并且遵循了下面的时序:
    1. 对方线程先修改Futex值
    2. 对方线程执行FUTEX_WAKE
    3. 本线程执行FUTEX_WAIT
      那么在执行FUTEX_WAIT时同步检查val的方案就能发现Futex值变化了,并且不进入睡眠。
  2. FUTEX_WAKE:1
    这个操作唤醒最多val个睡在这个Futex上面的线程。指定INT_MAX表示唤起所有线程。
  3. FUTEX_FD:2
    【已被移除】
    表示为当前Futex创建一个fd,当这个Futex发生FUTEX_WAKE调用时,这个fd被select、poll和epoll等可读。
  4. FUTEX_REQUEUE:3
    FUTEX_CMP_REQUEUE的不带check的简化版。
  5. FUTEX_CMP_REQUEUE:4
    TODO
  6. FUTEX_WAKE_OP:5
    这个操作用来支持在用户态中对多个Futex同时操作的情形。例如pthread_cond_signal的实现中需要用到两个Futex,一个用来实现Mutex,一个用来管理和CV相连的Wait queue。通过这个操作可以避免一些contention或者上下文切换。
    这个op的主要过程是:
    1. 保存uaddr2的旧值到oldval,并且对uaddr2执行操作。这个操作是原子的。
    2. 唤醒uaddr上的val个线程。
    3. 如果oldval的值满足一定条件,则唤起val2个线程。
  7. FUTEX_LOCK_PI:6
    这里的PI表示”priority inheritance”,用来处理所谓的优先级倒置问题。
    它的解决方案是,当一个高优先级的任务被一个低优先级任务持有的锁阻塞时,会暂时提高这个低优先级任务的优先级为高优先级,这样它就不会被对方抢占。
    注意,这个”priority inheritance”的实现也必须是可传递的。也就是当这个低优先级任务在等待另一个中等优先级任务的锁的时候,所有的任务都要被提升为高优先级。
  8. FUTEX_UNLOCK_PI:7
  9. FUTEX_TRYLOCK_PI:8
  10. FUTEX_WAIT_BITSET:9
  11. FUTEX_WAKE_BITSET:10
  12. FUTEX_WAIT_REQUEUE_PI:11
  13. FUTEX_CMP_REQUEUE_PI:12

下面我们讨论futex调用的返回值。首先如果发生错误,就按照通常规矩,返回-1,并且设置errno。对于成功的情况,需要根据op讨论:

  1. FUTEX_WAIT
    返回0表示caller被唤醒了。需要注意的是,这个0有可能是spurious wake-up,所以在这之后,仍然需要根据Futex值的具体值去判断是否继续block。我认为这也是为什么__lll_lock_wait_private的实现中有一个while循环的原因。
  2. FUTEX_WAKE
    返回唤醒了多少个waiter。

lll的GLIBC2.17版本

在这个版本中,lll的实现是FUTEX。
我们只看PRIVATE的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// nptl/sysdeps/unix/sysv/linux/x86_64
#define lll_lock(futex, private) \
(void) \
({ int ignore1, ignore2, ignore3; \
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
__asm __volatile (__lll_lock_asm_start \
".subsection 1\n\t" \
".type _L_lock_%=, @function\n" \
"_L_lock_%=:\n" \
"1:\tlea %2, %%" RDI_LP "\n" \
"2:\tsub $128, %%" RSP_LP "\n" \
"3:\tcallq __lll_lock_wait_private\n" \
"4:\tadd $128, %%" RSP_LP "\n" \
"5:\tjmp 24f\n" \
"6:\t.size _L_lock_%=, 6b-1b\n\t" \
".previous\n" \
LLL_STUB_UNWIND_INFO_5 \
"24:" \
: "=S" (ignore1), "=&D" (ignore2), "=m" (futex), \
"=a" (ignore3) \
: "0" (1), "m" (futex), "3" (0) \
: "cx", "r11", "cc", "memory"); \
else \
...
})

查看__lll_lock_wait_private函数的调用。下面两个lll_futex_wait的含义是如果futex的值是2,那么就会进行等待。
那么为什么要写成两个呢?如果我们已经观察到futex是2,即已经被加锁了,那么我们就直接去wait了,否则我们可以尝试加锁。

1
2
3
4
5
6
void
__lll_lock_wait_private (int *futex)
{
if (*futex == 2)
lll_futex_wait (futex, 2, LLL_PRIVATE);
...

但是加锁的过程也未必成功,可能有两个线程同时过了上面的检验,因此我们还需要进行一次判断。加锁操作是由atomic_exchange_acq调用,它会尝试将futex设置为2,并且返回原值。在while循环中,我们会判断返回的原值是否为0,如果不是0,那么说明这个锁已经被另一个线程加了,所以我们直接wait到这个锁被释放,然后在重新while一次尝试加锁。

1
2
3
4
...
while (atomic_exchange_acq (futex, 2) != 0)
lll_futex_wait (futex, 2, LLL_PRIVATE);
}

辅助函数解释

lll_futex_wait

查看lll_futex_wait函数

1
2
3
4
5
6
7
8
9
10
#define lll_futex_wait(futex, val, private) \
lll_futex_timed_wait(futex, val, NULL, private)


#define lll_futex_timed_wait(futex, val, timeout, private) \
({ \
register const struct timespec *__to __asm ("r10") = timeout; \
int __status; \
register __typeof (val) _val __asm ("edx") = (val); \
...

解释一下:

  1. __status这个变量在一个地址寄存器上,是这个汇编段的Output。

  2. SYS_futex TODO

  3. futex

  4. __lll_private_flag根据传入的两个参数决定是否产生private的futex。需要注意libc和libdl中的所有的futex都应该是private的。
    检查这个函数的实现,有一句很神奇的代码。这个代码的结果是,如果private设置了FUTEX_PRIVATE_FLAG位,那么就将这个位清空。要解释清楚这个问题,需要结合后面rwlock的初始化过程来看。

    1
    2
    #  define __lll_private_flag(fl, private) \
    (((fl) | FUTEX_PRIVATE_FLAG) ^ (private))
  5. cc表示eflags寄存器也会被修改。

1
2
3
4
5
6
7
8
9
...
__asm __volatile ("syscall" \
: "=a" (__status) \
: "0" (SYS_futex), "D" (futex), \
"S" (__lll_private_flag (FUTEX_WAIT, private)), \
"d" (_val), "r" (__to) \
: "memory", "cc", "r11", "cx"); \
__status; \
})

atomic_exchange_acq

lll的GLIBC2.30版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// lowlevellock.h
/* This is an expression rather than a statement even though its value is
void, so that it can be used in a comma expression or as an expression
that's cast to void. */
/* The inner conditional compiles to a call to __lll_lock_wait_private if
private is known at compile time to be LLL_PRIVATE, and to a call to
__lll_lock_wait otherwise. */
/* If FUTEX is 0 (not acquired), set to 1 (acquired with no waiters) and
return. Otherwise, ensure that it is >1 (acquired, possibly with waiters)
and then block until we acquire the lock, at which point FUTEX will still be
>1. The lock is always acquired on return. */
#define __lll_lock(futex, private) \
((void) \
({ \
int *__futex = (futex); \
if (__glibc_unlikely \
(atomic_compare_and_exchange_bool_acq (__futex, 1, 0))) \
{ \
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
__lll_lock_wait_private (__futex); \
else \
__lll_lock_wait (__futex, private); \
} \
}))
#define lll_lock(futex, private) \
__lll_lock (&(futex), private)

rwlock的GLIBC2.17实现

INIT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// pthread_rwlock_init.c
int
__pthread_rwlock_init (rwlock, attr)
pthread_rwlock_t *rwlock;
const pthread_rwlockattr_t *attr;
{
const struct pthread_rwlockattr *iattr;

iattr = ((const struct pthread_rwlockattr *) attr) ?: &default_attr;

memset (rwlock, '\0', sizeof (*rwlock));

rwlock->__data.__flags
= iattr->lockkind == PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP;

...

__ASSUME_PRIVATE_FUTEX这个宏表示是否支持Private Futex,在2.6.22之前是不支持的,因此我们看到在对应分支没有使用FUTEX_PRIVATE_FLAG这个宏,而是借助了THREAD_GETMEM的实现。
FUTEX_PRIVATE_FLAG的值表示这个Futex是否是Private的,在之前已经介绍过。

__SHARED这个字段我觉得有点奇怪了,我不是很明白为啥Futex用Private,而pthread用Shared,正好这两个值是相反的。源码在注释里面给了下面这个转换表,容易看出来在不支持FUTEX_PRIVATE_FLAG的时候就都是0。

1
2
3
4
5
            |     pshared     |     result
| shared private | shared private |
------------+-----------------+-----------------+
!avail 0 | 0 0 | 0 0 |
avail 0x80 | 0x80 0 | 0 0x80 |

不管怎样吧,下面我们设置_shared的值,正好和Private相反。也就是如果我们希望我们的rwlock是Private的,那么我们就清空FUTEX_PRIVATE_FLAG位;如果我们希望它是Shared,那么就设置FUTEX_PRIVATE_FLAG。这样我们xor一下就能得到Private的实际值,这实际上对应了我们先前在lll_futex_timed_wait这个函数上的困惑。

1
2
3
4
5
...
#ifdef __ASSUME_PRIVATE_FUTEX
rwlock->__data.__shared = (iattr->pshared == PTHREAD_PROCESS_PRIVATE
? 0 : FUTEX_PRIVATE_FLAG);
...

由于

1
2
3
4
5
6
7
8
9
10
11
...
#else
rwlock->__data.__shared = (iattr->pshared == PTHREAD_PROCESS_PRIVATE
? 0
: THREAD_GETMEM (THREAD_SELF,
header.private_futex));
#endif

return 0;
}
strong_alias (__pthread_rwlock_init, pthread_rwlock_init)

LOCK

1
2
3
4
5
6
7
// pthread_rwlock_wrlock.c
/* Acquire write lock for RWLOCK. */
int
__pthread_rwlock_wrlock (rwlock)
pthread_rwlock_t *rwlock;
{
int result = 0;

这个LIBC_PROBE定义在include/stap-probe.h里面,实际上是一个systemtap静态检查点的功能。

1
2
3
...
LIBC_PROBE (wrlock_entry, 1, rwlock);
...

下面调用lll_lock去加lowlevellock锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
...
/* Make sure we are alone. */
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);

while (1)
{
/* Get the rwlock if there is no writer and no reader. */
if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0)
{
/* Mark self as writer. */
rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);

LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
break;
}

/* Make sure we are not holding the rwlock as a writer. This is
a deadlock situation we recognize and report. */
if (__builtin_expect (rwlock->__data.__writer
== THREAD_GETMEM (THREAD_SELF, tid), 0))
{
result = EDEADLK;
break;
}

/* Remember that we are a writer. */
if (++rwlock->__data.__nr_writers_queued == 0)
{
/* Overflow on number of queued writers. */
--rwlock->__data.__nr_writers_queued;
result = EAGAIN;
break;
}

int waitval = rwlock->__data.__writer_wakeup;

/* Free the lock. */
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);

/* Wait for the writer or reader(s) to finish. */
lll_futex_wait (&rwlock->__data.__writer_wakeup, waitval,
rwlock->__data.__shared);

/* Get the lock. */
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);

/* To start over again, remove the thread from the writer list. */
--rwlock->__data.__nr_writers_queued;
}

/* We are done, free the lock. */
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);

return result;
}

weak_alias (__pthread_rwlock_wrlock, pthread_rwlock_wrlock)
hidden_def (__pthread_rwlock_wrlock)

辅助函数详解

THREAD_GETMEM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* Read member of the thread descriptor directly.  */
# define THREAD_GETMEM(descr, member) \
({ __typeof (descr->member) __value; \
if (sizeof (__value) == 1) \
asm volatile ("movb %%fs:%P2,%b0" \
: "=q" (__value) \
: "0" (0), "i" (offsetof (struct pthread, member))); \
else if (sizeof (__value) == 4) \
asm volatile ("movl %%fs:%P1,%0" \
: "=r" (__value) \
: "i" (offsetof (struct pthread, member))); \
else \
{ \
if (sizeof (__value) != 8) \
/* There should not be any value with a size other than 1, \
4 or 8. */ \
abort (); \
\
asm volatile ("movq %%fs:%P1,%q0" \
: "=r" (__value) \
: "i" (offsetof (struct pthread, member))); \
} \
__value; })

rwlock的GLIBC2.30实现

INIT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// pthread_rwlock_init.c
/* See pthread_rwlock_common.c. */
int
__pthread_rwlock_init (pthread_rwlock_t *rwlock,
const pthread_rwlockattr_t *attr)
{
ASSERT_TYPE_SIZE (pthread_rwlock_t, __SIZEOF_PTHREAD_RWLOCK_T);

const struct pthread_rwlockattr *iattr;

iattr = ((const struct pthread_rwlockattr *) attr) ?: &default_rwlockattr;

memset (rwlock, '\0', sizeof (*rwlock));

rwlock->__data.__flags = iattr->lockkind;

/* The value of __SHARED in a private rwlock must be zero. */
rwlock->__data.__shared = (iattr->pshared != PTHREAD_PROCESS_PRIVATE);

return 0;
}

strong_alias是C的一个别名机制,定义pthread_rwlock_init__pthread_rwlock_init的别名。别名包括strong和weak的。

1
strong_alias (__pthread_rwlock_init, pthread_rwlock_init)

它的实现在libc-symbols.h中。其中,__typeof (name) aliasname定义了一个aliasname,它的类型是name的类型。

1
2
3
4
5
/* Define ALIASNAME as a strong alias for NAME.  */
# define strong_alias(name, aliasname) _strong_alias(name, aliasname)
# define _strong_alias(name, aliasname) \
extern __typeof (name) aliasname __attribute__ ((alias (#name))) \
__attribute_copy__ (name);

LOCK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// pthread_rwlock_wrlock.c
/* See pthread_rwlock_common.c. */
int
__pthread_rwlock_wrlock (pthread_rwlock_t *rwlock)
{
LIBC_PROBE (wrlock_entry, 1, rwlock);

int result = __pthread_rwlock_wrlock_full (rwlock, CLOCK_REALTIME, NULL);
LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
return result;
}

weak_alias (__pthread_rwlock_wrlock, pthread_rwlock_wrlock)
hidden_def (__pthread_rwlock_wrlock)

__pthread_rwlock_wrlock_full使用原子操作实现的

1
// pthread_rwlock_common.c

Reference

  1. https://man7.org/linux/man-pages/man2/futex.2.html
    附注一下,Linux的man page后面的数字序号有下面的含义:
    1. 普通命令
    2. 系统调用
    3. 库函数
    4. 特殊文件,也就是/dev下的各种设备文件
    5. 文件的格式
    6. 游戏留的
    7. 附件,或者一些全局变量
    8. 是root专用的命令
  2. https://man7.org/linux/man-pages/man7/futex.7.html
  3. https://www.cnblogs.com/pslydff/p/7041444.html
  4. https://gohalo.me/post/program-c-gdb-deadlock-analyze-introduce.html