C++ Coroutine 介绍的翻译

翻译 lewissbaker 的三篇文章。

Coroutine Theory

暂略

Understanding operator co_await

https://lewissbaker.github.io/2017/11/17/understanding-operator-co-await

有两个接口需要定义,Promise 和 Awaiter。

Promist 接口定义了 coroutine 自己的行为,例如 coroutine 被调用的时候应该做什么,应该返回什么,并且定义 co_await 或者 co_yield 在 coroutine 中的行为。

Awaitable 接口定义了 co_await 的语义。当我们 co_await 一个对象时,这个 co_await 会被转化为一系列调用,负责挂起当前的 coroutine,执行一些帮助它后续被重新调度起来的命令,以及一些在 resume 之后生成 co_await 返回值的命令。

Awaiters and Awaitables: Explaining operator co_await

Awaitable

如果一个类型支持 co_await <expr>,它就是一个 Awaitable 类型。

Promise 类型可以通过 await_transform 方法去修改 co_await 的 expr。下面将没有实现 await_transform 的类型称为 Normally Awaitable。将实现了 await_transform 的称为 Contextually Awaitable,此时这个类型只支持在一些特定类型的 coroutine 中被调用。

Awaiter

一个 Awaiter 类型需要实现三个方法:await_ready, await_suspend 和 await_resume,它们加在一起组成了 co_await。

一个类型可以既是 Awaiter 又是 Awaitable。

获取 Awaiter

假设这个 awaiting coroutine 的 promise 对象的类型是 P,并且这个 promise 是对当前 coroutine 中的 P 实例的左值引用。

如果 P 有一个 await_transform 方法,那么 expr 就会被首先传给 promise.await_transform(<expr>),以获得对应的 Awaitable 对象。否则,expr 的结果就会直接被作为 Awaitable 对象。不妨令为 awaitable

Then, if the Awaitable object, awaitable, has an applicable operator co_await() overload then this is called to obtain the Awaiter object. Otherwise the object, awaitable, is used as the awaiter object.
然后,如果这个 Awaitable 对象 awaitable 有一个 operator co_await(),那么就可以通过调用它来获得一个 Awaiter 对象。否则就会直接使用 awaitable 作为 Awaiter 对象。

上面说的内容可以通过下面的代码来理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
template<typename P, typename T>
decltype(auto) get_awaitable(P& promise, T&& expr)
{
if constexpr (has_any_await_transform_member_v<P>)
return promise.await_transform(static_cast<T&&>(expr));
else
return static_cast<T&&>(expr);
}

template<typename Awaitable>
decltype(auto) get_awaiter(Awaitable&& awaitable)
{
if constexpr (has_member_operator_co_await_v<Awaitable>)
return static_cast<Awaitable&&>(awaitable).operator co_await();
else if constexpr (has_non_member_operator_co_await_v<Awaitable&&>)
return operator co_await(static_cast<Awaitable&&>(awaitable));
else
return static_cast<Awaitable&&>(awaitable);
}

【Q】为什么要区分 Awaiter 和 Awaitable 呢?

Awaiting the Awaiter

co_await <expr> 这个调用可以被转换成如下的形式。

注意,下面说的 caller 我理解就是 coroutine 的 caller。而 resumer 指的是 coroutine 在被重新调度执行后的“caller”。

await_suspend 有两个版本:

  1. 返回 void 的版本
    在 await_suspend 调用返回后,会无条件将执行权转移给 caller 或者 resumer。
  2. 返回 bool 的版本
    允许有条件地立即 resume 这个 coroutine,而不是将执行权转移给 caller 或者 resumer。
    一般来说,如果这个 awaiter 需要执行的异步操作在一些情况下可能同步地完成,那么就可以在 await_suspend 中返回 false,让 coroutine 立即 resume 从而执行后面的逻辑。

<suspend-coroutine> 处,编译期会生成一些代码,保存当前 coroutine 的状态以便后续恢复。比如存储 <resume-point> 的位置,以及将当前寄存器的状态保存在内存中等。

那么在 <suspend-coroutine> 之后就可以认为这个 coroutine 已经被 suspend 了。所以可以在 await_suspend 调用中首先可以观察到被挂起的 coroutine。此后,它可以被 resume 或者被 destroy。

await_suspend 还需要负责在 coroutine 的异步操作被完成后重新 resume 或者 destroy 掉这个 coroutine。

如果这个 coroutine 的异步操作是被同步完成的,就可以通过 await_ready 调用避免掉 <suspend-coroutine> 挂起 coroutine 的开销。
【Q】那么它和返回 bool 的 await_suspend 的区别是啥呢?感觉前者是给你决定要不要,后者是告诉你实际发生了什么。

<return-to-caller-or-resumer> 处,执行权会被重新转移给 caller 或者 resumer。此时会 popping the local stack frame but keeping the coroutine frame alive。

【Q】什么是 coroutine frame 呢?从下文可知,coroutine_handle 是一个 coroutine frame 的句柄,用来对它进行操作。可是它本体是啥呢?在前文中有介绍:

The ‘coroutine frame’ holds part of the coroutine’s activation frame that persists while the coroutine is suspended and the ‘stack frame’ part only exists while the coroutine is executing and is freed when the coroutine suspends and transfers execution back to the caller/resumer.

如果被挂起的 coroutine 最终是被 resume 的话,会在 <resume-point> 点被继续执行。

await_resume 调用的返回值会成为 co_await 的返回值。注意 await_resume 同样可能抛出异常,此时这个异常会被传播到 co_await 之外。

Note that if an exception propagates out of the await_suspend() call then the coroutine is automatically resumed and the exception propagates out of the co_await expression without calling await_resume().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
co_await (T expr)
{
auto&& value = <expr>;
auto&& awaitable = get_awaitable(promise, static_cast<decltype(value)>(value));
auto&& awaiter = get_awaiter(static_cast<decltype(awaitable)>(awaitable));
if (!awaiter.await_ready())
{
using handle_t = std::experimental::coroutine_handle<P>;

using await_suspend_result_t =
decltype(awaiter.await_suspend(handle_t::from_promise(p)));

<suspend-coroutine>
// 此后可以认为该 coroutine 已经被 suspend 了

// await_suspend 会处理诸如 rescheduling 的事情
if constexpr (std::is_void_v<await_suspend_result_t>)
{
awaiter.await_suspend(handle_t::from_promise(p));
<return-to-caller-or-resumer>
// 此后,执行权交还给 caller 或者 resumer
}
else
{
static_assert(
std::is_same_v<await_suspend_result_t, bool>,
"await_suspend() must return 'void' or 'bool'.");

if (awaiter.await_suspend(handle_t::from_promise(p)))
{
<return-to-caller-or-resumer>
}
}

<resume-point>
}

return awaiter.await_resume();
}

Coroutine Handles

在上文中,一个 coroutine_handle<P> 类型的对象会被传给 await_suspend,作为 await_suspend 的参数。这个类型是对 coroutine frame 的一个非 owning 的句柄。可以通过它来 resume 或者 destroy。同时,还可以用它来访问 coroutine 的 promise 对象。

coroutine_handle 有类似下面的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
namespace std::experimental
{
template<typename Promise>
struct coroutine_handle;

template<>
struct coroutine_handle<void>
{
bool done() const;

void resume();
void destroy();

void* address() const;
static coroutine_handle from_address(void* address);
};

template<typename Promise>
struct coroutine_handle : coroutine_handle<void>
{
Promise& promise() const;
static coroutine_handle from_promise(Promise& promise);

static coroutine_handle from_address(void* address);
};
}
  1. resume
    当异步动作完成,需要 resume 这个 coroutine 的时候,应该调用 resume 方法。此时会在 <resume-point> 继续执行 coroutine。
    对 resume 本身的调用会在 coroutine 下一次碰到 <return-to-caller-or-resumer> 的时候返回。
  2. destroy
    会销毁当前的 coroutine feame。
    一般来说不需要调用这个方法,除非是库的作者在实现 promise 类型的时候。
    一般来说,coroutine frame 会被调用 coroutine 时返回的 RAII 类型所持有。所以需要避免 double-destruction bug。
  3. promise
    返回 coroutine 的 promise 对象的引用。
    对于大多数 Normally Awaitable 类型,应当使用 coroutine_handle<void> 作为 await_suspend 的参数,而不是 coroutine_handle<Promise>
    coroutine_handle<P>::from_promise(P& promise) 这个函数可以由 coroutine promise 对象的引用来重新构造 coroutine_handle。注意必须要保证 P 和 coroutine frame 使用的 concrete promise type 是一致的。也就是说如果创建 coroutine_handle<Base>,但是实际的 promise type 是 Derived 会导致 UB。
  4. address/from_address
    将一个 coroutine handle 和 void* 指针进行互相转化。它的目的主要是和 C 语言的接口交互。
    但一般来说,在实现时经常发现还需要打包发送其他上下文,所以一般来说会将 coroutine_handle 放到一个结构中,并返回结构的指针。

Synchronisation-free async code

co_await 的一个作用是可以在 coroutine 被 suspend 之后,和被 caller/resumer 重新获得执行权之前的这段时间中执行代码。

也就是说,Awaiter 对象会在 coroutine 被 suspend 之后启动一个 async 操作,将 coroutine_handle 传给这个 async 操作,让它能在完成后去 resume 之前的 coroutine。注意这个 coroutine 可能是在另一个线程中被 resume 了。整个过程中并不需要任何的同步开销。

举个例子,一段代码在线程 A 中执行,使用 coroutine 去做一个异步读,那么 Awaiter 可以在 await_suspend 中启动这个异步读,而这个异步读是在线程 B 中被实际处理的。但线程 A 和线程 B 之间是没有任何同步开销的。比如没有通过条件变量或者 channel 进行等待。如下面所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Time     Thread A                           Thread B
| -------- --------
| .... Call OS - Wait for I/O event
| Call await_ready() |
| <supend-point> |
| Call await_suspend(handle) |
| Store handle in operation |
V Start AsyncFileRead ---+ V
+-----> <AsyncFileRead Completion Event>
Load coroutine_handle from operation
Call handle.resume()
<resume-point>
Call to await_resume()
execution continues....
Call to AsyncFileRead returns
Call to await_suspend() returns
<return-to-caller/resumer>

在上面的伪代码中还需要注意:

  1. 如果异步任务把 coroutine_handle 传给了另一个线程,那么这个线程就可能在 await_suspend 返回之前就 resume 这个 coroutine。这样,就会和 await_suspend 方法剩下来的部分竞争。
  2. 当 resume 一个 coroutine 的时候,首先需要调用 await_resume 去获得异步任务的结果。一般与此同时会立即析构掉 Awaiter 对象(可以看看上面的 demo)。因为 Awaiter 对象实际上就是 await_suspend 的 this 指针,所以 coroutine 可能会在 await_suspend 调用之前就 destruct 掉 coroutine 和 promise 对象。

因此,在 await_suspend 方法中,一旦 coroutine 可以在另一个线程上并发地 resume,就需要保证不会再去访问 this 或者 coroutine 的 promise() 对象了,因为它们可能都被销毁了。实际上,当 coroutine 已经 scheduled for resumption 的时候,唯一能安全访问的只有 await_suspend 中的本地变量了。

Comparison to Stackful Coroutines

和诸如 win32 的 fiber 或者 boost::context 这样的有栈协程相比:
在很多有栈协程中,suspend 操作通常会伴随着 resume 另一个 coroutine,从而组合成为一个 context-switch 操作。而这个操作会导致没有机会再 suspend 当前 coroutine 之后,以及将执行权转移到另一个 coroutine 之前执行一些逻辑。
而这就意味着如果需要实现一个类似的 async-file-read 操作,就需要在 suspend 这个 coroutine 之前就开启这个操作。因此这个操作可能就会在当前 coroutine 被 suspend 之前,就在另一个线程中被执行完了,而当前 coroutine 因此又要被 resume。这就在这两个线程之间引入了 race。而如上所述,C++ 的 coroutine 不需要在线程 A 和线程 B 之间引入同步机制。

There are probably ways around this by using a trampoline context that can start the operation on behalf of the initiating context after the initiating context has been suspended. However this would require extra infrastructure and an extra context-switch to make it work and it’s possible that the overhead this introduces would be greater than the cost of the synchronisation it’s trying to avoid.

Avoiding memory allocations

async 操作需要分配一些内存。比如在 win32 io 函数接口中,需要分配一个 OVERLAPPED 结构,这个结构需要在操作完成之后才会被释放。因此这样的结构必须要分配在堆上,并且每个 async 操作都需要 allocate 一次。因此在这里可以使用一个对象池来优化。

但是在 C++ 的 coroutine 中,可以避免堆内存分配,因为 local variable 在 coroutine 被 suspend 的时候会在 coroutine frame 里面,从而肯定是存活的。

将 per-operation state 存放在 Awaiter 对象中,可以白嫖 coroutine frame,从而延续到至少是 co_await expression 的 lifetime。一旦这个 operation 完成,coroutine 就会 resume,然后 Awaiter 对象就会被销毁。

当然,coroutine frame 本身还是会在堆上分配的,但是,一旦它被分配,是可以被用来执行很多个 async 操作的。这就好像是一个 arena memory allocator 一样,编译期可以在编译期计算出 local variable 的大小,然后就可以一次性分配出来了。

【Q】没太明白为啥 coroutine frame 是可以被复用的。我理解应该是指的可以 co_await 很多次。

An example: Implementing a simple thread-synchronisation primitive

下面是一个简单的多生产者-多消费者模型。如果 set 已经被调用过了,那么后续的 consumer 就不会 suspend 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
T value;
async_manual_reset_event event;

// A single call to produce a value
void producer()
{
value = some_long_running_computation();

// Publish the value by setting the event.
event.set();
}

// Supports multiple concurrent consumers
task<> consumer()
{
// Wait until the event is signalled by call to event.set()
// in the producer() function.
co_await event;

// Now it's safe to consume 'value'
// This is guaranteed to 'happen after' assignment to 'value'
std::cout << value << std::endl;
}

这里的设计就是用一个 std::atomic<void*> 指针。它要么指向 this,说明已经 set 了;要么指向一个链表的表头,表示正在 suspend 的链表。

在这里,我们也实现了上面提到的节省内存分配的方案,将链表的 node 分配在 awaiter 对象里面,而 awaiter 对象在 coroutine frame 上面。

总而言之,代码如下所示。它支持 co_await,所以是个 Awaitable 类型。co_await 操作符返回一个 awaiter 也就是后面要实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class async_manual_reset_event
{
public:

async_manual_reset_event(bool initiallySet = false) noexcept;

// No copying/moving
async_manual_reset_event(const async_manual_reset_event&) = delete;
async_manual_reset_event(async_manual_reset_event&&) = delete;
async_manual_reset_event& operator=(const async_manual_reset_event&) = delete;
async_manual_reset_event& operator=(async_manual_reset_event&&) = delete;

bool is_set() const noexcept;

struct awaiter;
awaiter operator co_await() const noexcept;

void set() noexcept;
void reset() noexcept;

private:

friend struct awaiter;

// - 'this' => set state
// - otherwise => not set, head of linked list of awaiter*.
mutable std::atomic<void*> m_state;
};

这个 awaiter 如下所示:

  1. 首先,它要持有一个 Awaitable 对象的指针,这是因为它要知道自己是要 await 什么东西。
  2. 然后,它还要扮演一个在 awaiter 链表里面的一个节点的角色,所以它应该能访问自己后面的那个 awaiter。
  3. 然后,它还要存储 coroutine_handle 对象,这样当 await_suspend 被调用后,它能知道如何去 resume coroutine。因为我们没 await_transform 啥的,所以这里 coroutine_handle 对象就是 coroutine_handle<void>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct async_manual_reset_event::awaiter
{
awaiter(const async_manual_reset_event& event) noexcept
: m_event(event)
{}

bool await_ready() const noexcept;
bool await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept;
void await_resume() noexcept {}

private:

const async_manual_reset_event& m_event;
std::experimental::coroutine_handle<> m_awaitingCoroutine;
awaiter* m_next;
};

await_ready 要做的就是如果已经 set 了,就不再 suspend。

1
2
3
4
bool async_manual_reset_event::awaiter::await_ready() const noexcept
{
return m_event.is_set();
}

await_suspend 最为重要:

  1. 首先,它要保存 coroutine_handle,从而后续可以调用 coroutine_handle.resume() 方法。
  2. 然后,就要将 awaiter 放到链表里面
    将链表头的指针即 m_state 设置为 this。注意,这里的 this 是 awaiter;而 oldValue 是 async_manual_reset_event,即 Awaitable。
    如果添加成功,就返回 true,表示不会立即 resume 这个 coroutine。否则,就返回 false,表示可以立即 resume。这也回答了之前的一个【Q】,也就是 await_suspend 和 await_ready 的返回值到底作用有什么不同。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
bool async_manual_reset_event::awaiter::await_suspend(
std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
// Special m_state value that indicates the event is in the 'set' state.
const void* const setState = &m_event;

// Remember the handle of the awaiting coroutine.
m_awaitingCoroutine = awaitingCoroutine;

// Try to atomically push this awaiter onto the front of the list.
void* oldValue = m_event.m_state.load(std::memory_order_acquire);
do
{
// Resume immediately if already in 'set' state.
if (oldValue == setState) return false;

// Update linked list to point at current head.
m_next = static_cast<awaiter*>(oldValue);

// Finally, try to swap the old list head, inserting this awaiter
// as the new list head.
} while (!m_event.m_state.compare_exchange_weak(
oldValue,
this,
std::memory_order_release,
std::memory_order_acquire));

// Successfully enqueued. Remain suspended.
return true;
}

【Q】这里 await_resume 起到了什么作用,为啥它是空的?

Filling out the rest of the event class

如果是 set 状态,则改为 nullptr。所以 m_state 有三种情况:

  1. nullptr
    没有 set,但是也没有 awaiter 在等
  2. oldValue
    set 了
  3. 其他
    有 awaiter 在等,并且指向了链表头
1
2
3
4
5
void async_manual_reset_event::reset() noexcept
{
void* oldValue = this;
m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire);
}

下面是 set 操作。其实有点类似于 CV。实际上的行为就是对所有的 waiter 调用 coroutine_handle.resume()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void async_manual_reset_event::set() noexcept
{
// Needs to be 'release' so that subsequent 'co_await' has
// visibility of our prior writes.
// Needs to be 'acquire' so that we have visibility of prior
// writes by awaiting coroutines.
void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);
if (oldValue != this)
{
// Wasn't already in 'set' state.
// Treat old value as head of a linked-list of waiters
// which we have now acquired and need to resume.
auto* waiters = static_cast<awaiter*>(oldValue);
while (waiters != nullptr)
{
// Read m_next before resuming the coroutine as resuming
// the coroutine will likely destroy the awaiter object.
auto* next = waiters->m_next;
waiters->m_awaitingCoroutine.resume();
waiters = next;
}
}
}

【Q】这里的 awaiter 都是在什么地方被析构的呢?我理解这里的 awaiter 都是 local variable,生命周期等同于 co_await 的 <expr>。可以看到,co_await 返回的就是这个 awaiter。

1
2
3
4
5
async_manual_reset_event::awaiter
async_manual_reset_event::operator co_await() const noexcept
{
return awaiter{ *this };
}

https://godbolt.org/g/Ad47tH 是源码

Understanding the promise type

https://lewissbaker.github.io/2018/09/05/understanding-the-promise-type

Coroutine Concepts

Promise 接口用来自定义 coroutine 自己的行为,比如它被调用的时候,或者它返回(无论是正常值还是异常)的时候。

Promise objects

字如其名,Promise 确实有类似 std::promise 的作用,但是它的功能更为衍生,实际上应该将它理解为一个 coroutine state controller。

比如,写了一个 coroutine function,编译器会转换成下面的形式,其中 <body-statements> 是函数体。

1
2
3
4
5
6
7
8
9
10
11
12
13
{
co_await promise.initial_suspend();
try
{
<body-statements>
}
catch (...)
{
promise.unhandled_exception();
}
FinalSuspend:
co_await promise.final_suspend();
}

当 coroutine 被调用的时候,会:

  1. 【可选】用 operator new 分配一个 coroutine frame。
    【Q】为什么这里是可选的?见下文。
  2. 将 parameter 拷贝到 coroutine frame 里面。
  3. 调用 P 的构造函数创建 promise 对象。
  4. 调用 promise.get_return_object() 方法会得到一个东西,可以在第一次 suspend 的时候返回给 caller。这个东西被保存为 local variable。
    我理解这个东西,也就是后面会看到的 task 就是 “Coroutine 本身”。特别地:
    • task 里面会持有一个 std::coroutine_handle 对象。
    • task 里面会支持 co_await 操作符,也就是说它是一个 Awaitable 对象。
    • promise 对象的类型就是 task::promise_type
  5. 调用 promise.initial_suspend(),并 co_await 结果。
  6. 当 co_await promise.initial_suspend() resume(这里同样可能不挂起直接返回),coroutine 开始执行 <body-statements>

在 co_return 被执行的时候,会:

  1. 调用 promise.return_void() 或者 promise.return_value(<expr>)
  2. 销毁所有的自动变量。
  3. 调用 promise.final_suspend(),并且 co_await 结果。

特别地,如果 <body-statements> 抛出异常,则:

  1. 捕获这个异常,并调用 promise.unhandled_exception()
  2. 调用 promise.final_suspend(),并且 co_await 结果。

一旦 execution propagates outside of the coroutine body,那么 coroutine frame 就会被销毁。此时:

  1. 调用 promise 对象的析构函数。
  2. 调用 parameter 的析构函数。
  3. 【可选】调用 operator delete 释放内存。
  4. 将执行权交还给 resumer 或者 caller。

当第一次执行到 <return-to-caller-or-resumer> 的时候,或者 coroutine 没有执行到这个点就完成了,那么这个 coroutine 要么是 suspend 了,要么是 destroy 了。此时这之前通过调用 promise.get_return_object() 得到的 return-object 会被直接返回给 caller。回顾下,这里的 <return-to-caller-or-resumer> 是在 co_await 中,执行完 await_suspend 之后的点。

Allocating a coroutine frame

First, the compiler generates a call to operator new to allocate memory for the coroutine frame.

If the promise type, P, defines a custom operator new method then that is called, otherwise the global operator new is called.

要点:

  1. operator new 分配的大小并不是 sizeof(P),而是整个 coroutine frame 的大小,这个是由编译器计算的。包含了 parameter,promise 对象,local variables 以及其他的一些用来存储 coroutine state 的结构。我理解之前我们白嫖的也是这一段的空间。
  2. 编译期可以省略这个 operator new,而直接在 caller 的 stack-frame 或者 coroutine-frame 中分配,当:
    • 可以断定 coroutine frame 的生命周期是小于 caller 的。
    • 并且编译期可以在调用的时候就能看到整个 coroutine frame 需要的大小。
      目前 Coroutine TS 并没有 guarantee 任何的 elision 的情况,所以我们要处理分配 coroutine frame 的时候出现 std::bad_alloc 的情况。这里有一些异常处理相关的问题,一般我们就直接 terminate 掉了。但如果 promise 对象支持静态的 P::get_return_object_on_allocation_failure() 函数,则可以不抛出异常。

Customising coroutine frame memory allocation

Your promise type can define an overload of operator new() that will be called instead of global-scope operator new if the compiler needs to allocate memory for a coroutine frame that uses your promise type.

For example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct my_promise_type
{
void* operator new(std::size_t size)
{
void* ptr = my_custom_allocate(size);
if (!ptr) throw std::bad_alloc{};
return ptr;
}

void operator delete(void* ptr, std::size_t size)
{
my_custom_free(ptr, size);
}

...
};

同样,也可以提供一个 custom allocator。如下所示,可以提供一个重载版本的 P::operator new(),它额外接受诸如 allocator 这样的参数,这样就可以在 new 的时候调用 allocator.allocate() 来分配内存了。

这里有个问题,coroutine frame 中存储的 parameter 在 operator delete 之前就已经被析构了,那如何获得 allocator 呢?所以,为了能在 operator delete 中调用 allocator.deallocate(),我们要将 allocator 存在 allocatorOffset 上面。

简而言之,就是在创建 my_promise_type 之前分配空间的时候,多分配一部分空间,用来存放对应的 allocator。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
template<typename ALLOCATOR>
struct my_promise_type
{
template<typename... ARGS>
void* operator new(std::size_t sz, std::allocator_arg_t, ALLOCATOR& allocator, ARGS&... args)
{
// Round up sz to next multiple of ALLOCATOR alignment
std::size_t allocatorOffset =
(sz + alignof(ALLOCATOR) - 1u) & ~(alignof(ALLOCATOR) - 1u);

// Call onto allocator to allocate space for coroutine frame.
void* ptr = allocator.allocate(allocatorOffset + sizeof(ALLOCATOR));

// Take a copy of the allocator (assuming noexcept copy constructor here)
new (((char*)ptr) + allocatorOffset) ALLOCATOR(allocator);

return ptr;
}

void operator delete(void* ptr, std::size_t sz)
{
std::size_t allocatorOffset =
(sz + alignof(ALLOCATOR) - 1u) & ~(alignof(ALLOCATOR) - 1u);

ALLOCATOR& allocator = *reinterpret_cast<ALLOCATOR*>(
((char*)ptr) + allocatorOffset);

// Move allocator to local variable first so it isn't freeing its
// own memory from underneath itself.
// Assuming allocator move-constructor is noexcept here.
ALLOCATOR allocatorCopy = std::move(allocator);

// But don't forget to destruct allocator object in coroutine frame
allocator.~ALLOCATOR();

// Finally, free the memory using the allocator.
allocatorCopy.deallocate(ptr, allocatorOffset + sizeof(ALLOCATOR));
}
}

To hook up the custom my_promise_type to be used for coroutines that pass std::allocator_arg as the first parameter, you need to specialise the coroutine_traits class (see section on coroutine_traits below for more details).

For example:

1
2
3
4
5
6
7
8
namespace std::experimental
{
template<typename ALLOCATOR, typename... ARGS>
struct coroutine_traits<my_return_type, std::allocator_arg_t, ALLOCATOR, ARGS...>
{
using promise_type = my_promise_type<ALLOCATOR>;
};
}

Note that even if you customise the memory allocation strategy for a coroutine, the compiler is still allowed to elide the call to your memory allocator.

Copying parameters to the coroutine frame

The coroutine needs to copy any parameters passed to the coroutine function by the original caller into the coroutine frame so that they remain valid after the coroutine is suspended.

复制 parameter 到 coroutine frame 的目的是保证了 coroutine 被 suspend 之后,这些东西都还在。

  1. 如果是 by value 的复制,那么会调用 move-ctor。
  2. 如果是 by reference 的复制,无论是左值还是右值,那么只有引用本身会被复制,指向的对象是不会的。

对于只有 trivial destructor 的 parameter,编译器可以 elide 掉 copy,如果这个 parameter 在某个可达的 <return-to-caller-or-resumer> 之后就不再被访问了。

C++ 中用完美转发会比较多,这在 coroutine 中经常会导致 UB。原因就是传入了 reference。

If any of the parameter copy/move constructors throws an exception then any parameters already constructed are destructed, the coroutine frame is freed and the exception propagates back out to the caller.

Constructing the promise object

先复制 parameter 再构造 promise 的原因是允许 promise 对象可以基于复制后的 parameter 构建。

First, the compiler checks to see if there is an overload of the promise constructor that can accept lvalue references to each of the copied parameters. If the compiler finds such an overload then the compiler generates a call to that constructor overload. If it does not find such an overload then the compiler falls back to generating a call to the promise type’s default constructor.

这个听起来挺神奇的,好像是既支持“默认”的 aggregate initialization,又支持 default initialization。

Note that the ability for the promise constructor to “peek” at the parameters was a relatively recent change to the Coroutines TS, being adopted in N4723 at the Jacksonville 2018 meeting. See P0914R1 for the proposal. Thus it may not be supported by some older versions of Clang or MSVC.

If the promise constructor throws an exception then the parameter copies are destructed and the coroutine frame freed during stack unwinding before the exception propagates out to the caller.

Obtaining the return object

The first thing a coroutine does with the promise object is obtain the return-object by calling promise.get_return_object().

在 coroutine 被建立后,首先是调用 promise.get_return_object() 获取 return-object。return-object 后续会被返回给 coroutine 的 caller。如前文所说,返回的时间点是第一次 suspend,或者 coroutine 完成了。

执行大抵如下。注意,在 “Coroutine Handles” 这一节中介绍了,可以通过 from_promise 从 promise 重新构建出 coroutine_handle。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Pretend there's a compiler-generated structure called 'coroutine_frame'
// that holds all of the state needed for the coroutine. It's constructor
// takes a copy of parameters and default-constructs a promise object.
struct coroutine_frame { ... };

T some_coroutine(P param)
{
auto* f = new coroutine_frame(std::forward<P>(param));

auto returnObject = f->promise.get_return_object();

// Start execution of the coroutine body by resuming it.
// This call will return when:
// 1. the coroutine gets to the first suspend-point
// 2. or when the coroutine runs to completion.
coroutine_handle<decltype(f->promise)>::from_promise(f->promise).resume();

// Then the return object is returned to the caller.
return returnObject;
}

注意,必须要在执行 coroutine body 之前就获取 return-object。这因为 coroutine frame 以及它持有的 promise 对象可能在 coroutine_handle::resume() 返回前就被销毁掉。也就是说,在 resume() 返回前,程序可能处于 suspend 状态的,我理解后续这个 coroutine 就可能被直接销毁掉了。

销毁未必发生在 caller 的线程上。因此,在开始执行 coroutine body 之后调用 get_return_object() 是不安全的。

The initial-suspend point

The next thing the coroutine executes once the coroutine frame has been initialised and the return object has been obtained is execute the statement co_await promise.initial_suspend();

执行 co_await promise.initial_suspend(),实际上允许 promise_type 也就是之前提到的 P 的作者,可以控制 coroutine 到底是立即执行,还是先 suspend 等调度。这有点类似于 std::async 里面相同参数的意思了。

如果在 initial suspend 点选择 suspend 的话,后续可以被 resume 或者被 destroy。

co_await promise.initial_suspend() 的结果被丢弃,所以实现上可以从 await_resume 返回 void。

注意1:initial_suspend() 这个调用并没有被 try-catch 块环绕,也就是说这里发生的异常,更准确说是在它的 <return-to-caller-or-resumer> 之前的异常会在销毁 coroutine frame 和 return-object 之后被直接抛给 caller。

注意2:如果 return-object 中有某个 RAII 语义,能够在它被销毁的时候销毁 coroutine frame,那么就需要保证 co_await promise.initial_suspend() 不会抛出异常,否则会发生 double free 的问题。当然也有提案说要去修改这个行为。

但实际上因为大部分 coroutine 的 initial_suspend() 只会返回都是 noexcept 的 suspend_never 或者 suspend_always,所以这不是个问题。

Returning to the caller

当 coroutine 执行到第一个 <return-to-caller-or-resumer> 点(如果没有这个点就是执行完成)的时候,从 get_return_object 获取的 return-object 会被返回给 caller。

注意,return-object 的类型不一定是 coroutine function 的 return type。可以进行隐式转换。

Returning from the coroutine using co_return

co_return 会被转化为:

  1. promise.return_void()
    co_return <expr>
  2. promise.return_value(<expr>)
    如果 expr 的类型是 void,则 <expr>; promise.return_void();
    如果 expr 的类型不是 void,则 promise.return_value(<expr>);

Note that if execution runs off the end of a coroutine without a co_return statement then this is equivalent to having a co_return; at the end of the function body. In this case, if the promise_type does not have a return_void() method then the behaviour is undefined.

If either the evaluation of or the call to promise.return_void() or promise.return_value() throws an exception then the exception still propagates to promise.unhandled_exception().

Handling exceptions that propagate out of the coroutine body

The final-suspend point

final_suspend 的调用发生在 return_void()return_value()unhandled_exception() 之后。也发生在所有的 local variable 都被销毁之后。

This allows the coroutine to execute some logic, such as publishing a result, signalling completion or resuming a continuation. It also allows the coroutine to optionally suspend immediately before execution of the coroutine runs to completion and the coroutine frame is destroyed.

在 final_suspend 点 resume 一个 coroutine 是 UB 的,对于这个状态的 coroutine 只可以调用 destroy。

The rationale for this limitation, according to Gor Nishanov, is that this provides several optimisation opportunities for the compiler due to the reduction in the number of suspend states that need to be represented by the coroutine and a potential reduction in the number of branches required.

尽管可以在 final_suspend 处不 suspend,但建议是尽量 suspend。因为这可以强迫你在 coroutine 外面调用 destroy(一般是通过某种 RAII 机制)。这样编译器就能够更容易确定 coroutine frame 的 lifetime 是被 caller 的 lifetime 覆盖了的,从而就可以执行之前说的 elide 掉 coroutine frame 内存分配的优化。

How the compiler chooses the promise type

编译期可以自动推导 promise_type。

1
2
3
4
5
6
7
8
9
10
11
task<float> foo(std::string x, bool flag);
-->
typename coroutine_traits<task<float>, std::string, bool>::promise_type;

task<void> my_class::method1(int x) const;
-->
typename coroutine_traits<task<void>, const my_class&, int>::promise_type;

task<foo> my_class::method2() &&;
-->
typename coroutine_traits<task<foo>, my_class&&>::promise_type;

可以自定义 promise_type

1
2
3
4
5
6
7
8
namespace std::experimental
{
template<typename T, typename... ARGS>
struct coroutine_traits<std::optional<T>, ARGS...>
{
using promise_type = optional_promise<T>;
};
}

Identifying a specific coroutine activation frame

介绍 coroutine_handle 这个类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
namespace std::experimental
{
template<typename Promise = void>
struct coroutine_handle;

// Type-erased coroutine handle. Can refer to any kind of coroutine.
// Doesn't allow access to the promise object.
template<>
struct coroutine_handle<void>
{
// Constructs to the null handle.
constexpr coroutine_handle();

// Convert to/from a void* for passing into C-style interop functions.
constexpr void* address() const noexcept;
static constexpr coroutine_handle from_address(void* addr);

// Query if the handle is non-null.
constexpr explicit operator bool() const noexcept;

// Query if the coroutine is suspended at the final_suspend point.
// Undefined behaviour if coroutine is not currently suspended.
bool done() const;

// Resume/Destroy the suspended coroutine
void resume();
void destroy();
};

// Coroutine handle for coroutines with a known promise type.
// Template argument must exactly match coroutine's promise type.
template<typename Promise>
struct coroutine_handle : coroutine_handle<>
{
using coroutine_handle<>::coroutine_handle;

static constexpr coroutine_handle from_address(void* addr);

// Access to the coroutine's promise object.
Promise& promise() const;

// You can reconstruct the coroutine handle from the promise object.
static coroutine_handle from_promise(Promise& promise);
};
}

它可以由两个方式获得:

  1. await_suspend 的参数
    这类似于 CPS 的方式。
  2. 通过 promise 从 from_promise 构造

coroutine_handle 并不是 RAII 的,需要调用 destroy 去释放它。这样设计是为了减少 overhead。

You should generally try to use higher-level types that provide the RAII semantics for coroutines, such as those provided by cppcoro (shameless plug), or write your own higher-level types that encapsulate the lifetime of the coroutine frame for your coroutine type.

Customising the behaviour of co_await

promise 类型可以可选地自定义 co_await 表达式的行为。
只需要定义这个类型的 await_transform() 方法,编译器就能够将所有的 co_await <expr> 转换为 co_await promise.await_transform(<expr>)

为什么要提供这个功能呢?

原因1

因为有些类型不是 awaitable 的,所以要提供这个转换。

For example, a promise type for coroutines with a std::optional<T> return-type might provide an await_transform() overload that takes a std::optional<U> and that returns an awaitable type that either returns a value of type U or suspends the coroutine if the awaited value contains nullopt.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template<typename T>
class optional_promise
{
...

template<typename U>
auto await_transform(std::optional<U>& value)
{
class awaiter
{
std::optional<U>& value;
public:
explicit awaiter(std::optional<U>& x) noexcept : value(x) {}
bool await_ready() noexcept { return value.has_value(); }
void await_suspend(std::experimental::coroutine_handle<>) noexcept {}
U& await_resume() noexcept { return *value; }
};
return awaiter{ value };
}
};

原因2

It lets you disallow awaiting on certain types by declaring await_transform overloads as deleted.

For example, a promise type for std::generator<T> return-type might declare a deleted await_transform() template member function that accepts any type. This basically disables use of co_await within the coroutine.

1
2
3
4
5
6
7
8
9
10
template<typename T>
class generator_promise
{
...

// Disable any use of co_await within this type of coroutine.
template<typename U>
std::experimental::suspend_never await_transform(U&&) = delete;

};

原因3

It lets you adapt and change the behaviour of normally awaitable values.

For example, you could define a type of coroutine that ensured that the coroutine always resumed from every co_await expression on an associated executor by wrapping the awaitable in a resume_on() operator (see cppcoro::resume_on()).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<typename T, typename Executor>
class executor_task_promise
{
Executor executor;

public:

template<typename Awaitable>
auto await_transform(Awaitable&& awaitable)
{
using cppcoro::resume_on;
return resume_on(this->executor, std::forward<Awaitable>(awaitable));
}
};

As a final word on await_transform(), it’s important to note that if the promise type defines any await_transform() members then this triggers the compiler to transform all co_await expressions to call promise.await_transform(). 所以,如果只是希望对某些类型定制 co_await 行为,最好为 await_transform() 提供一个只 forward argument 的重载。

Customising the behaviour of co_yield

编译器会把 co_yield <expr> 转换为 co_await promise.yield_value(<expr>)。因此 promise 对象可以定制 yield_value 方法。

如果编译器没有定制这个方法,该方法不会有默认的行为。所以需要显式提供这样的方法,promise 类型才能支持 co_yield。

如下所示,对一个 generator 类型提供了 yield_value 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<typename T>
class generator_promise
{
T* valuePtr;
public:
...

std::experimental::suspend_always yield_value(T& value) noexcept
{
// Stash the address of the yielded value and then return an awaitable
// that will cause the coroutine to suspend at the co_yield expression.
// Execution will then return from the call to coroutine_handle<>::resume()
// inside either generator<T>::begin() or generator<T>::iterator::operator++().
valuePtr = std::addressof(value);
return {};
}
};

Understanding Symmetric Transfer

https://lewissbaker.github.io/2020/05/11/understanding_symmetric_transfer

在 Coroutine TS 刚开始被提出的时候,有一个限制,会导致轻易的 stack-overflow。为了避免它,就需要在 task<T> 类型中引入额外的同步开销。

在 2018 年,引入了一个 symmetric transfer 的特性,使得我们可以挂起一个 Coroutine,并 Resume 另一个,但是不会消耗栈空间了。

First some background on how a task coroutine works

1
2
3
4
5
6
7
task foo() {
co_return;
}

task bar() {
co_await foo();
}

不放展开看看 bar()co_await foo() 的时候都发生了什么:

  1. 调用 foo 需要有下面几步
    为 coroutine frame 分配寻出空间。
    将参数复制到 coroutine frame 里面。在当前 case 里面没有参数,所以就是一个空操作。
    在 coroutine frame 里面构造 promise object。
    调用 promise.get_return_object() 获得 foo() 的返回值。这个过程中会产生被返回的 task 对象,并使用 std::coroutine_handle 创建它。如前所述,std::coroutine_handle 持有刚创建的 coroutine frame 的引用。
    在 initial-suspend point 挂起 coroutine 的执行。
    返回 task 对象给到 bar()。
  2. 后面,bar() 会执行 co_await
    bar() 会被挂起,然后调用由 foo() 返回的 task 对象上的 await_suspend() 方法。会把指向 bar 的 coroutine frame 的 std::coroutine_handle 传给该方法。
    await_suspend() 中,会存储 bar() 的 std::coroutine_handle 到 foo 的 promise 对象中【A】,然后通过调用 foostd::coroutine_handle 的 resume 方法去 resume foo() 的执行。
  3. foo() 会同步地执行。
  4. foo() 会在 final-suspend point 挂起,然后 resume bar。这是根据被存在 promise 对象中的 std::coroutine_handle 来找到的,见【A】步骤。
  5. bar() 会 resume,继续执行,并最终到达 co_await 语句处,并调用临时 task 对象的析构函数。
  6. task 对象执行析构。因为这个 task 对象是 foo() 返回的,所以它会调用 foo() 的 coroutine handle 上的 .destroy() 方法,这样就会销毁 coroutine frame,包括 promise 对象和之前复制了的参数之内。

Outline of a task implementation

下面可以看下如果不支持 symmetric transfer,task 类应该如何被实现。可以看出 task 是一个 Awaitable。

A task has exclusive ownership of the std::coroutine_handle that corresponds to the coroutine frame created during the invocation of the coroutine. The task object is an RAII object that ensures that .destroy() is called on the std::coroutine_handle when the task object goes out of scope.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class task {
public:
class promise_type { /* see below */ };

task(task&& t) noexcept
: coro_(std::exchange(t.coro_, {}))
{}

~task() {
if (coro_)
coro_.destroy();
}

class awaiter { /* see below */ };

awaiter operator co_await() && noexcept;

private:
explicit task(std::coroutine_handle<promise_type> h) noexcept
: coro_(h)
{}

std::coroutine_handle<promise_type> coro_;
};

下面会展开讲解 promise_type 和 awaiter 的实现。

Implementing task::promise_type

promise_type 会定义在 coroutine frame 中创建的 Promise 对象的类型。

首先,需要实现 get_return_object() 去构造将来要被返回的 task 对象。这个对象的初始化需要借助于 std::coroutine_handle

这里是根据 from_promise 从 Promise 对象中重新构造出了 std::coroutine_handle。这是获得 std::coroutine_handle 的一种方法,另一种方法是 await_suspend 参数,前文中提到过。

1
2
3
4
5
6
class task::promise_type {
public:
task get_return_object() noexcept {
return task{std::coroutine_handle<promise_type>::from_promise(*this)};
}
...

后面,这个 coroutine 需要在 initial-suspend point 挂起,这样在 task 被 await 的时候,我们可以 resume 它。这样 lazy 的处理有下面几点好处:

  1. It means that we can attach the continuation’s std::coroutine_handle before starting execution of the coroutine. This means we don’t need to use thread-synchronisation to arbitrate the race between attaching the continuation later and the coroutine running to completion.
    我理解这里讲的是和“Comparison to Stackful Coroutines”这一章节中类似的问题。
  2. It means that the task destructor can unconditionally destroy the coroutine frame - we don’t need to worry about whether the coroutine is potentially executing on another thread since the coroutine will not start executing until we await it, and while it is executing the calling coroutine is suspended and so won’t attempt to call the task destructor until the coroutine finishes executing.
    这里说的是 task 的析构函数可以不加判断地直接销毁掉 coroutine frame。也就是说,并不需要担心 coroutine 是否此时还在另一个线程上执行。实际上我们只有在 await 它的时候,coroutine 才会开始执行。而这个时候,调用方 coroutine 已经被挂起了,直到 coroutine 执行完成后,都不会再调用 task 的 destructor 了。
    所以这让编译器更容易把分配 coroutine frame 的操作 inline 到 caller 的 frame 里面。我理解就是“Allocating a coroutine frame”里面讲的东西。
    See P0981R0 to read more about the Heap Allocation eLision Optimisation (HALO).
  3. It also improves the exception-safety of your coroutine code. If you don’t immediately co_await the returned task and do something else that can throw an exception that causes the stack to unwind and the task destructor to run then we can safely destroy the coroutine since we know it hasn’t started yet. We aren’t left with the difficult choice between detaching, potentially leaving dangling references, blocking in the destructor, terminating or undefined-behaviour.
    这也能提高异常安全性。

为了让 coroutine 能够 initially suspend,需要定义一个返回 suspend_alwaysinitial_suspend 方法。

1
2
3
4
5
...
std::suspend_always initial_suspend() noexcept {
return {};
}
...

然后,定义 return_void() 方法。这是在执行 co_return 的时候,或者执行到 coroutine 末尾的时候被调用的。这个方法并不会做什么事情,只是让编译器知道 co_return; 对于当前的 coroutine 类型是合法的。

1
2
3
...
void return_void() noexcept {}
...

We also need to add an unhandled_exception() method that is called if an exception escapes the body of the coroutine. For our purposes we can just treat the task coroutine bodies as noexcept and call std::terminate() if this happens.

1
2
3
4
5
...
void unhandled_exception() noexcept {
std::terminate();
}
...

最后,还需要 coroutine 能在 final-suspend point 被 suspend 住,然后 resume its Continuation。在当前的 case 中 continuation 就是在 awaiting 的 coroutine,在 task::awaiter::await_suspend 的时候被设置的。

【Q】“resume its Continuation” 中的 Continuation 指的是什么?这里我理解应该就是 loop_synchronously 里面循环的下一次迭代。可以在 Stack 图中看到 continuation 具体指向哪里的。

因此,需要在 promise 中引入一个成员,去持有 continuation 的 std::coroutine_handle,不然如何调用对应的 .resume() 方法呢?

还需要定义 final_suspend() 方法来返回一个 awaitable 对象也就是 final_awaiter,让它在当前 coroutine 被挂起后,去 resume 这个 continuation。

注意,需要再当前 coroutine 被 suspend 之后,才能 resume continuation。这是因为 continuation 可能立即就会调用 task 的析构函数,从而间接调用 coroutine frame 的 .destroy() 方法。.destroy() 方法只对 suspended 的 coroutine 生效。

The compiler inserts code to evaluate the statement co_await promise.final_suspend(); at the closing curly brace.

需要注意,在调用 final_suspend 的时候,coroutine 还没有进入 suspend 状态。需要等到返回的 awaitable 对象上的 await_suspend() 方法被调用之后,coroutine 才被 suspend。关于这个我觉得可以参考之前讲的 <return-to-caller-or-resumer>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
struct final_awaiter {
bool await_ready() noexcept {
return false;
}

void await_suspend(std::coroutine_handle<promise_type> h) noexcept {
// The coroutine is now suspended at the final-suspend point.
// Lookup its continuation in the promise and resume it.
h.promise().continuation.resume();
}

void await_resume() noexcept {}
};

final_awaiter final_suspend() noexcept {
return {};
}

std::coroutine_handle<> continuation;
};

【Q】为啥这里定义一个 final_awaiter,而不是直接用 std::suspend_always。
首先,suspend_alwayssuspend_never 的实现上分别定义了 await_ready() 方法始终返回 false 或者 true。await_suspend 或者 await_resume 方法都是空实现。而这里是希望 final_awaiterawait_suspend 能去 resume continuation。

Implementing task::operator co_await()

co_await 会返回一个 awaiter 对象,这个对象需要支持 await_ready()await_suspend()await_resume()

下面就是 awaiter 的简单实现。注意,一旦一个 coroutine 被 suspend 了,就需要保存 coroutine handle 到 promise 对象中。这样后续可以调用 std::coroutine_handle 中的 resume() 方法去执行这个 task。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class task::awaiter {
public:
bool await_ready() noexcept {
return false;
}

void await_suspend(std::coroutine_handle<> continuation) noexcept {
// Store the continuation in the task's promise so that the final_suspend()
// knows to resume this coroutine when the task completes.
coro_.promise().continuation = continuation;

// Then we resume the task's coroutine, which is currently suspended
// at the initial-suspend-point (ie. at the open curly brace).
coro_.resume();
}

void await_resume() noexcept {}

private:
explicit awaiter(std::coroutine_handle<task::promise_type> h) noexcept
: coro_(h)
{}

std::coroutine_handle<task::promise_type> coro_;
};

task::awaiter task::operator co_await() && noexcept {
return awaiter{coro_};
}

作者给出了一个可编译的 demo 在 https://godbolt.org/z/-Kw6Nf。

The stack-overflow problem

考虑下面的代码,如果 count 足够大,程序就会爆栈。例如 https://godbolt.org/z/gy5Q8q 中展示了当 count 是 1000000 的时候,程序就爆栈了。

1
2
3
4
5
6
7
8
9
task completes_synchronously() {
co_return;
}

task loop_synchronously(int count) {
for (int i = 0; i < count; ++i) {
co_await completes_synchronously();
}
}

这是因为当 loop_synchronously() 开始执行时,有一个其他 coroutine 正在 co_await 的自己返回的 task,也就是在 co_await loop_synchronously()。因此,它会 suspend 正在 awating 的 coroutine,然后调用 task::awaiter::await_suspend()。如前文介绍,await_suspend 会负责调用对应 task 的 std::coroutine_handleresume() 方法。

Thus the stack will look something like this when loop_synchronously() starts.

我理解这里倒数第二底层的 task::awaiter::await_suspend 是由于这个其他 coroutine 在 await 从而产生的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
           Stack                                                   Heap
+------------------------------+ <-- top of stack +--------------------------+
| loop_synchronously$resume | active coroutine -> | loop_synchronously frame |
+------------------------------+ | +----------------------+ |
| coroutine_handle::resume | | | task::promise | |
+------------------------------+ | | - continuation --. | |
| task::awaiter::await_suspend | | +------------------|---+ |
+------------------------------+ | ... | |
| awaiting_coroutine$resume | +--------------------|-----+
+------------------------------+ V
| .... | +--------------------------+
+------------------------------+ | awaiting_coroutine frame |
| |
+--------------------------+

这里的 $resume 后缀用来表示 coroutine 中的用户自定义逻辑。

然后,当 loop_synchronously()co_awaitcompletes_synchronously() 返回的 task 对象时,当前的 coroutine 会被 suspend,然后会调用 task::awaiter::await_suspend()。await_suspend() 方法会调用 completes_synchronously() 的 coroutine handle 上的 .resume() 方法。

这会 resume completes_synchronously() coroutine。这个 coroutine 会 synchronously 地运行结束,然后在 final-suspend point 被 suspend。然后它会调用 task::promise::final_awaiter::await_suspend(),然后最终调用 loop_synchronously() 这个 coroutine 的 coroutine handle 上的 .resume() 方法。

如果我们在 loop_synchronously() coroutine 被 resume 之后,它返回的临时的 task 被销毁之前,检查调用栈,就可以看到下面的情况。

这里的 final_awaiter 也就是 promise 对象的 final_suspend() 方法返回的内容。根据之前的说明,它是在 final-suspend point 之后 resume continuation 的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
           Stack                                                   Heap
+-------------------------------+ <-- top of stack
| loop_synchronously$resume | active coroutine -.
+-------------------------------+ |
| coroutine_handle::resume | .------'
+-------------------------------+ |
| final_awaiter::await_suspend | |
+-------------------------------+ | +--------------------------+ <-.
| completes_synchronously$resume| | | completes_synchronously | |
+-------------------------------+ | | frame | |
| coroutine_handle::resume | | +--------------------------+ |
+-------------------------------+ '---. |
| task::awaiter::await_suspend | V |
+-------------------------------+ <-- prev top +--------------------------+ |
| loop_synchronously$resume | of stack | loop_synchronously frame | |
+-------------------------------+ | +----------------------+ | |
| coroutine_handle::resume | | | task::promise | | |
+-------------------------------+ | | - continuation --. | | |
| task::awaiter::await_suspend | | +------------------|---+ | |
+-------------------------------+ | - task temporary --|---------'
| awaiting_coroutine$resume | +--------------------|-----+
+-------------------------------+ V
| .... | +--------------------------+
+-------------------------------+ | awaiting_coroutine frame |
| |
+--------------------------+

然后,就会调用 task 的析构函数,摧毁 completes_synchronously() 的 coroutine frame。然后就会进行新一轮的循环,创建新的 completes_synchronously() 的 coroutine frame,然后 resume。

最终结果是,loop_synchronously()completes_synchronously() 会递归地互相调用彼此。每次调用都会消耗一点栈空间,直到最后栈爆掉了。

Writing loops in coroutines built this way makes it very easy to write functions that perform unbounded recursion without looking like they are doing any recursion.

So, what would the solution look like under the original Coroutines TS design?

The Coroutines TS solution

TS 的解决方案是使用返回 bool 的版本的 await_suspend,根据的原理是
In the Coroutines TS there is also a version of await_suspend() that returns bool - if it returns true then the coroutine is suspended and execution returns to the caller of resume(), otherwise if it returns false then the coroutine is immediately resumed, but this time without consuming any additional stack-space.

具体来说,做出下面的修改:

  1. Inside the task::awaiter::await_suspend() method you can start executing the coroutine by calling .resume(). Then when the call to .resume() returns, check whether the coroutine has run to completion or not. If it has run to completion then we can return false, which indicates the awaiting coroutine should immediately resume, or we can return true, indicating that execution should return to the caller of std::coroutine_handle::resume().
  2. Inside task::promise_type::final_awaiter::await_suspend(), which is run when the coroutine runs to completion, we need to check whether the awaiting coroutine has (or will) return true from task::awaiter::await_suspend() and if so then resume it by calling .resume(). Otherwise, we need to avoid resuming the coroutine and notify task::awaiter::await_suspend() that it needs to return false.

从下面的代码来看,awaiter::await_suspend 和 final_awaiter::await_suspend 中都会尝试设置 promise.ready 为 true。但是:

  1. 在 awaiter::await_suspend 中如果发现 promise.ready 原来是 false,说明还没结束,则要返回 true 去挂起,并且返回给 std::coroutine_handle::resume() 的调用方。如果原来是 true,说明执行完了,就返回 false,则可以立即 resume。
  2. 在 final_awaiter::await_suspend 中如果发现 promise.ready 原来是 true,说明【Q】

There is an added complication, however, in that it’s possible for a coroutine to start executing on the current thread then suspend and later resume and run to completion on a different thread before the call to .resume() returns. Thus, we need to be able to resolve the potential race between part 1 and part 2 above happening concurrently.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class task::promise_type {
...

std::coroutine_handle<> continuation;
std::atomic<bool> ready = false;
};

bool task::awaiter::await_suspend(
std::coroutine_handle<> continuation) noexcept {
promise_type& promise = coro_.promise();
promise.continuation = continuation;
coro_.resume();
return !promise.ready.exchange(true, std::memory_order_acq_rel);
}

void task::promise_type::final_awaiter::await_suspend(
std::coroutine_handle<promise_type> h) noexcept {
promise_type& promise = h.promise();
if (promise.ready.exchange(true, std::memory_order_acq_rel)) {
// The coroutine did not complete synchronously, resume it here.
promise.continuation.resume();
}
}

修改后的代码在 https://godbolt.org/z/7fm8Za。

The problems

上面的方案依然存在问题:

  1. 依赖原子操作
    第一次是在调用者在 suspend awaiting coroutine 的时候。
    第二次是在被调用者即将完成执行的时候。
  2. 引入额外的分支操作。

最后一个最严重的问题是,被挂起的 coroutine 在被 resume 后,被哪个线程执行是不确定的。比如考虑下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cppcoro::static_thread_pool tp;

task foo()
{
std::cout << "foo1 " << std::this_thread::get_id() << "\n";
// Suspend coroutine and reschedule onto thread-pool thread.
co_await tp.schedule();
std::cout << "foo2 " << std::this_thread::get_id() << "\n";
}

task bar()
{
std::cout << "bar1 " << std::this_thread::get_id() << "\n";
co_await foo();
std::cout << "bar2" << std::this_thread::get_id() << "\n";
}

在原始的实现中,可能的输出如下。这是因为我们保证在 the code that runs after co_await foo() would run inline on the same thread that foo() completed on.

1
2
3
4
bar1 1234
foo1 1234
foo2 3456
bar2 3456

但是因为使用了原子变量,就可能 foo 的 completion 和 bar 的 suspension 之间有 race(我理解就是上面的两个 await_suspend 会竞争地设置 promise.ready 吧)。那么在一些情况下,co_await foo() might run on the original thread that bar() started executing on. 如下所示。

1
2
3
4
bar1 1234
foo1 1234
foo2 3456
bar2 1234

这对一些场景下是存在问题的。比如 via 这个函数可以指定一个 Scheduler 去运行某个 Awaitable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template<typename Awaitable, typename Scheduler>
task<await_result_t<Awaitable>> via(Awaitable a, Scheduler s)
{
auto result = co_await std::move(a);
co_await s.schedule();
co_return result;
}

task<T> get_value();
void consume(const T&);

task<void> consumer(static_thread_pool::scheduler s)
{
T result = co_await via(get_value(), s);
consume(result);
}

但像现在这样的话,consume() 可能在 s 上执行,但也有可能在 whatever thread the consumer() coroutine started execution on 上被执行。

Enter “symmetric transfer”

This paper proposed two key changes:

  1. Allow returning a std::coroutine_handle<T> from await_suspend() as a way of indicating that execution should be symmetrically transferred to the coroutine identified by the returned handle.
  2. Add a std::experimental::noop_coroutine() function that returns a special std::coroutine_handle that can be returned from await_suspend() to suspend the current coroutine and return from the call to .resume() instead of transferring execution to another coroutine.

首先,什么是 symmetric transfer?简单来说,像函数调用,返回那样的就是 asymmetric transfer,因为有明确的调用者和被调用者。具体到 coroutine 场景中,当 A 调用 .resume() 方法去 resume 一个 coroutine 的时候,这个 A 还在 stack 上,尽管 resumed coroutine 正在被执行。当这个 coroutine 后面挂起,并调用 await_suspend 返回 void(无条件 suspend)或者 true(条件 suspend),那么对 .resume() 的调用就返回了。

每次我们通过 .resume() 方法去 resume 一个 coroutine 的时候,都会创建一个新的 frame。
但如果通过 symmetric transfer,我们就只是 suspend 某个 coroutine,resume 另一个 coroutine。这两个 coroutine 之间没有任何的调用者或者被调用者的关系。当一个 coroutine 被 suspend 后,它可以将 execution 给到任意的被 suspend 的coroutine,甚至包括自己,并且在自己被 suspend 之后,也不需要把 execution 还给之前的 coroutine。

Let’s look at what the compiler lowers a co_await expression to when the awaiter makes use of symmetric-transfer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
decltype(auto) value = <expr>;
decltype(auto) awaitable =
get_awaitable(promise, static_cast<decltype(value)&&>(value));
decltype(auto) awaiter =
get_awaiter(static_cast<decltype(awaitable)&&>(awaitable));
if (!awaiter.await_ready())
{
using handle_t = std::coroutine_handle<P>;

//<suspend-coroutine>

auto h = awaiter.await_suspend(handle_t::from_promise(p));
h.resume();
//<return-to-caller-or-resumer>

//<resume-point>
}

return awaiter.await_resume();
}

Let’s zoom in on the key part that differs from other co_await forms:

1
2
3
auto h = awaiter.await_suspend(handle_t::from_promise(p));
h.resume();
//<return-to-caller-or-resumer>

Once the coroutine state-machine is lowered (a topic for another post), the <return-to-caller-or-resumer> part basically becomes a return; statement which causes the call to .resume() that last resumed the coroutine to return to its caller.

This means that we have the situation where we have a call to another function with the same signature, std::coroutine_handle::resume(), followed by a return; from the current function which is itself the body of a std::coroutine_handle::resume() call.

Some compilers, when optimisations are enabled, are able to apply an optimisation that turns calls to other functions the tail-position (ie. just before returning) into tail-calls as long as some conditions are met.

It just so happens that this kind of tail-call optimisation is exactly the kind of thing we want to be able to do to avoid the stack-overflow problem we were encountering before. But instead of being at the mercy of the optimiser as to whether or not the tail-call transformation is perfromed, we want to be able to guarantee that the tail-call transformation occurs, even when optimisations are not enabled.

But first let’s dig into what we mean by tail-calls.

Tail-calls

Tail-call 指的是当前的 stack frame 被在调用前就被弹出了,然后当前函数的返回地址变为了被调用者的返回地址。比如,被调用者会直接返回给调用者的调用者。

在 X86 架构上,编译器会首先弹出当前的栈帧,然后用一个 jmp 指令去跳转到被调用的函数的 entry-point。而不是使用一个 call 指令,然后在返回后再弹出当前 stack-frame。

This optimisation is generally only possible to do in limited circumstances, however. In particular, it requires that:

  1. the calling convention supports tail-calls and is the same for the caller and callee;
  2. the return-type is the same;
  3. there are no non-trivial destructors that need to be run after the call before returning to the caller; and
  4. the call is not inside a try/catch block.

The shape of the symmetric-transfer form of co_await has actually been designed specifically to allow coroutines to satisfy all of these requirements. Let’s look at them individually.

  1. Calling convention
    当编译器 lowers 一个 coroutine 到机器码的时候,它实际上将 coroutine 分为了两部分。第一部分是 ramp,它会分配并初始化 coroutine 帧。第二部分是 body,它包含了从用户自定义的 coroutine body 生成的状态机。
    The function signature of the coroutine (and thus any user-specified calling-convention) affects only the ramp part, whereas the body part is under the control of the compiler and is never directly called by any user-code - only by the ramp function and by std::coroutine_handle::resume().
    The calling-convention of the coroutine body part is not user-visible and is entirely up to the compiler and thus it can choose an appropriate calling convention that supports tail-calls and that is used by all coroutine bodies.

  2. Return type is the same
    “调用方” coroutine 和“被调用方” coroutine 的 .resume() 方法的返回值都是 void。

  3. No non-trivial destructors
    在执行 tail-call 时,需要能够在调用目标函数之前就释放当前的 stack frame。而这需要所有在栈上分配的对象的生命周期都在调用前完成。
    Normally, this would be problematic as soon as there are any objects with non-trivial destructors in-scope as the lifetime of those objects would not yet have ended and those objects would have been allocated on the stack.
    但是,当一个 coroutine 被 suspend 之后,它实际上会将需要续命的对象放到 coroutine frame 里面,而不是直接分配在 stack 上。
    对于真正的 local variable,也就是那些 lifetime 并不会跨越 suspend-point 的 variable,它们是会被分配在栈上的。但是它们的 lifetime 在 coroutine suspend 之前就已经结束了,并且对应的析构函数也已经被调用了。
    所以不会存在有 stack-allocated objects,它们的 non-trivial destructor 需要在 tail-call 返回之后被执行。

  4. Call not inside a try/catch block
    这里比较 tricky 的一点是每个 coroutine 都会有一个隐式的 try/catch block,来包裹其中的用户自定义的部分。
    类似下面这,F 就是用户自定义的部分。

    1
    2
    3
    4
    5
    6
    7
    8
    {
    promise_type promise;
    co_await promise.initial_suspend();
    try { F; }
    catch (...) { promise.unhandled_exception(); }
    final_suspend:
    co_await promise.final_suspend();
    }

    所以,每个用户自定义的 co_await 表达式(除了 initial_suspend 和 final_suspend 的)会被 try catch 包裹。
    However, implementations work around this by actually executing the call to .resume() outside of the context of the try-block.

So we see that coroutines performing a symmetric-transfer generally satisfy all of the requirements for being able to perform a tail-call. The compiler guarantees that this will always be a tail-call, regardless of whether optimisations are enabled or not.

This means that by using the std::coroutine_handle-returning flavour of await_suspend() we can suspend the current coroutine and transfer execution to another coroutine without consuming extra stack-space.

This allows us to write coroutines that mutually and recursively resume each other to an arbitrary depth without fear of overflowing the stack.

This is exactly what we need to fix our task implementation.

task revisited

So with the new “symmetric transfer” capability under our belt let’s go back and fix our task type implementation.

To do this we need to make changes to the two await_suspend() methods in our implementation:

  1. First so that when we await the task that we perform a symmetric-transfer to resume the task’s coroutine.
  2. Second so that when the task’s coroutine completes that it performs a symmetric transfer to resume the awaiting coroutine.

To address the await direction we need to change the task::awaiter method from this:

1
2
3
4
5
6
7
8
9
10
void task::awaiter::await_suspend(
std::coroutine_handle<> continuation) noexcept {
// Store the continuation in the task's promise so that the final_suspend()
// knows to resume this coroutine when the task completes.
coro_.promise().continuation = continuation;

// Then we resume the task's coroutine, which is currently suspended
// at the initial-suspend-point (ie. at the open curly brace).
coro_.resume();
}

会变成

1
2
3
4
5
6
7
8
9
10
11
std::coroutine_handle<> task::awaiter::await_suspend(
std::coroutine_handle<> continuation) noexcept {
// Store the continuation in the task's promise so that the final_suspend()
// knows to resume this coroutine when the task completes.
coro_.promise().continuation = continuation;

// Then we tail-resume the task's coroutine, which is currently suspended
// at the initial-suspend-point (ie. at the open curly brace), by returning
// its handle from await_suspend().
return coro_;
}

And to address the return-path we need to update the task::promise_type::final_awaiter method from this:

1
2
3
4
5
6
void task::promise_type::final_awaiter::await_suspend(
std::coroutine_handle<promise_type> h) noexcept {
// The coroutine is now suspended at the final-suspend point.
// Lookup its continuation in the promise and resume it.
h.promise().continuation.resume();
}

会变成

1
2
3
4
5
6
std::coroutine_handle<> task::promise_type::final_awaiter::await_suspend(
std::coroutine_handle<promise_type> h) noexcept {
// The coroutine is now suspended at the final-suspend point.
// Lookup its continuation in the promise and resume it symmetrically.
return h.promise().continuation;
}

And now we have a task implementation that doesn’t suffer from the stack-overflow problem that the void-returning await_suspend flavour had and that doesn’t have the non-deterministic resumption context problem of the bool-returning await_suspend flavour had.

Visualising the stack

这是之前的例子

1
2
3
4
5
6
7
8
9
task completes_synchronously() {
co_return;
}

task loop_synchronously(int count) {
for (int i = 0; i < count; ++i) {
co_await completes_synchronously();
}
}

现在,在 loop_synchronously() 第一次被执行的时候,可能是因为有些其他的 coroutine 去 co_await 了它。这是通过 symmetric transfer 来实现的,所以栈类似下面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
           Stack                                                Heap
+---------------------------+ <-- top of stack +--------------------------+
| loop_synchronously$resume | active coroutine -> | loop_synchronously frame |
+---------------------------+ | +----------------------+ |
| coroutine_handle::resume | | | task::promise | |
+---------------------------+ | | - continuation --. | |
| ... | | +------------------|---+ |
+---------------------------+ | ... | |
+--------------------|-----+
V
+--------------------------+
| awaiting_coroutine frame |
| |
+--------------------------+

然后,执行 co_await completes_synchronously() 的时候,又会触发一次 symmetric transfer 到 completes_synchronously。

It does this by:

  1. 调用 task::operator co_await(),获得一个 task::awaiter 对象
  2. suspend,然后调用 task::awaiter::await_suspend()。它的 symmetric transfer 的版本会返回 coroutine_handle of the completes_synchronously coroutine.
  3. 执行一次 tail-call 或者说 jump 去到 completes_synchronously coroutine。这会弹出 loop_synchronously 的 frame,然后再 activate completes_synchronously 的 frame.

If we now look at the stack just after completes_synchronously is resumed it will now look like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
              Stack                                          Heap
.-> +--------------------------+ <-.
| | completes_synchronously | |
| | frame | |
| | +----------------------+ | |
| | | task::promise | | |
| | | - continuation --. | | |
| | +------------------|---+ | |
`-, +--------------------|-----+ |
| V |
+-------------------------------+ <-- top of | +--------------------------+ |
| completes_synchronously$resume| stack | | loop_synchronously frame | |
+-------------------------------+ active -----' | +----------------------+ | |
| coroutine_handle::resume | coroutine | | task::promise | | |
+-------------------------------+ | | - continuation --. | | |
| ... | | +------------------|---+ | |
+-------------------------------+ | task temporary | | |
| - coro_ -----|---------`
+--------------------|-----+
V
+--------------------------+
| awaiting_coroutine frame |
| |
+--------------------------+

注意,stack-frame 的数量没有变多。

在 completes_synchronously 完成之后,当遇到右花括号的时候,会执行 co_await promise.final_suspend()

这会导致 coroutine 被挂起,并且调用 final_awaiter::await_suspend(),从而返回 continuation 的 std::coroutine_handle,实际上就指向的 loop_synchronously。这之后会做一个 symmetric transfer/tail-call 去 resume loop_synchronously。

If we look at the stack just after loop_synchronously is resumed then it will look something like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
           Stack                                                   Heap
+--------------------------+ <-.
| completes_synchronously | |
| frame | |
| +----------------------+ | |
| | task::promise | | |
| | - continuation --. | | |
| +------------------|---+ | |
+--------------------|-----+ |
V |
+----------------------------+ <-- top of stack +--------------------------+ |
| loop_synchronously$resume | active coroutine -> | loop_synchronously frame | |
+----------------------------+ | +----------------------+ | |
| coroutine_handle::resume() | | | task::promise | | |
+----------------------------+ | | - continuation --. | | |
| ... | | +------------------|---+ | |
+----------------------------+ | task temporary | | |
| - coro_ -----|---------`
+--------------------|-----+
V
+--------------------------+
| awaiting_coroutine frame |
| |
+--------------------------+

loop_synchronously 在 resume 之后要做的第一件事,是调用临时的 task 对象的析构函数。这会销毁 coroutine-frame,释放它的内存,并产生下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
           Stack                                                   Heap
+---------------------------+ <-- top of stack +--------------------------+
| loop_synchronously$resume | active coroutine -> | loop_synchronously frame |
+---------------------------+ | +----------------------+ |
| coroutine_handle::resume | | | task::promise | |
+---------------------------+ | | - continuation --. | |
| ... | | +------------------|---+ |
+---------------------------+ | ... | |
+--------------------|-----+
V
+--------------------------+
| awaiting_coroutine frame |
| |
+--------------------------+

We are now back to executing the loop_synchronously coroutine and we now have the same number of stack-frames and coroutine-frames as we started, and will do so each time we go around the loop.

Thus we can perform as many iterations of the loop as we want and will only use a constant amount of storage space.

For a full example of the symmetric-transfer version of the task type see the following Compiler Explorer link: https://godbolt.org/z/9baieF.