C++ 协程的使用

在上一篇中,介绍了 lewissbaker 的三篇文章,实际上覆盖了 C++ 的无栈协程的实现原理,这里介绍几个常见的协程库的使用。

cppcoro

https://github.com/lewissbaker/cppcoro

修改既有代码

io

协程调度的核心思想是避免阻塞线程,而 std::mutex 的同步原语通常会导致线程阻塞。因此,需要用 cppcoro::async_mutex 进行替换。

注意 std::unique_lock 等这样的锁结构并不是同步原语。它的实现中,没有包含线程上下文切换的部分。而相关的逻辑,实际上是通过调用 std::muex 的 lock 和 unlock 函数来实现的。

尽管如此,cppcoro 中还是提供了诸如 async_mutex_lock 等结构。但这样的结构是为了实现协程而服务的。可以看对应的章节。

主要数据结构

task

一段可以被异步计算的逻辑,它是被 lazily 执行的。当 await 它的时候,它开始执行。

下面给了一个读写文件统计行数的例子,用到了:

  1. co_await cppcoro::read_only_file::open(path);
  2. co_await file.read(offset, buffer, sizeof(buffer));

一个 co_await 的函数必须用到 co_await 或者 co_return,但未必会用到 co_yield。返回值的类型为 task<T>
当一个返回 task<T> 的协程被调用时,一个 coroutine frame 会在必要的时候被调用,并且一些参数会被捕获到 coroutine frame 中。在 coroutine body 被执行前,协程就会挂起,然后返回给调用者。协程的返回值是一个 task<T>

当这个 task<T> 被 co_await 的时候,coroutine body 会开始执行。这会挂起 await 的 coroutine,然后执行被 await 的那个 coroutine。

task<T> 对应的协程通常以 co_return 或者抛出一个异常为结束。之后,在这个线程上,caller 会被 resume。

如果一个协程已经计算得到了结果,那么 await 它不会导致挂起,而是直接返回结果。

如果在 await 之前,那个 task 对象就已经被销毁了,那么这个协程就不会被执行,并且析构函数会销毁被捕获的参数,并且释放被 coroutine frame 使用的内存。

shared_task

我理解是单生产者多消费者模式。

generator / recursive_generator

generator

用法如下所示,但是其中不能使用 co_await,也就是说必须同步地去计算并产生这些值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cppcoro::generator<const std::uint64_t> fibonacci()
{
std::uint64_t a = 0, b = 1;
while (true)
{
co_yield b;
auto tmp = a;
a = b;
b += tmp;
}
}

void usage()
{
for (auto i : fibonacci())
{
if (i > 1'000'000) break;
std::cout << i << std::endl;
}
}

将 fmap 应用到 recursive_generator 将产生 generator 而不是 recursive_generator

recursive_generator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Lists the immediate contents of a directory.
cppcoro::generator<dir_entry> list_directory(std::filesystem::path path);

cppcoro::recursive_generator<dir_entry> list_directory_recursive(std::filesystem::path path)
{
for (auto& entry : list_directory(path))
{
co_yield entry;
if (entry.is_directory())
{
co_yield list_directory_recursive(entry.path());
}
}
}

async_generator

如下所示,可以 co_await 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
cppcoro::async_generator<int> ticker(int count, threadpool& tp)
{
for (int i = 0; i < count; ++i)
{
co_await tp.delay(std::chrono::seconds(1));
co_yield i;
}
}

cppcoro::task<> consumer(threadpool& tp)
{
auto sequence = ticker(10, tp);
for co_await(std::uint32_t i : sequence)
{
std::cout << "Tick " << i << std::endl;
}
}

async_mutex

co_wait 这个 async_mutex 会挂起这个协程,直到这个 mutex 被释放。

这里和 std::mutex 不同的是,async_mutex 是 lock free 的。lock 它并不会 block 当前线程,而只是挂起当前的 coroutine。

如下所示,lock_async 调用会返回一个 async_mutex_lock_operation。这个 async_mutex_lock_operation 必须要被 co_await。co_await 的返回值的类型是 void。

scoped_lock_async 调用会返回一个 async_mutex_scoped_lock_operation。同样它也需要被 co_await。co_await 返回值的类型是 async_mutex_lock。async_mutex_lock 在析构的时候会自动调用持有的 mutex 的 unlock 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// <cppcoro/async_mutex.hpp>
namespace cppcoro
{
class async_mutex_lock;
class async_mutex_lock_operation;
class async_mutex_scoped_lock_operation;

class async_mutex
{
public:
async_mutex() noexcept;
~async_mutex();

async_mutex(const async_mutex&) = delete;
async_mutex& operator(const async_mutex&) = delete;

bool try_lock() noexcept;
async_mutex_lock_operation lock_async() noexcept;
async_mutex_scoped_lock_operation scoped_lock_async() noexcept;
void unlock();
};

class async_mutex_lock_operation
{
public:
bool await_ready() const noexcept;
bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
void await_resume() const noexcept;
};

class async_mutex_scoped_lock_operation
{
public:
bool await_ready() const noexcept;
bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
[[nodiscard]] async_mutex_lock await_resume() const noexcept;
};

class async_mutex_lock
{
public:
// Takes ownership of the lock.
async_mutex_lock(async_mutex& mutex, std::adopt_lock_t) noexcept;

// Transfer ownership of the lock.
async_mutex_lock(async_mutex_lock&& other) noexcept;

async_mutex_lock(const async_mutex_lock&) = delete;
async_mutex_lock& operator=(const async_mutex_lock&) = delete;

// Releases the lock by calling unlock() on the mutex.
~async_mutex_lock();
};
}

sync_wait

类似于 rust tokio 的 block_on

when_all_ready/when_all

类似于 rust tokio 的 futures::join!。两者比较,则 when_all_ready 的功能更强大:

  1. when_all_ready 可以等所有的协程都成功或者失败地跑完,然后分别获取每个操作的结果,它本身不会抛出异常。而 when_all 中,只要有一个 task 抛出异常,那么整个 task 就会抛出异常然后失败。
  2. when_all_ready 的返回值是 when_all_task,需要调用 result() 获取结果。而 when_all 返回的是 void 或者 vector。

fmap