Boost C++ 库

...世界上最受推崇和设计最精良的 C++ 库项目之一。 Herb SutterAndrei Alexandrescu, C++ 编码标准

概述

以下是 cobalt 中相关功能的列表

表 1. 协程类型

promise

返回单个结果的急切协程 - 可以将其视为默认值

generator

可以产生多个值的急切协程。

task

promise 的惰性版本,可以生成到其他执行器上。

detached

类似于 promise 的协程,没有句柄

表 2. 同步函数

race

一个函数,以伪随机方式等待一组已准备好的协程中的一个,以避免饥饿。

join

一个函数,等待一组协程并将其全部作为值返回,或者如果任何可等待对象抛出异常,则抛出异常。

gather

一个函数,等待一组协程并将其全部作为 result 返回,单独捕获所有异常。

left_race

一个确定的 race,从左到右求值。

表 3. 实用程序

channel

一个线程本地实用程序,用于在协程之间发送值。

with

一个异步 RAII 助手,允许在发生异常时进行异步拆卸

表 4. 阅读指南

协程入门

C++ 协程简介

如果您以前从未使用过协程,请阅读

导览

功能和概念的简要高级概述

如果您熟悉 asio 和协程,并且想要了解此库提供的功能,请阅读。

教程

用法的低级视图

如果您想快速开始编码,请阅读

参考

API 参考

在编码时查找详细信息

深入

一些实现细节

如果您还没有感到困惑,请阅读

动机

许多编程语言(如 node.js 和 python)都提供了易于使用的单线程并发框架。虽然比同步代码更复杂,但单线程异步避免了多线程的许多陷阱和开销。

也就是说,一个协程可以工作,而其他协程等待事件(例如,来自服务器的响应)。这允许在单线程上编写同时执行多项操作的应用程序。

此库旨在为 C++ 提供此功能:简单的单线程异步,类似于 node.js 和 python 中的 asyncio,并且可以与现有库(如 boost.beastboost.mysqlboost.redis)一起使用。它基于 boost.asio

它从其他语言中提取了一系列概念,并基于 C++20 协程提供它们。

asio::awaitableasio::experimental::coro 不同,cobalt 协程是开放的。也就是说,asio::awaitable 只能等待和被其他 asio::awaitable 等待,并且不提供协程特定的同步机制。

另一方面,cobalt 提供了协程特定的 channel 和不同的等待类型(racegather 等),这些类型经过优化,可以与协程和可等待对象一起使用。

协程入门

异步编程

异步编程通常是指一种允许在后台运行任务,同时执行其他工作的编程风格。

想象一下,如果有一个 get-request 函数执行完整的 http 请求,包括连接和 ssl 握手等。

std::string http_get(std:string_view url);

int main(int argc, char * argv[])
{
    auto res = http_get("https://boost.ac.cn");
    printf("%s", res.c_str());
    return 0;
}

以上代码是传统的同步编程。如果我们想并行执行两个请求,我们需要创建另一个线程来运行另一个具有同步代码的线程。

std::string http_get(std:string_view url);

int main(int argc, char * argv[])
{
    std::string other_res;

    std::thread thr{[&]{ other_res = http_get("https://cppalliance.org"); }};
    auto res = http_get("https://boost.ac.cn");
    thr.join();

    printf("%s", res.c_str());
    printf("%s", other_res.c_str());
    return 0;
}

这是可行的,但我们的程序将大部分时间用于等待输入。操作系统提供了允许异步执行 IO 的 API,而诸如 boost.asio 之类的库提供了管理异步操作的可移植方法。Asio 本身并没有规定处理完成的方法。此库 (boost.cobalt) 提供了一种通过协程/可等待对象管理所有这些的方法。

cobalt::promise<std::string> http_cobalt_get(std:string_view url);

cobalt::main co_main(int argc, char * argv[])
{
    auto [res, other_res] =
            cobalt::join(
                http_cobalt_get(("https://boost.ac.cn"),
                http_cobalt_get(("https://cppalliance.org")
            );

    printf("%s", res.c_str());
    printf("%s", other_res.c_str());
    return 0;
}

在上面的代码中,用于执行请求的异步函数利用了操作系统 API,因此实际的 IO 不会阻塞。这意味着,当我们等待两个函数完成时,操作是交错的且非阻塞的。同时,cobalt 提供了使我们摆脱回调地狱的协程原语。

协程

协程是可恢复的函数。可恢复意味着一个函数可以暂停,即将控制权多次传递回调用者。

常规函数使用 return 函数将控制权返回给调用者,同时还会返回值。

另一方面,协程可能会将控制权返回给调用者并多次恢复。

协程有三个类似于 co_return 的控制关键字(其中只有 co_return 必须受支持)。

  • co_return

  • co_yield

  • co_await

co_return

这类似于 return,但会将函数标记为协程。

co_await

co_await 表达式将挂起一个 可等待对象,即停止执行直到 awaitable 恢复它。

例如

cobalt::promise<void> delay(std::chrono::milliseconds);

cobalt::task<void> example()
{
  co_await delay(std::chrono::milliseconds(50));
}

co_await 表达式可以根据它正在等待的内容产生一个值。

cobalt::promise<std::string> read_some();

cobalt::task<void> example()
{
  std::string res = co_await read_some();
}
cobalt 中,大多数协程原语也是 可等待对象

co_yield

co_yield 表达式类似于 co_await,但它将控制权返回给调用者并携带一个值。

例如

cobalt::generator<int> iota(int max)
{
  int i = 0;
  while (i < max)
    co_yield i++;

  co_return i;
}

co_yield 表达式还可以产生一个值,这允许 yield 协程的用户将值推送到其中。

cobalt::generator<int> iota()
{
  int i = 0;
  bool more = false;
  do
  {
    more = co_yield i++;
  }
  while(more);
  co_return -1;
}
无栈

C++ 协程是无栈的,这意味着它们只分配自己的函数帧。

有关更多详细信息,请参阅 无栈

可等待对象

可等待对象是在 co_await 表达式中可以使用的类型。

struct awaitable_prototype
{
    bool await_ready();

    template<typename T>
    see_below await_suspend(std::coroutine_handle<T>);

    return_type  await_resume();
};
如果存在可用的 operator co_await 调用,类型将隐式转换为可等待对象。本文档将使用 awaitable 来包含这些类型,并使用 "actual_awaitable" 来引用符合上述原型的类型。
awaitables

co_await 表达式中,等待的协程将首先调用 await_ready 来检查协程是否需要挂起。准备就绪后,它会直接进入 await_resume 以获取值,因为不需要挂起。否则,它将挂起自身并使用 std::coroutine_handle 调用 await_suspend 到自己的 promise。

std::coroutine_handle<void> 可用于类型擦除。

return_typeco_await 表达式的结果类型,例如 int

int i = co_await awaitable_with_int_result();

await_suspend 的返回类型可以是三种

  • void

  • bool

  • std::coroutine_handle<U>

如果它是 void,则等待的协程保持挂起。如果它是 bool,则将检查该值,如果为 false,则等待的协程将立即恢复。

如果返回 std::coroutine_handle,则将恢复此协程。后者允许 await_suspend 返回传入的句柄,实际上与返回 false 相同。

如果等待的协程立即重新恢复,即在调用 await_resume 之后,则在本库中称为“立即完成”。这不应与非挂起的可等待对象混淆,即从 await_ready 返回 true 的对象。

事件循环

由于 cobalt 中的协程可以 co_await 事件,因此它们需要在事件循环上运行。也就是说,另一段代码负责跟踪未完成的事件并恢复正在等待它们的恢复协程。这种模式非常常见,并且在 node.js 或 python 的 asyncio 中以类似的方式使用。

cobalt 使用 asio::io_context 作为其默认事件循环。也就是说,类 threadmainrun 函数在内部使用它。

你可以使用任何可以产生 asio::any_io_executor 的事件循环来配合这个库。最简单的方法是使用 spawn

事件循环通过执行器(遵循 asio 的术语)访问,并且可以使用 set_executor 手动设置。

导览

进入 cobalt 环境

为了使用 awaitables,我们需要能够 co_await 它们,即处于协程中。

我们有四种方法可以实现这一点:

cobalt/main.hpp

使用协程替换 int main

cobalt::main co_main(int argc, char* argv[])
{
    // co_await things here
    co_return 0;
}
cobalt/thread.hpp

为异步环境创建一个线程

cobalt::thread my_thread()
{
    // co_await things here
    co_return;
}

int main(int argc, char ** argv[])
{
    auto t = my_thread();
    t.join();
    return 0;
}
cobalt/task.hpp

创建一个任务并运行或派生它

cobalt::task<void> my_thread()
{
   // co_await things here
   co_return;
}

int main(int argc, char ** argv[])
{
    cobalt::run(my_task()); // sync
    asio::io_context ctx;
    cobalt::spawn(ctx, my_task(), asio::detached);
    ctx.run();
    return 0;
}

Promise

Promises 是推荐的默认协程类型。它们是即时的,因此易于用于临时的并发。

cobalt::promise<int> my_promise()
{
   co_await do_the_thing();
   co_return 0;
}

cobalt::main co_main(int argc, char * argv[])
{
    // start the promise here
    auto p = my_promise();
    // do something else here
    co_await do_the_other_thing();
    // wait for the promise to complete
    auto res = co_await p;

    co_return res;
}

Task

Tasks 是惰性的,这意味着它们在被等待或派生之前不会执行任何操作。

cobalt::task<int> my_task()
{
   co_await do_the_thing();
   co_return 0;
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the task here
    auto t = my_task();
    // do something else here first
    co_await do_the_other_thing();
    // start and wait for the task to complete
    auto res = co_await t;
    co_return res;
}

生成器

generator 是 cobalt 中唯一可以 co_yield 值的类型。

Generator 默认是即时的。与 std::generator 不同,cobalt::generator 可以 co_await,因此是异步的。

cobalt::generator<int> my_generator()
{
   for (int i = 0; i < 10; i++)
    co_yield i;
   co_return 10;
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_generator();
    while (g)
        printf("Generator %d\n", co_await g);
    co_return 0;
}

值可以被推入生成器,这些值将从 co_yield 返回。

cobalt::generator<double, int> my_eager_push_generator(int value)
{
   while (value != 0)
       value = co_yield value * 0.1;
   co_return std::numeric_limits<double>::quiet_NaN();
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_generator(5);

    assert(0.5 == co_await g(4)); // result of 5
    assert(0.4 == co_await g(3)); // result of 4
    assert(0.3 == co_await g(2)); // result of 3
    assert(0.2 == co_await g(1)); // result of 2
    assert(0.1 == co_await g(0)); // result of 1

    // we let the coroutine go out of scope while suspended
    // no need for another co_await of `g`

    co_return 0;
}

也可以使用 this_coro::initial 将协程设为惰性。

cobalt::generator<double, int> my_eager_push_generator()
{
    auto value = co_await this_coro::initial;
    while (value != 0)
        value = co_yield value * 0.1;
    co_return std::numeric_limits<double>::quiet_NaN();
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_generator(); // lazy, so the generator waits for the first pushed value
    assert(0.5 == co_await g(5)); // result of 5
    assert(0.4 == co_await g(4)); // result of 4
    assert(0.3 == co_await g(3)); // result of 3
    assert(0.2 == co_await g(2)); // result of 2
    assert(0.1 == co_await g(1)); // result of 1

    // we let the coroutine go out of scope while suspended
    // no need for another co_await of `g`

    co_return 0;
}

join

如果多个 awaitables 并行工作,可以使用 join 同时等待它们。

cobalt::promise<int> some_work();
cobalt::promise<double> more_work();

cobalt::main co_main(int argc, char * argv[])
{
    std::tuple<int, double> res = cobalt::join(some_work(), more_work());
    co_return 0;
}

race

如果多个 awaitables 并行工作,但我们希望在其中任何一个完成时收到通知,则应使用 race

cobalt::generator<int> some_data_source();
cobalt::generator<double> another_data_source();

cobalt::main co_main(int argc, char * argv[])
{
    auto g1 = some_data_source();
    auto g2 = another_data_source();

    int res1    = co_await g1;
    double res2 = co_await g2;

    printf("Result: %f", res1 * res2);

    while (g1 && g2)
    {
        switch(variant2::variant<int, double> nx = co_await cobalt::race(g1, g2))
        {
            case 0:
                res1 = variant2::get<0>(nx);
                break;
            case 1:
                res2 = variant2::get<1>(nx);
                break;
        }
        printf("New result: %f", res1 * res2);
    }

    co_return 0;
}
在这种情况下,race 不会导致任何数据丢失。

教程

delay

让我们从最简单的示例开始:一个简单的延迟。

example/delay.cpp
cobalt::main co_main(int argc, char * argv[]) (1)
{
  asio::steady_timer tim{co_await asio::this_coro::executor, (2)
                         std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
  co_await tim.async_wait(cobalt::use_op); (4)
  co_return 0; (5)
}
1 当使用 co_main 函数时,它会定义一个隐式的 main 函数,并且是设置环境以运行异步代码的最简单方法。
2 从当前协程 promise 获取执行器。
3 使用参数设置超时时间。
4 通过使用 cobalt::use_op 执行等待。
5 返回一个从隐式 main 返回的值。

在这个例子中,我们使用了 cobalt/main.hpp 头文件,如果定义了上面的 co_main,它会提供一个 main 协程。这有几个优点:

  • 环境设置正确(executor & memory

  • asio 被告知上下文是单线程的

  • 一个带有 SIGINT & SIGTERMasio::signal_set 会自动连接到取消操作(即 Ctrl+C 会导致取消)

然后,此协程在其 promise 中有一个执行器(promise 是 C++ 中协程状态的名称。不要与 cobalt/promise.hpp 混淆),我们可以通过 this_coro 命名空间中的虚拟 awaitable 获取该执行器。

然后,我们可以构造一个计时器,并使用 use_op 初始化 async_waitcobalt 提供了多种 co_await 的方式来与 asio 交互,其中 use_op 是最简单的。

echo 服务器

我们将到处使用 use_opasio completion)token,所以我们使用一个 默认完成 token,这样我们就可以跳过最后一个参数。

example/echo_server.cpp 声明
namespace cobalt = boost::cobalt;
using boost::asio::ip::tcp;
using boost::asio::detached;
using tcp_acceptor = cobalt::use_op_t::as_default_on_t<tcp::acceptor>;
using tcp_socket   = cobalt::use_op_t::as_default_on_t<tcp::socket>;
namespace this_coro = boost::cobalt::this_coro;

我们将 echo 函数编写为 promise 协程。它是一个即时协程,被推荐作为默认值;如果需要惰性协程,则可以使用 task

example/echo_server.cpp echo 函数
cobalt::promise<void> echo(tcp_socket socket)
{
  try (1)
  {
    char data[4096];
    while (socket.is_open()) (2)
    {
      std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data)); (3)
      co_await async_write(socket, boost::asio::buffer(data, n)); (4)
    }
  }
  catch (std::exception& e)
  {
    std::printf("echo: exception: %s\n", e.what());
  }
}
1 当使用 use_op 完成 token 时,I/O 错误会被转换为 C++ 异常。此外,如果协程被取消(例如,因为用户按下了 Ctrl-C),也会引发异常。在这些条件下,我们会打印错误并退出循环。
2 我们运行循环直到被取消(异常)或用户关闭连接。
3 尽可能多地读取。
4 写入所有读取的字节。

注意,promise 是即时的。调用 echo 会立即执行代码,直到 async_read_some,然后将控制权返回给调用者。

接下来,我们还需要一个 acceptor 函数。在这里,我们使用 generator 来管理 acceptor 状态。这是一个可以多次 co_awaited 的协程,直到到达 co_return 表达式。

example/echo_server.cpp listen 函数
cobalt::generator<tcp_socket> listen()
{
  tcp_acceptor acceptor({co_await cobalt::this_coro::executor}, {tcp::v4(), 55555});
  for (;;) (1)
  {
    tcp_socket sock = co_await acceptor.async_accept(); (2)
    co_yield std::move(sock); (3)
  }
  co_return tcp_socket{acceptor.get_executor()}; (4)
}
1 取消操作也会导致此处从 co_await 抛出异常
2 异步接受连接
3 将其 yield 给等待协程
4 co_return 一个值以符合 C++ 规范。

有了这两个函数,我们现在可以编写服务器了

example/echo_server.cpp run_server 函数
cobalt::promise<void> run_server(cobalt::wait_group & workers)
{
  auto l = listen(); (1)
  while (true)
  {
    if (workers.size() == 10u)
      co_await workers.wait_one();  (2)
    else
      workers.push_back(echo(co_await l)); (3)
  }
}
1 构造监听器生成器协程。当对象被销毁时,协程将被取消,执行所有必需的清理操作。
2 当我们有超过 10 个 worker 时,我们等待一个 worker 完成
3 接受新连接并启动它。

wait_group 用于管理正在运行的 echo 函数。这个类将取消并等待正在运行的 echo 协程。

我们不需要对 listener 执行相同的操作,因为它会在 l 被销毁时自行停止。生成器的析构函数会取消它。

由于 promise 是即时的,因此只需调用它就足以启动。然后,我们将这些 promise 放入一个 wait_group 中,这使我们可以在作用域退出时关闭所有 worker。

example/echo_server.cpp co_main 函数
cobalt::main co_main(int argc, char ** argv)
{
  co_await cobalt::with(cobalt::wait_group(), &run_server); (1)
  co_return 0u;
}
1 在异步作用域中运行 run_server

上面显示的 with 函数将使用诸如 wait_group 之类的资源运行函数。在作用域退出时,with 将调用并 co_await 一个异步的拆卸函数。这会导致所有连接在 co_main 存在之前被正确关闭。

价格行情

为了演示 channels 和其他工具,我们需要一定的复杂性。为此,我们的项目是一个价格行情工具,它连接到 https://blockchain.info。然后,用户可以连接到 localhost 以查询给定的货币对,例如这样

wscat -c localhost:8080/btc/usd

首先,我们执行与 echo-server 相同的声明。

example/ticker.cpp 声明
using executor_type = cobalt::use_op_t::executor_with_default<cobalt::executor>;
using socket_type   = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;

下一步是编写一个函数来连接 ssl-stream,以上游连接

example/ticker.cpp 连接
cobalt::promise<asio::ssl::stream<socket_type>> connect(
        std::string host, boost::asio::ssl::context & ctx)
{
    asio::ip::tcp::resolver res{cobalt::this_thread::get_executor()};
    auto ep = co_await res.async_resolve(host, "https", cobalt::use_op); (1)

    asio::ssl::stream<socket_type> sock{cobalt::this_thread::get_executor(), ctx};
    co_await sock.next_layer().async_connect(*ep.begin()); (2)
    co_await sock.async_handshake(asio::ssl::stream_base::client); (3)

    co_return sock; (4)
}
1 查找主机
2 连接到端点
3 执行 ssl 握手
4 将 socket 返回给调用者

接下来,我们需要一个函数在现有的 ssl-stream 上执行 websocket 升级。

example/ticker.cpp connect_to_blockchain_info
cobalt::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
 ws.set_option(beast::websocket::stream_base::decorator(
     [](beast::websocket::request_type& req)
     {
       req.set(http::field::user_agent,
               std::string(BOOST_BEAST_VERSION_STRING) + " cobalt-ticker");
       req.set(http::field::origin,
               "https://exchange.blockchain.com"); (1)
     }));

 co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
1 blockchain.info 要求设置此标头。
2 执行 websocket 握手。

一旦 websocket 连接,我们希望持续接收 json 消息,为此,生成器是一个不错的选择。

example/ticker.cpp json_read
cobalt::generator<json::object> json_reader(websocket_type & ws)
try
{
    beast::flat_buffer buf;
    while (ws.is_open()) (1)
    {
        auto sz = co_await ws.async_read(buf); (2)
        json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
        auto obj = json::parse(data);
        co_yield obj.as_object(); (3)
        buf.consume(sz);
    }
    co_return {};
}
catch (std::exception & e)
{
  std::cerr << "Error reading: " << e.what() << std::endl;
  throw;
}
1 只要 socket 打开就保持运行
2 从 websocket 读取一个帧
3 解析并将其作为对象 co_yield

然后,这需要连接到订阅者,为此,我们将利用通道来传递原始 json。为了简化生命周期管理,订阅者将保留一个 shared_ptr,而生产者保留一个 weak_ptr

example/ticker.cpp 订阅类型
using subscription = std::pair<std::string, std::weak_ptr<cobalt::channel<json::object>>>;
using subscription_channel = std::weak_ptr<cobalt::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;

运行区块链连接器的主函数操作两个输入:来自 websocket 的数据和一个用于处理新订阅的通道。

example/ticker.cpp 运行 blockchain_info
cobalt::promise<void> run_blockchain_info(cobalt::channel<subscription> & subc)
try
{
    asio::ssl::context ctx{asio::ssl::context_base::tls_client};
    websocket_type ws{co_await connect("blockchain.info", ctx)};
    co_await connect_to_blockchain_info(ws); (1)

    subscription_map subs;
    std::list<std::string> unconfirmed;

    auto rd = json_reader(ws); (2)
    while (ws.is_open()) (3)
    {
      switch (auto msg = co_await cobalt::race(rd, subc.read()); msg.index()) (4)
      {
        case 0: (5)
          if (auto ms = get<0>(msg);
              ms.at("event") == "rejected") // invalid sub, cancel however subbed
            co_await handle_rejections(unconfirmed, subs, ms);
          else
            co_await handle_update(unconfirmed, subs, ms, ws);
        break;
        case 1: // (6)
            co_await handle_new_subscription(
                unconfirmed, subs,
                std::move(get<1>(msg)), ws);
        break;
      }
    }

    for (auto & [k ,c] : subs)
    {
        if (auto ptr = c.lock())
            ptr->close();
    }
}
catch(std::exception & e)
{
  std::cerr << "Exception: " << e.what() << std::endl;
  throw;
}
1 初始化连接
2 实例化 json_reader
3 只要 websocket 打开就运行
4 选择,即等待新的 json 消息或订阅
5 当它是 json 时,处理更新或拒绝
6 处理新的订阅消息

对于 cobalt 功能而言,handle_* 函数的内容并不重要,因此在本教程中跳过它。

handle_new_subscription 函数向 blockchain.info 发送一条消息,它将发回确认或拒绝。handle_rejectionhandle_update 将获取 json 值并将它们转发到订阅通道。

在消费者端,我们的服务器只会将数据转发给客户端。如果客户端输入数据,我们将立即关闭 websocket。我们使用 as_tuple 来忽略潜在的错误。

example/ticker.cpp 读取和关闭
cobalt::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
    system::error_code ec;
    co_await st.async_read(buf, asio::as_tuple(cobalt::use_op));
    co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(cobalt::use_op));
    st.next_layer().close(ec);
}

接下来,我们运行用户发送的会话

example/ticker.cpp run_session
cobalt::promise<void> run_session(beast::websocket::stream<socket_type> st,
                                 cobalt::channel<subscription> & subc)
try
{
    http::request<http::empty_body> req;
    beast::flat_buffer buf;
    co_await http::async_read(st.next_layer(), buf, req); (1)
    // check the target
    auto r = urls::parse_uri_reference(req.target());
    if (r.has_error() || (r->segments().size() != 2u)) (2)
    {
        http::response<http::string_body> res{http::status::bad_request, 11};
        res.body() = r.has_error() ? r.error().message() :
                    "url needs two segments, e.g. /btc/usd";
        co_await http::async_write(st.next_layer(), res);
        st.next_layer().close();
        co_return ;
    }

    co_await st.async_accept(req); (3)

    auto sym = std::string(r->segments().front()) + "-" +
               std::string(r->segments().back());
    boost::algorithm::to_upper(sym);
    // close when data gets sent
    auto p = read_and_close(st, std::move(buf)); (4)

    auto ptr = std::make_shared<cobalt::channel<json::object>>(1u); (5)
    co_await subc.write(subscription{sym, ptr}); (6)

    while (ptr->is_open() && st.is_open()) (7)
    {
      auto bb = json::serialize(co_await ptr->read());
      co_await st.async_write(asio::buffer(bb));
    }

    co_await st.async_close(beast::websocket::close_code::going_away,
                            asio::as_tuple(cobalt::use_op)); (8)
    st.next_layer().close();
    co_await p; (9)

}
catch(std::exception & e)
{
    std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
1 读取 http 请求,因为我们需要路径
2 检查路径,例如 /btc/usd
3 接受 websocket
4 如果消费者发送了任何内容,则开始读取并关闭
5 创建通道以接收更新
6 run_blockchain_info 发送订阅请求
7 当通道和 websocket 打开时,我们正在转发数据。
8 关闭 socket 并忽略错误
9 由于 websocket 此时肯定已关闭,请等待 read_and_close 关闭。

编写完 run_sessionrun_blockchain_info 后,我们可以继续进行 main

example/ticker.cpp main
cobalt::main co_main(int argc, char * argv[])
{
    acceptor_type acc{co_await cobalt::this_coro::executor,
                      asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
    std::cout << "Listening on localhost:8080" << std::endl;

    constexpr int limit = 10; // allow 10 ongoing sessions
    cobalt::channel<subscription> sub_manager; (1)

    co_await join( (2)
      run_blockchain_info(sub_manager),
      cobalt::with( (3)
        cobalt::wait_group(
            asio::cancellation_type::all,
            asio::cancellation_type::all),
        [&](cobalt::wait_group & sessions) -> cobalt::promise<void>
        {
          while (!co_await cobalt::this_coro::cancelled) (4)
          {
            if (sessions.size() >= limit) (5)
              co_await sessions.wait_one();

            auto conn = co_await acc.async_accept(); (6)
            sessions.push_back( (7)
                run_session(
                    beast::websocket::stream<socket_type>{std::move(conn)},
                    sub_manager));
          }
        })
      );

    co_return 0;
}
1 创建通道以管理订阅
2 使用 join 并行运行两个任务。
3 使用钴作用域来提供 wait_group
4 运行直到被取消。
5 当我们达到 limit 时,我们等待一个任务完成。
6 等待新连接。
7 将会话插入到 wait_group 中。

Main 使用 join,因为一个任务失败应该取消另一个任务。

delay op

到目前为止,我们使用了 use_op 来使用基于 asio 完成 token 机制的隐式操作。

但是,我们可以实现自己的操作,这些操作也可以利用 await_ready 优化。与立即完成不同,当 await_ready 返回 true 时,协程永远不会挂起。

为了利用此协程功能,cobalt 提供了一种创建可跳过操作的简单方法

example/delay_op.cpp
struct wait_op final : cobalt::op<system::error_code> (1)
{
  asio::steady_timer & tim;
  wait_op(asio::steady_timer & tim) : tim(tim) {}
  void ready(cobalt::handler<system::error_code> h ) override (2)
  {
    if (tim.expiry() < std::chrono::steady_clock::now())
      h(system::error_code{});
  }
  void initiate(cobalt::completion_handler<system::error_code> complete) override (3)
  {
    tim.async_wait(std::move(complete));
  }
};


cobalt::main co_main(int argc, char * argv[])
{
  asio::steady_timer tim{co_await asio::this_coro::executor,
                         std::chrono::milliseconds(std::stoi(argv[1]))};
  co_await wait_op(tim); (4)
  co_return 0; //
}
1 声明 op。我们继承 op 使其可等待。
2 预挂起检查在这里实现
3 如果需要,执行等待
4 像任何其他 awaitable 一样使用 op

这样,我们可以最大限度地减少协程挂起的次数。

虽然以上用法是配合 asio 使用的,但你也可以将这些处理程序用于任何其他基于回调的代码。

带推送值的生成器

带有推送值的协程并不常见,但可以显著简化某些问题。

由于我们在之前的示例中已经有了一个 json_reader,下面是如何编写一个可以接收推送值的 json_writer。

使用生成器的优势在于其内部状态管理。

cobalt::generator<system::error_code, json::object>
    json_writer(websocket_type & ws)
try
{
    char buffer[4096];
    json::serializer ser;

    while (ws.is_open()) (1)
    {
        auto val = co_yield system::error_code{}; (2)

        while (!ser.done())
        {
            auto sv = ser.read(buffer);
            co_await ws.cobalt_write({sv.data(), sv.size()}); (3)
        }

    }
    co_return {};
}
catch (system::system_error& e)
{
    co_return e.code();
}
catch (std::exception & e)
{
    std::cerr << "Error reading: " << e.what() << std::endl;
    throw;
}
1 只要 socket 打开就保持运行
2 co_yield 当前错误并检索一个新值。
3 向 websocket 写入一个帧。

现在我们可以像这样使用生成器

auto g = json_writer(my_ws);

extern std::vector<json::value> to_write;

for (auto && tw : std::move(to_write))
{
    if (auto ec = co_await g(std::move(tw)))
        return ec; // yield error
}

高级示例

更多示例仅以代码形式在存储库中提供。所有示例都列在下面。

表 5. 所有示例

example/http.cpp

执行单个 HTTP GET 请求的 HTTP 客户端。

example/outcome.cpp

使用 boost.outcome 协程类型。

example/python.cpp & example/python.py

使用 nanobind 将 cobalt 与 Python 集成。它使用 Python 的 asyncio 作为执行器,并允许 C++ 协程等待 Python 函数,反之亦然。

example/signals.cpp

boost.signals2 适配为可等待类型(单线程)。

example/spsc.cpp

创建一个基于 boost.lockfree 且可等待的 spsc_queue (多线程)。

example/thread.cpp

将工作线程与 asioconcurrent_channel 一起使用。

example/thread_pool.cpp

使用 asio::thread_pool 并向其生成 任务

example/delay.cpp

延迟 部分使用的示例

example/delay_op.cpp

延迟操作 部分使用的示例

example/echo_server.cpp

回显服务器 部分使用的示例

example/ticker.cpp

价格行情 部分使用的示例

example/channel.cpp

通道 参考部分使用的示例

设计

概念

这个库有两个基本概念

可等待对象 是一个可以在协程内部与 co_await 一起使用的表达式,例如:

co_await delay(50ms);

但是,协程的 promise 可以定义一个 await_transform,即,实际可以与 co_await 表达式一起使用的内容取决于协程。

因此,我们应该重新定义 可等待对象 的含义:**可等待对象** 是一个可以从不定义 await_transform 的协程 promise 中使用 co_await 的类型。

伪关键字 是可以在协程中使用的类型,由于其 promise 的 await_transform,它为协程添加了特殊功能。

this_coro 命名空间中的所有动词都是这样的伪关键字。

auto exec = co_await this_coro::executor;
此库公开了一组用于 promise 的 enable_* 基类,以简化自定义协程的创建。这包括 enable_awaitables,它提供了一个 await_transform,可以转发 可等待对象

本文档中的协程指的是异步协程,即,不考虑像 std::generator 这样的同步协程。

除了 main 之外的所有协程也都是实际的 可等待对象

执行器

由于所有内容都是异步的,因此该库需要使用事件循环。由于所有内容都是单线程的,因此可以假设每个线程只有一个执行器,这足以满足 97% 的用例。因此,有一个 thread_local 执行器被协程对象用作默认执行器(尽管以复制方式存储在协程 promise 中)。

同样,该库使用一种 executor 类型,默认为 asio::any_io_executor

如果你编写自己的协程,它应该持有执行器的副本,并具有一个通过常量引用返回执行器的 get_executor 函数。

使用 Strand

虽然可以使用 strand,但它们与 thread_local 执行器不兼容。这是因为它们可能会切换线程,因此它们不能是 thread_local

如果你希望使用 strand (例如,通过 spawn),则必须手动为任何 promise生成器通道 分配执行器。

通道 的情况下,这是一个构造函数参数,但对于其他协程类型,需要使用 asio::executor_arg。这是通过在协程的参数列表中直接使用 asio::executor_arg_t(在某个位置)后跟要使用的执行器来完成的,例如:

cobalt::promise<void> example_with_executor(int some_arg, asio::executor_arg_t, cobalt::executor);

这样,协程的 promise 可以从第三个参数中获取执行器,而不是默认使用 thread_local 执行器。

当然,如果有时使用 thread_local 执行器,则可以默认参数,以使其不那么不方便。

cobalt::promise<void> example_with_executor(int some_arg,
                                           asio::executor_arg_t = asio::executor_arg,
                                           cobalt::executor = cobalt::this_thread::get_executor());

如果在 strand 上省略此操作,则会抛出 asio::bad_allocator 类型的异常,或者更糟糕的是,会使用错误的执行器。

多态内存资源

类似地,该库使用 thread_local 的 pmr::memory_resource 来分配协程帧以及用作异步操作的分配器。

原因是,用户可能希望自定义分配,例如,避免锁、限制内存使用或监视使用情况。pmr 允许我们在不引入不必要的模板参数的情况下实现此目的,即,没有 promise<T, Allocator> 的复杂性。但是,使用 pmr 确实会引入一些最小的开销,因此用户可以选择通过定义 BOOST_COBALT_NO_PMR 来禁用它。

op 使用针对 asio 的分配器用法优化的内部资源,而 gatherracejoin 使用单调资源来最小化分配。两者仍然可以使用定义的 BOOST_COBALT_NO_PMR,在这种情况下,它们将使用 new/delete 作为上游分配。

mainthread 在启用 PMR 的情况下,每个线程使用一个 pmr::unsynchronized_pool_resource

如果你编写自己的协程,它应该具有一个返回 pmr::polymorphic_allocator<void> 的 get_allocator 函数。

取消

cobalt 使用基于 asio::cancellation_signal 的隐式取消。这主要隐式使用(例如,与 race 一起使用),因此在示例中很少有显式使用。

如果你编写自定义协程,它必须从 get_cancellation_slot 函数返回一个 cancellation_slot,以便能够取消其他操作。
如果你编写自定义的可等待对象,则可以在 await_suspend 中使用该函数来接收取消信号。

Promise

主要的协程类型是 promise,它是急切的。默认为此类型的原因是,编译器可以优化掉不挂起的 promise,如下所示:

cobalt::promise<void> noop()
{
  co_return;
}

理论上,等待上述操作是空操作,但实际上,到 2023 年为止,编译器还没有达到这种程度。

Race

最重要的同步机制是 race 函数。

它以伪随机顺序等待多个 可等待对象,并将返回第一个完成的结果,然后忽略其余的结果。

也就是说,它以伪随机顺序启动 co_await,并在发现一个可等待对象准备就绪或立即完成时停止。

cobalt::generator<int> gen1();
cobalt::generator<double> gen2();

cobalt::promise<void> p()
{
  auto g1 = gen1();
  auto g2 = gen2();
  while (!co_await cobalt::this_coro::cancelled)
  {
    switch(auto v = co_await race(g1, g2); v.index())
    {
    case 0:
      printf("Got int %d\n", get<0>(v));
      break;
    case 1:
      printf("Got double %f\n", get<1>(v));
      break;
    }
  }
}

但是,一旦 race 启动 co_await,它必须在内部等待所有可等待对象完成。因此,一旦第一个 可等待对象 完成,它会尝试 中断 其余的,如果失败则取消它们。

race 是触发取消的首选方式,例如:

cobalt::promise<void> timeout();
cobalt::promise<void> work();

race(timeout(), work());

interrupt_await

如果它天真地取消,则会丢失数据。因此,引入了 interrupt_await 的概念,该概念告诉可等待对象(支持它的)立即恢复等待器并返回或抛出忽略的值。

可中断等待对象的示例
struct awaitable
{
   bool await_ready() const;

   template<typename Promise>
   std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);

   T await_resume();

   void interrupt_await() &;
};

如果 interrupt_await 没有导致立即恢复(h),则 race 将发送取消信号。

race 使用正确的引用限定符应用这些

auto g = gen1();
race(g, gen2());

如果可用,以上代码将为 g1 调用 interrupt_await() & 函数,为 g2 调用 interrupt_await() && 函数。

一般来说,cobalt 中的协程支持左值中断,即 interrupt_await() &通道 操作是不限定的,即在两种情况下都有效。

joingather 将转发中断,即,只有在 gen2() 先完成时,才会中断 g1g2

关联器

cobalt 使用 asio 的 关联器 概念,但对其进行了简化。也就是说,它有三个关联器,它们是等待 promise 的成员函数。

  • const executor_type & get_executor()(始终为 executor,必须按常量引用返回)

  • allocator_type get_allocator()(始终为 pmr::polymorphic_allocator<void>

  • cancellation_slot_type get_cancellation_slot()(必须具有与 asio::cancellation_slot 相同的 IF)

cobalt 使用概念来检查这些概念是否存在于其 await_suspend 函数中。

这样,自定义协程可以支持取消、执行器和分配器。

在自定义的可等待对象中,你可以像这样获取它们

struct my_awaitable
{
    bool await_ready();
    template<typename T>
    void await_suspend(std::corutine_handle<P> h)
    {
        if constexpr (requires  (Promise p) {p.get_executor();})
            handle_executor(h.promise().get_executor();

        if constexpr (requires (Promise p) {p.get_cancellation_slot();})
            if ((cl = h.promise().get_cancellation_slot()).is_connected())
                cl.emplace<my_cancellation>();
    }

    void await_resume();
};

取消在 co_await 表达式中连接(如果协程和可等待对象支持),包括像 race 这样的同步机制。

线程

此库在设计上是单线程的,因为这简化了恢复,从而更有效地处理了像 race 这样的同步。race 需要锁定每个竞争的可等待对象以避免数据丢失,这将需要阻塞,并且随着每个附加元素的增加而变得更糟。

你不能让任何协程在与创建协程不同的线程上恢复,除了 任务(例如,使用 spawn)。

主要的技术原因是,切换协程最有效的方法是从 await_suspend 返回新协程的句柄,如下所示:

struct my_awaitable
{
    bool await_ready();
    std::coroutine_handle<T> await_suspend(std::coroutine_handle<U>);
    void await_resume();
};

在这种情况下,等待协程将在调用 await_suspend 之前挂起,并且恢复返回的协程。如果我们需要通过执行器,这当然不起作用。

这不仅适用于等待的协程,也适用于通道。此库中的通道使用可等待对象的侵入式列表,并且可能会从写操作的 await_suspend 返回读取(因此挂起)协程的句柄。

参考

cobalt/main.hpp

开始使用钴应用程序的最简单方法是使用具有以下签名的 co_main 函数

cobalt::main co_main(int argc, char *argv[]);

声明 co_main 将添加一个 main 函数,该函数执行在事件循环上运行协程的所有必要步骤。这使我们可以编写非常简单的异步程序。

cobalt::main co_main(int argc, char *argv[])
{
  auto exec = co_await cobalt::this_coro::executor;             (1)
  asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
  co_await tim.async_wait(cobalt::use_op);                      (3)
  co_return 0;
}
1 获取 main 运行的执行器
2 将其与 asio 对象一起使用
3 co_await 一个钴操作

主 promise 将创建一个 asio::signal_set 并将其用于取消。SIGINT 变为全部取消,而 SIGTERM 变为终端取消。

取消不会转发到分离的协程。用户需要注意在取消时结束它们,否则程序不允许正常终止。

执行器

它还会创建一个要运行的 asio::io_context,你可以通过 this_coro::executor 获取它。它将被分配给 cobalt::this_thread::get_executor()

内存资源

它还会创建一个内存资源,该资源将用作内部内存分配的默认值。它将被分配给 thread_localcobalt::this_thread::get_default_resource()

Promise

每个协程都有一个内部状态,称为 promise(不要与 cobalt::promise 混淆)。根据协程的属性,可以 co_await 不同的内容,就像我们在上面的示例中使用的一样。

它们通过继承来实现,并在不同的 promise 类型之间共享

主 promise 具有以下属性。

规范

  1. 声明 co_main 将隐式声明一个 main 函数

  2. 仅当定义了 co_main 时,才存在 main

  3. SIGINTSIGTERM 将导致取消内部任务。

cobalt/promise.hpp

promise 是一个急切的协程,可以 co_awaitco_return 值。也就是说,它不能使用 co_yield

cobalt::promise<void> delay(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
}

cobalt::main co_main(int argc, char *argv[])
{
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}

promise 默认是附加的。这意味着,当 promise 句柄超出范围时,会发送取消信号。

可以通过调用 detach 或使用前缀 + 运算符来分离 Promise。这是一种在运行时替代使用 detached 的方法。分离的 Promise 在销毁时不会发送取消信号。

cobalt::promise<void> my_task();

cobalt::main co_main(int argc, char *argv[])
{
  +my_task(); (1)
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}
1 通过使用 +,任务会被分离。如果没有它,编译器会生成一个 nodiscard 警告。

执行器

执行器从 thread_localget_executor 函数获取,除非在任何位置使用了 asio::executor_arg,后跟执行器参数。

cobalt::promise<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

内存资源

内存资源从 thread_localget_default_resource 函数获取,除非在任何位置使用了 std::allocator_arg,后跟 polymorphic_allocator 参数。

cobalt::promise<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概述

template<typename Return>
struct [[nodiscard]] promise
{
    promise(promise &&lhs) noexcept;
    promise& operator=(promise && lhs) noexcept;

    // enable `co_await`. (1)
    auto operator co_await ();

    // Ignore the return value, i.e. detach it. (2)
    void operator +() &&;

    // Cancel the promise.
    void cancel(asio::cancellation_type ct = asio::cancellation_type::all);

    // Check if the result is ready
    bool ready() const;
    // Check if the promise can be awaited.
    explicit operator bool () const; (3)

    // Detach or attach
    bool attached() const;
    void detach();
    void attach();
    // Create an already completed promimse

    static promise

    // Get the return value. If !ready() this function has undefined behaviour.
    Return get();
};
1 支持 中断等待
2 这允许通过简单的 +my_task() 表达式创建并行运行的 Promise。
3 这允许像 while (p) co_await p; 这样的代码。

Promise

协程 Promise(promise::promise_type)具有以下属性。

生成器是一个“渴望”协程,可以 co_awaitco_yield 值给调用者。

cobalt::generator<int> example()
{
  printf("In coro 1\n");
  co_yield 2;
  printf("In coro 3\n");
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main 1\n");
  printf("In main %d\n", co_await f);
  printf("In main %d\n", co_await f);
  return 0;
}

这将生成以下输出

In main 0
In coro 1
In main 1
In main 2
In coro 3
In main 4
generators1

Push(第二个模板参数)设置为非 void 时,可以将值推送到生成器。

cobalt::generator<int, int> example()
{
  printf("In coro 1\n");
  int i =  co_yield 2;
  printf("In coro %d\n", i);
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main %d\n", co_await f(3)); (1)
  co_return 0;
}
1 推送的值通过 operator() 传递给 co_yield 的结果。

这将生成以下输出

In main 0
In coro 1
In main 2
In coro 3

Lazy

可以通过等待 initial 来将生成器变为惰性。这个 co_await 表达式会产生 Push 值。这意味着生成器将等待直到第一次被等待,然后处理新推送的值,并在下一个 co_yield 处恢复。

cobalt::generator<int, int> example()
{
  int v = co_await cobalt::this_coro::initial;
  printf("In coro %d\n", v);
  co_yield 2;
  printf("In coro %d\n", v);
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main 1\n"); // < this is now before the co_await initial
  printf("In main %d\n", co_await f(1));
  printf("In main %d\n", co_await f(3));
  return 0;
}

这将生成以下输出

In main 0
In main 1
In coro 1
In main 2
In coro 3
In main 4
generators2

执行器

执行器从 thread_localget_executor 函数获取,除非在任何位置使用了 asio::executor_arg,后跟执行器参数。

cobalt::generator<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

内存资源

内存资源从 thread_localget_default_resource 函数获取,除非在任何位置使用了 std::allocator_arg,后跟 polymorphic_allocator 参数。

cobalt::generator<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概述

template<typename Yield, typename Push = void>
struct [[nodiscard]] generator
{
  // Movable

  generator(generator &&lhs) noexcept = default;
  generator& operator=(generator &&) noexcept;

  // True until it co_returns & is co_awaited after (1)
  explicit operator bool() const;

  // Cancel the generator. (3)
  void cancel(asio::cancellation_type ct = asio::cancellation_type::all);

  // Check if a value is available
  bool ready() const;

  // Get the returned value. If !ready() this function has undefined behaviour.
  Yield get();

  // Cancel & detach the generator.
  ~generator();

  // an awaitable that results in value of Yield.
  using generator_awaitable = unspecified;

  // Present when Push != void
  generator_awaitable operator()(      Push && push);
  generator_awaitable operator()(const Push &  push);

  // Present when Push == void, i.e. can co_await the generator directly.
  generator_awaitable operator co_await (); (2)

};
1 这允许像 while (gen) co_await gen: 这样的代码。
2 支持 中断等待
3 取消的生成器可能是可恢复的。

cobalt/task.hpp

任务是一个“惰性”协程,可以 co_awaitco_return 值。也就是说,它不能使用 co_yield

cobalt::task<void> delay(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
}

cobalt::main co_main(int argc, char *argv[])
{
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}

promise 不同,任务可以在创建它的执行器之外的其他执行器上等待或生成。

执行器

由于 task 是惰性的,因此它在构造时不需要执行器。它会尝试从调用者或等待者那里获取,如果存在的话。否则,它将默认为 thread_local 执行器。

内存资源

内存资源thread_localget_default_resource 函数获取,而是从 pmr::get_default_resource() 获取,除非在任何位置使用了 std::allocator_arg,后跟 polymorphic_allocator 参数。

cobalt::task<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概述

template<typename Return>
struct [[nodiscard]] task
{
    task(task &&lhs) noexcept = default;
    task& operator=(task &&) noexcept = default;

    // enable `co_await`
    auto operator co_await ();

};
可以通过调用 run(my_task()) 从同步函数同步使用任务。

use_task

use_task 完成令牌可用于从 cobalt_ 函数创建任务。这比 use_op 效率低,因为它需要分配一个协程帧,但具有更简单的返回类型并支持 中断等待

cobalt/detached.hpp

detached 是一个“渴望”协程,可以 co_await 但不能 co_return 值。也就是说,它不能被恢复,通常也不被等待。

cobalt::detached delayed_print(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
  printf("Hello world\n");
}

cobalt::main co_main(int argc, char *argv[])
{
  delayed_print();
  co_return 0;
}

Detached 用于在后台轻松运行协程。

cobalt::detached my_task();

cobalt::main co_main(int argc, char *argv[])
{
  my_task(); (1)
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}
1 生成 detached 协程。

detached 可以像这样为自己分配一个新的取消源

cobalt::detached my_task(asio::cancellation_slot sl)
{
   co_await this_coro::reset_cancellation_source(sl);
   // do somework
}

cobalt::main co_main(int argc, char *argv[])
{
  asio::cancellation_signal sig;
  my_task(sig.slot()); (1)
  co_await delay(std::chrono::milliseconds(50));
  sig.emit(asio::cancellation_type::all);
  co_return 0;
}

执行器

执行器从 thread_localget_executor 函数获取,除非在任何位置使用了 asio::executor_arg,后跟执行器参数。

cobalt::detached my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

内存资源

内存资源从 thread_localget_default_resource 函数获取,除非在任何位置使用了 std::allocator_arg,后跟 polymorphic_allocator 参数。

cobalt::detached my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概述

struct detached {};
1 支持 中断等待

Promise

线程 detached 具有以下属性。

cobalt 中的操作是一个包装 asio 操作的 可等待对象

use_op

use_op 令牌是直接创建 op 的方式,即使用 cobalt::use_op 作为完成令牌将创建所需的可等待对象。

auto tim = cobalt::use_op.as_default_on(asio::steady_timer{co_await cobalt::this_coro::executor});
co_await tim.async_wait();

根据完成签名,co_await 表达式可能会抛出异常。

签名 返回类型 异常

void()

void

noexcept

void(T)

T

noexcept

void(T…​)

std::tuple<T…​>

noexcept

void(system::error_code, T)

T

system::system_error

void(system::error_code, T…​)

std::tuple<T…​>

system::system_error

void(std::exception_ptr, T)

T

任何异常

void(std::exception_ptr, T…​)

std::tuple<T…​>

任何异常

use_op 永远不会立即完成,即 await_ready 将始终返回 false,但始终会挂起协程。

手动编码操作

操作是 [cobalt_operation] 功能的更高级实现。

该库使得可以轻松创建具有早期完成条件的异步操作,即避免协程挂起的条件。

例如,我们可以创建一个 wait_op,如果计时器已经过期,则什么也不做。

struct wait_op : cobalt::op<system::error_code> (1)
{
  asio::steady_timer & tim;

  wait_op(asio::steady_timer & tim) : tim(tim) {}

  bool ready(cobalt::handler<system::error_code> ) (2)
  {
    if (tim.expiry() < std::chrono::steady_clock::now())
        h(system::error_code{});
  }
  void initiate(cobalt::completion_handler<system::error_code> complete) (3)
  {
    tim.async_wait(std::move(complete));
  }
};
1 继承具有匹配签名的 opawait_transform 会将其拾取。
2 检查操作是否就绪 - 从 await_ready 调用。
3 如果操作尚未就绪,则启动操作。

cobalt/concepts.hpp

可等待

可等待对象是一个可以与 co_await 一起使用的表达式。

template<typename Awaitable, typename Promise = void>
concept awaitable_type = requires (Awaitable aw, std::coroutine_handle<Promise> h)
{
    {aw.await_ready()} -> std::convertible_to<bool>;
    {aw.await_suspend(h)};
    {aw.await_resume()};
};

template<typename Awaitable, typename Promise = void>
concept awaitable =
        awaitable_type<Awaitable, Promise>
    || requires (Awaitable && aw) { {std::forward<Awaitable>(aw).operator co_await()} -> awaitable_type<Promise>;}
    || requires (Awaitable && aw) { {operator co_await(std::forward<Awaitable>(aw))} -> awaitable_type<Promise>;};
此库中的 可等待对象 要求协程 Promise 通过常量引用返回它们的执行器(如果它们提供执行器)。否则,它将使用 this_thread::get_executor()

启用可等待对象

继承 enable_awaitables 将使协程能够通过 await_transform 等待任何在没有任何 await_transform 的情况下可被 co_await 的对象。

cobalt/this_coro.hpp

this_coro 命名空间提供了访问协程 Promise 内部状态的实用程序。

伪可等待对象

// Awaitable type that returns the executor of the current coroutine.
struct executor_t {}
constexpr executor_t executor;

// Awaitable type that returns the cancellation state of the current coroutine.
struct cancellation_state_t {};
constexpr cancellation_state_t cancellation_state;

// Reset the cancellation state with custom or default filters.
constexpr unspecified reset_cancellation_state();
template<typename Filter>
constexpr unspecified reset_cancellation_state(
    Filter && filter);
template<typename InFilter, typename OutFilter>
constexpr unspecified reset_cancellation_state(
    InFilter && in_filter,
    OutFilter && out_filter);

// get & set the throw_if_cancelled setting.
unspecified throw_if_cancelled();
unspecified throw_if_cancelled(bool value);

// Set the cancellation source in a detached.
unspecified reset_cancellation_source();
unspecified reset_cancellation_source(asio::cancellation_slot slot);


// get the allocator the promise
struct allocator_t {};
constexpr allocator_t allocator;

// get the current cancellation state-type
struct cancelled_t {};
constexpr cancelled_t cancelled;

// set the over-eager mode of a generator
struct initial_t {};
constexpr initial_t initial;

Await Allocator

支持 enable_await_allocator 的协程的分配器可以通过以下方式获得

co_await cobalt::this_coro::allocator;

为了在您自己的协程中启用此功能,可以使用 CRTP 模式继承 enable_await_allocator

struct my_promise : cobalt::enable_await_allocator<my_promise>
{
  using allocator_type = __your_allocator_type__;
  allocator_type get_allocator();
};
如果可用,则分配器将由 use_op 使用。

Await 执行器

支持 enable_await_executor 的协程的执行器可以通过以下方式获得

co_await cobalt::this_coro::executor;

为了在您自己的协程中启用此功能,可以使用 CRTP 模式继承 enable_await_executor

struct my_promise : cobalt::enable_await_executor<my_promise>
{
  using executor_type = __your_executor_type__;
  executor_type get_executor();
};
如果可用,则执行器将由 use_op 使用。

Await deferred

您的协程 Promise 可以继承 enable_await_deferred,以便在 co_await 表达式中使用单个签名 asio::deferred

由于 asio::deferred 现在是默认的完成令牌,因此这允许以下代码,而无需指定任何完成令牌或其他特殊化。

asio::steady_timer t{co_await cobalt::this_coro::executor};
co_await t.async_wait();

内存资源基类

Promise 的 promise_memory_resource_base 基类将在 Promise 中提供一个从默认资源或在 std::allocator_arg 参数之后传递的资源中获取的 get_allocator。同样,它将添加 operator new 重载,以便协程在其帧分配中使用相同的内存资源。

如果已取消则抛出异常

promise_throw_if_cancelled_base 提供了基本选项,允许操作在另一个实际的 可等待对象 被等待时启用协程抛出异常。

co_await cobalt::this_coro::throw_if_cancelled;

取消状态

promise_cancellation_base 提供了基本选项,允许操作启用协程具有可通过 reset_cancellation_state 重置的 cancellation_state

co_await cobalt::this_coro::reset_cancellation_state();

为了方便起见,还有一个检查当前取消状态的快捷方式

asio::cancellation_type ct = (co_await cobalt::this_coro::cancellation_state).cancelled();
asio::cancellation_type ct = co_await cobalt::this_coro::cancelled; // same as above

cobalt/this_thread.hpp

由于一切都是单线程的,因此此库为每个线程提供了一个执行器和默认内存资源。

namespace boost::cobalt::this_thread
{

pmr::memory_resource* get_default_resource() noexcept; (1)
pmr::memory_resource* set_default_resource(pmr::memory_resource* r) noexcept; (2)
pmr::polymorphic_allocator<void> get_allocator(); (3)

typename asio::io_context::executor_type & get_executor(); (4)
void set_executor(asio::io_context::executor_type exec) noexcept; (5)

}
1 获取默认资源 - 除非设置,否则将为 pmr::get_default_resource。
2 设置默认资源 - 返回之前设置的资源。
3 获取包装 (1) 的分配器。
4 获取线程的执行器 - 如果未设置则抛出异常。
5 设置当前线程的执行器。

协程将使用这些作为默认值,但保留一份副本以防万一。

唯一的例外是 cobalt-operation 的初始化,它将使用 this_thread::executor 从中重新抛出。

cobalt/channel.hpp

通道可用于在单个线程上的不同协程之间交换数据。

概述

通道大纲
template<typename T>
struct channel
{
  // create a channel with a buffer limit, executor & resource.
  explicit
  channel(std::size_t limit = 0u,
          executor executor = this_thread::get_executor(),
          pmr::memory_resource * resource = this_thread::get_default_resource());
  // not movable.
  channel(channel && rhs) noexcept = delete;
  channel & operator=(channel && lhs) noexcept = delete;

  using executor_type = executor;
  const executor_type & get_executor();

  // Closes the channel
  ~channel();
  bool is_open() const;
  // close the operation, will cancel all pending ops, too
  void close();

  // an awaitable that yields T
  using read_op = unspecified;

  // an awaitable that yields void
  using write_op = unspecified;

  // read a value to a channel
  read_op  read();

  // write a value to the channel
  write_op write(const T  && value);
  write_op write(const T  &  value);
  write_op write(      T &&  value);
  write_op write(      T  &  value);

  // write a value to the channel if T is void

};

描述

通道是两个协程进行通信和同步的工具。

const std::size_t buffer_size = 2;
channel<int> ch{exec, buffer_size};

// in coroutine (1)
co_await ch.write(42);

// in coroutine (2)
auto val = co_await ch.read();
1 将值发送到通道 - 将阻塞直到可以发送为止。
2 从通道读取值 - 将阻塞直到值可等待为止。

这两个操作都可能阻塞,具体取决于通道缓冲区大小。

如果缓冲区大小为零,则需要同时进行 readwrite,即充当 rendezvous。

如果缓冲区未满,则写入操作不会挂起协程;同样,如果缓冲区不为空,则读取操作不会挂起。

如果两个操作同时完成(就像空缓冲区的情况一样),则第二个操作将被发布到执行器以供稍后完成。

通道类型可以是 void,在这种情况下,write 不带参数。

通道操作可以取消而不会丢失数据。这使得它们可以与 race 一起使用。

generator<variant2::variant<int, double>> merge(
    channel<int> & c1,
    channel<double> & c2)
{
    while (c1 && c2)
       co_yield co_await race(c1, c2);
}

示例

cobalt::promise<void> producer(cobalt::channel<int> & chan)
{
  for (int i = 0; i < 4; i++)
    co_await chan.write(i);

  chan.close();
}

cobalt::main co_main(int argc, char * argv[])
{
  cobalt::channel<int> c;

  auto p = producer(c);
  while (c.is_open())
    std::cout << co_await c.read() << std::endl;

  co_await p;
  co_return 0;
}

此外,还提供了一个 channel_reader,使读取通道更方便,并且可以与 BOOST_COBALT_FOR 一起使用。

cobalt::main co_main(int argc, char * argv[])
{
  cobalt::channel<int> c;

  auto p = producer(c);
  BOOST_COBALT_FOR(int value, cobalt::channel_reader(c))
    std::cout << value << std::endl;

  co_await p;
  co_return 0;
}

cobalt/with.hpp

with 机制提供了一种执行协程异步销毁的方法。也就是说,它类似于异步析构函数调用。

struct my_resource
{
  cobalt::promise<void> await_exit(std::exception_ptr e);
};

cobalt::promise<void> work(my_resource & res);

cobalt::promise<void> outer()
{
  co_await cobalt::with(my_resource(), &work);
}

可以通过提供 await_exit 成员函数或返回 可等待对象tag_invoke 函数,或者通过将销毁作为 with 的第三个参数来完成销毁。

using ws_stream = beast::websocket::stream<asio::ip::tcp::socket>>;
cobalt::promise<ws_stream> connect(urls::url); (1)
cobalt::promise<void>   disconnect(ws_stream &ws); (2)

auto teardown(const boost::cobalt::with_exit_tag & wet , ws_stream & ws, std::exception_ptr e)
{
  return disconnect(ws);
}

cobalt::promise<void> run_session(ws_stream & ws);

cobalt::main co_main(int argc, char * argv[])
{
  co_await cobalt::with(co_await connect(argv[1]), &run_session, &teardown);
  co_return 0;
}
1 实现 websocket 连接和 websocket 初始化。
2 实现有序关闭。
如果作用域在没有异常的情况下退出,则 std::exception_ptr 为 null。注意:exit 函数通过引用获取 exception_ptr 并对其进行修改是合法的。

cobalt/race.hpp

race 函数可用于从一组 可等待对象co_await 一个。

可以将其作为具有多个 可等待对象 的可变参数函数调用,也可以在 可等待对象 的范围内调用。

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_wait()
{
  co_await cobalt::race(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::race(aws); (2)
}
1 等待一组可变参数的 可等待对象
2 等待一个 可等待对象 的向量。

race 的第一个参数可以是 均匀随机位生成器

race 的签名
extern promise<void> pv1, pv2;
std::vector<promise<void>> pvv;

std::default_random_engine rdm{1};
// if everything returns void race returns the index
std::size_t r1 = co_await race(pv1, pv2);
std::size_t r2 = co_await race(rdm, pv1, pv2);
std::size_t r3 = co_await race(pvv);
std::size_t r4 = co_await race(rdm, pvv);

// variant if not everything is void. void become monostate
extern promise<int> pi1, pi2;
variant2::variant<monostate, int, int> r5 = co_await race(pv1, pi1, pi2);
variant2::variant<monostate, int, int> r6 = co_await race(rdm, pv1, pi1, pi2);

// a range returns a pair of the index and the result if non-void
std::vector<promise<int>> piv;
std::pair<std::size_t, int> r7 = co_await race(piv);
std::pair<std::size_t, int> r8 = co_await race(rdm, piv);

中断等待

当参数作为右值引用传递时,race 将尝试在 可等待对象 上使用 .interrupt_await 来通知可等待对象立即完成并且结果将被忽略。如果支持,可等待对象 必须在 interrupt_await 返回之前恢复等待协程。如果 race 没有检测到该函数,则会发送取消。

这意味着你可以像这样重用 race

cobalt::promise<void> do_wait()
{
  auto t1 = task1();
  auto t2 = task2();
  co_await cobalt::race(t1, t2); (1)
  co_await cobalt::race(t1, t2); (2)
}
1 等待第一个任务完成。
2 等待另一个任务完成。

promise生成器gather 支持此功能。

race 将调用 可等待对象 的函数,就像在 co_await 表达式中使用一样,或者根本不评估它们。

left_race

left_race 函数类似于 race,但遵循严格的从左到右的扫描。这可能会导致饥饿问题,这就是为什么这不是推荐的默认值,但如果在适当的情况下小心处理,则可以用于优先级排序。

概述

// Concept for the random number generator.
template<typename G>
  concept uniform_random_bit_generator =
    requires ( G & g)
    {
      {typename std::decay_t<G>::result_type() } -> std::unsigned_integral; // is an unsigned integer type
      // T	Returns the smallest value that G's operator() may return. The value is strictly less than G::max(). The function must be constexpr.
      {std::decay_t<G>::min()} -> std::same_as<typename std::decay_t<G>::result_type>;
      // T	Returns the largest value that G's operator() may return. The value is strictly greater than G::min(). The function must be constexpr.
      {std::decay_t<G>::max()} -> std::same_as<typename std::decay_t<G>::result_type>;
      {g()} -> std::same_as<typename std::decay_t<G>::result_type>;
    } && (std::decay_t<G>::max() > std::decay_t<G>::min());


// Variadic race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
         uniform_random_bit_generator URBG, awaitable ... Promise>
awaitable race(URBG && g, Promise && ... p);

// Ranged race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
         uniform_random_bit_generator URBG, range<awaitable> PromiseRange>
awaitable race(URBG && g, PromiseRange && p);

// Variadic race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable race(Promise && ... p);

// Ranged race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable race(PromiseRange && p);

// Variadic left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable left_race(Promise && ... p);

// Ranged left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable left_race(PromiseRange && p);
选择空范围将导致抛出异常。

cobalt/gather.hpp

gather 函数可用于同时 co_await 多个 可等待对象,并传递取消信号。

该函数将收集所有完成,并将其作为 system::result 返回,即捕获作为值概念。一个可等待对象抛出异常不会取消其他对象。

它可以被调用为具有多个 Awaitable 的可变参数函数,也可以在 awaitable 的范围内调用。

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_gather()
{
  co_await cobalt::gather(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::gather(aws); (2)
}
1 等待一组可变参数的 可等待对象
2 等待一个 awaitable 的向量

gather 将调用 awaitable 的函数,就像在 co_await 表达式中使用一样。

gather 的签名
extern promise<void> pv1, pv2;
std::tuple<system::result<int>, system::result<int>> r1 = co_await gather(pv1, pv2);

std::vector<promise<void>> pvv;
pmr::vector<system::result<void>> r2 =  co_await gather(pvv);

extern promise<int> pi1, pi2;
std::tuple<system::result<monostate>,
           system::result<monostate>,
           system::result<int>,
           system::result<int>> r3 = co_await gather(pv1, pv2, pi1, pi2);

std::vector<promise<int>> piv;
pmr::vector<system::result<int>> r4 = co_await gather(piv);

概述

// Variadic gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable gather(Promise && ... p);

// Ranged gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable gather(PromiseRange && p);

cobalt/join.hpp

join 函数可以用于一次 co_await 多个 awaitable,并具有正确连接的取消操作。

该函数将收集所有完成并将其作为值返回,除非抛出异常。如果抛出异常,所有未完成的操作将被取消(如果可能,则中断),并重新抛出第一个异常。

void 将在元组中以 variant2::monostate 的形式返回,除非所有 awaitable 都产生 void。

它可以被调用为具有多个 Awaitable 的可变参数函数,也可以在 awaitable 的范围内调用。

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_join()
{
  co_await cobalt::join(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::join(aws); (2)
}
1 等待一组可变参数的 可等待对象
2 等待一个 awaitable 的向量

join 将调用 awaitable 的函数,就像在 co_await 表达式中使用一样。

join 的签名
extern promise<void> pv1, pv2;
/* void */ co_await join(pv1, pv2);

std::vector<promise<void>> pvv;
/* void */ co_await join(pvv);

extern promise<int> pi1, pi2;
std::tuple<monostate, monostate, int, int> r1 = co_await join(pv1, pv2, pi1, pi2);

std::vector<promise<int>> piv;
pmr::vector<int> r2 = co_await join(piv);

概述

// Variadic join
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable join(Promise && ... p);

// Ranged join
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable join(PromiseRange && p);
在空范围上选择将导致异常。

cobalt/wait_group.hpp

wait_group 函数可用于管理类型为 promise<void> 的多个协程。通过使用匹配的 await_exit 成员,它可以与 cobalt/with.hpp 一起开箱即用。

本质上,wait_group 是一个动态的 promise 列表,它具有 race 函数 (wait_one)、gather 函数 (wait_all),并且会在作用域退出时清理。

struct wait_group
{
    // create a wait_group
    explicit
    wait_group(asio::cancellation_type normal_cancel = asio::cancellation_type::none,
               asio::cancellation_type exception_cancel = asio::cancellation_type::all);

    // insert a task into the group
    void push_back(promise<void> p);

    // the number of tasks in the group
    std::size_t size() const;
    // remove completed tasks without waiting (i.e. zombie tasks)
    std::size_t reap();
    // cancel all tasks
    void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
    // wait for one task to complete.
    wait_one_op wait_one();
    // wait for all tasks to complete
    wait_op wait();
    // wait for all tasks to complete
    wait_op operator co_await ();
    // when used with with , this will receive the exception
    // and wait for the completion
    // if ep is set, this will use the exception_cancel level,
    // otherwise the normal_cancel to cancel all promises.
    wait_op await_exit(std::exception_ptr ep);
};

cobalt/spawn.hpp

spawn 函数允许在 asio executor/execution_context 上运行 task,并使用 completion token 来消耗结果。

auto spawn(Context & context, task<T> && t, CompletionToken&& token);
auto spawn(Executor executor, task<T> && t, CompletionToken&& token);

Spawn 将分发其启动并发布完成。这使得可以使用 task 在另一个执行器上运行 task,并在当前执行器上使用 use_op 来消耗结果。也就是说,spawn 可以用于跨线程。

示例

cobalt::task<int> work();

int main(int argc, char *argv[])
{
  asio::io_context ctx{BOOST_ASIO_CONCURRENCY_HINT_1};
  auto f = spawn(ctx, work(), asio::use_future);
  ctx.run();

  return f.get();
}
调用者需要确保执行器不是在多个线程上并发运行的,例如,通过使用单线程的 asio::io_contextstrand

cobalt/run.hpp

run 函数类似于 spawn,但它是同步运行的。它会在内部设置执行上下文和内存资源。

当将一段 cobalt 代码集成到同步应用程序中时,这会很有用。

概述

// Run the task and return it's value or rethrow any exception.
T run(task<T> t);

示例

cobalt::task<int> work();

int main(int argc, char *argv[])
{
  return run(work());
}

cobalt/thread.hpp

线程类型是创建类似于 main 的环境的另一种方式,但不使用 signal_set

cobalt::thread my_thread()
{
  auto exec = co_await cobalt::this_coro::executor;             (1)
  asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
  co_await tim.async_wait(cobalt::use_op);                      (3)
  co_return 0;
}
1 获取执行器 thread 的运行位置
2 将其与 asio 对象一起使用
3 co_await 一个钴操作

要使用线程,您可以像使用 std::thread 一样使用它

int main(int argc, char * argv[])
{
  auto thr = my_thread();
  thr.join();
  return 0;
}

线程也是一个 awaitable (包括取消)。

cobalt::main co_main(int argc, char * argv[])
{
  auto thr = my_thread();
  co_await thr;
  co_return 0;
}
销毁一个分离的线程将导致硬停止 (io_context::stop) 并加入该线程。
此库中没有任何内容(除了等待 cobalt/thread.hppcobalt/spawn.hpp 之外)是线程安全的。如果需要在线程之间传输数据,则需要一个线程安全的实用程序,例如 asio::concurrent_channel。您不能在线程之间共享任何钴原语,唯一的例外是可以将 task spawn 到另一个线程的执行器上。

执行器

它还会创建一个要运行的 asio::io_context,你可以通过 this_coro::executor 获取它。它将被分配给 cobalt::this_thread::get_executor()

内存资源

它还会创建一个内存资源,该资源将用作内部内存分配的默认值。它将被分配给 thread_localcobalt::this_thread::get_default_resource()

概述

struct thread
{
  // Send a cancellation signal
  void cancel(asio::cancellation_type type = asio::cancellation_type::all);


  // Allow the thread to be awaited. NOOP if the thread is invalid.
  auto operator co_await() &-> detail::thread_awaitable; (1)
  auto operator co_await() && -> detail::thread_awaitable; (2)

  // Stops the io_context & joins the executor
  ~thread();
  /// Move constructible
  thread(thread &&) noexcept = default;

  using executor_type = executor;

  using id = std::thread::id;
  id get_id() const noexcept;

  // Add the functions similar to `std::thread`
  void join();
  bool joinable() const;
  void detach();

  executor_type get_executor() const;
};
1 支持 中断等待
2 始终转发取消

cobalt/result.hpp

可以修改 Awaitable 以返回 system::resultstd::tuple,而不是使用异常。

// value only
T res = co_await foo();

// as result
system::result<T, std::exception_ptr> res = co_await cobalt::as_result(foo());

// as tuple
std::tuple<std::exception_ptr, T> res = co_await cobalt::as_tuple(foo());

Awaitable 还可以通过使用 cobalt::as_result_tagcobalt::as_tuple_tag 提供 await_resume 重载,从而提供处理结果和元组的自定义方式。

your_result_type await_resume(cobalt::as_result_tag);
your_tuple_type  await_resume(cobalt::as_tuple_tag);

这允许 awaitable 提供 std::exception_ptr 以外的其他错误类型,例如 system::error_code。这是通过 opchannel 完成的。

// example of an op with result system::error_code, std::size_t
system::result<std::size_t>                 await_resume(cobalt::as_result_tag);
std::tuple<system::error_code, std::size_t> await_resume(cobalt::as_tuple_tag);
Awaitable 仍然允许抛出异常,例如针对 OOM 等严重异常。

cobalt/async_for.hpp

对于生成器等类型,提供了 BOOST_COBALT_FOR 宏,以模拟 for co_await 循环。

cobalt::generator<int> gen();

cobalt::main co_main(int argc, char * argv[])
{
    BOOST_COBALT_FOR(auto i, gen())
        printf("Generated value %d\n", i);

    co_return 0;
}

要求是在 for 循环中使用的 awaitable 具有 operator bool 来检查是否可以再次等待。 generatorpromise 就是这种情况。

cobalt/error.hpp

为了使错误更容易管理,cobalt 提供了一个 error_category,用于 boost::system::error_code

enum class error
{
  moved_from,
  detached,
  completed_unexpected,
  wait_not_ready,
  already_awaited,
  allocation_failed
};

system::error_category & cobalt_category();
system::error_code make_error_code(error e);

cobalt/config.hpp

配置添加器允许配置 boost.cobalt 的一些实现细节。

executor_type

执行器类型默认为 boost::asio::any_io_executor

您可以通过定义 BOOST_COBALT_CUSTOM_EXECUTOR 并自行添加 boost::cobalt::executor 类型,将其设置为 boost::asio::any_io_executor

或者,可以定义 BOOST_COBALT_USE_IO_CONTEXT 以将执行器设置为 boost::asio::io_context::executor_type

pmr

Boost.cobalt 可以与不同的 pmr 实现一起使用,默认为 std::pmr

可以使用以下宏来配置它

  • BOOST_COBALT_USE_STD_PMR

  • BOOST_COBALT_USE_BOOST_CONTAINER_PMR

  • BOOST_COBALT_USE_CUSTOM_PMR

如果您定义 BOOST_COBALT_USE_CUSTOM_PMR,则需要提供一个 boost::cobalt::pmr 命名空间,它是 std::pmr 的替代品。

或者,可以使用以下方式禁用 pmr 的使用

  • BOOST_COBALT_NO_PMR.

在这种情况下,cobalt 将为同步函数(racegatherjoin)使用非 pmr 单调资源。

use_op 使用一个小型缓冲区优化的资源,其大小可以通过定义 BOOST_COBALT_SBO_BUFFER_SIZE 来设置,默认为 4096 字节。

cobalt/leaf.hpp

Async 提供了与 boost.leaf 的集成。它提供了类似于 leaf 的函数,这些函数采用 awaitable 而不是函数对象,并返回一个 awaitable

template<awaitable TryAwaitable, typename ... H >
auto try_catch(TryAwaitable && try_coro, H && ... h );

template<awaitable TryAwaitable, typename ... H >
auto try_handle_all(TryAwaitable && try_coro, H && ... h );

template<awaitable TryAwaitable, typename ... H >
auto try_handle_some(TryAwaitable && try_coro, H && ... h );

有关详细信息,请参阅 leaf 文档。

cobalt/experimental/context.hpp

这是(很可能)未定义的行为,因为它违反了标准中的前提条件。可以在此处找到解决此问题的论文 (https://isocpp.org/files/papers/P3203R0.html)。

此标头为使用基于 boost.fiber 的有栈协程提供了 experimental 支持,就像它们是 C20 协程一样。也就是说,它们可以通过放入 coroutine_handle 来使用 `awaitables`。同样,该实现使用 C20 协程 promise,并像运行 C++20 协程一样运行。

//
void delay(experimental::context<promise<void>> h, std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  h.await(tim.async_wait(cobalt::use_op)); // instead of co_await.
}

cobalt::main co_main(int argc, char *argv[])
{
  cobalt::promise<void> dl = cobalt::experimental::make_context(&delay, 50);
  co_await dl;
  co_return 0;
}

参考

// The internal coroutine context.
/// Args are the function arguments after the handle.
template<typename Return, typename ... Args>
struct context
{
  // Get a handle to the promise
        promise_type & promise();
  const promise_type & promise() const;

  // Convert it to any context if the underlying promise is the same
  template<typename Return_, typename ... Args_>
  constexpr operator context<Return_, Args_...>() const;

  // Await something. Uses await_transform automatically.
  template<typename Awaitable>
  auto await(Awaitable && aw);
  // Yield a value, if supported by the promise.
  template<typename Yield>
  auto yield(Yield && value);
};


// Create a fiber with a custom stack allocator (see boost.fiber for details) and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc  && salloc, Args && ... args);

// Create a fiber with the default allocator and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func>
auto make_context(Func && func, Args && ... args);

// Create a fiber with a custom stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc  && salloc, Args && ... args);

// Create a fiber with the default stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func>
auto make_context(Func && func, Args && ... args);

深入

自定义执行器

cobalt 默认使用 asio::any_io_executor 的原因之一是它是一种类型擦除的执行器,即您可以提供自己的事件循环,而无需重新编译 cobalt

但是,在 Executor TS 的开发过程中,执行器概念变得有点不直观,委婉地说。

Ruben Perez 写了一篇出色的 博客文章,我将毫不客气地从中借鉴。

定义

执行器是指向实际事件循环的类型,并且(廉价地)可复制,它支持属性(见下文),具有可比较的相等性,并且具有 execute 函数。

执行
struct example_executor
{
  template<typename Fn>
  void execute(Fn && fn) const;
};

以上函数根据其属性执行 fn

属性

可以查询、首选或要求属性,例如

struct example_executor
{
  // get a property by querying it.
  asio::execution::relationship_t &query(asio::execution::relationship_t) const
  {
    return asio::execution::relationship.fork;
  }

  // require an executor with a new property
  never_blocking_executor require(const execution::blocking_t::never_t);

  // prefer an executor with a new property. the executor may or may not support it.
  never_blocking_executor prefer(const execution::blocking_t::never_t);
  // not supported
  example_executor prefer(const execution::blocking_t::always_t);
};
asio::any_io_executor 的属性

为了将执行器包装在 asio::any_io_executor 中,需要两个属性

  • execution::context_t

  • execution::blocking_t::never_t

这意味着我们需要使它们可要求(对于上下文而言没有意义)或从 query 返回期望的值。

execution::context_t 查询应返回 asio::execution_context&,如下所示

struct example_executor
{
  asio::execution_context &query(asio::execution::context_t) const;
};

执行上下文用于管理管理 io 对象生命周期的服务的生命周期,例如 asio 的计时器和套接字。也就是说,通过提供此上下文,asio 的所有 io 都可以使用它。

执行上下文必须在执行器被销毁后仍然存在。

以下可能是首选的

  • execution::blocking_t::possibly_t

  • execution::outstanding_work_t::tracked_t

  • execution::outstanding_work_t::untracked_t

  • execution::relationship_t::fork_t

  • execution::relationship_t::continuation_

这意味着您可能希望在执行器中支持它们以进行优化。

blocking 属性

正如我们之前看到的,此属性控制传递给 execute() 的函数是可以在 execute() 中立即运行,还是必须排队以供以后执行。可能的值是

  • asio::execution::blocking.never:永远不要在 execute() 中运行该函数。这是 asio::post() 的作用。

  • asio::execution::blocking.possibly:该函数可能在 execute() 中运行,也可能不运行。这是默认值(调用 io_context::get_executor 时获得的值)。

  • asio::execution::blocking.always:该函数始终在 execute() 中运行。io_context::executor 不支持此功能。

relationship 属性

relationship 可以取两个值

  • asio::execution::relationship.continuation:表示传递给 execute() 的函数是调用 execute() 的函数的延续。

  • asio::execution::relationship.fork:与上述相反。这是默认值(调用 io_context::get_executor() 时获得的值)。

将此属性设置为 continuation 可以在调度函数的方式中启用一些优化。仅当函数排队时(而不是立即运行时)才有效。对于 io_context,设置后,该函数将安排在更快的线程本地队列中运行,而不是上下文全局队列中运行。

outstanding_work_t 属性

outstanding_work 可以取两个值

  • asio::execution::outstanding_work.tracked:表示当执行器处于活动状态时,仍有工作要做。

  • asio::execution::outstanding_work.untracked:与上述相反。这是默认值(调用 io_context::get_executor() 时获得的值)。

将此属性设置为 tracked 意味着只要 executor 处于活动状态,事件循环就不会返回。

最小执行器

有了这些,让我们看一下最小执行器的接口。

struct minimal_executor
{
  minimal_executor() noexcept;

  asio::execution_context &query(asio::execution::context_t) const;

  static constexpr asio::execution::blocking_t
  query(asio::execution::blocking_t) noexcept
  {
    return asio::execution::blocking.never;
  }

  template<class F>
  void execute(F && f) const;

  bool operator==(minimal_executor const &other) const noexcept;
  bool operator!=(minimal_executor const &other) const noexcept;
};
有关使用 python 的 asyncio 事件循环的实现,请参见 example/python.cpp

添加工作保护。

现在,让我们为 outstanding_work 属性添加一个 require 函数,该函数使用多种类型。

struct untracked_executor : minimal_executor
{
  untracked_executor() noexcept;

  constexpr   tracked_executor require(asio::execution::outstanding_work::  tracked_t) const;
  constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const {return *this; }
};

struct untracked_executor : minimal_executor
{
  untracked_executor() noexcept;

  constexpr   tracked_executor require(asio::execution::outstanding_work::  tracked_t) const {return *this;}
  constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const;
};

请注意,从 require 函数返回不同的类型不是必需的,也可以像这样完成

struct trackable_executor : minimal_executor
{
  trackable_executor() noexcept;

  constexpr trackable_executor require(asio::execution::outstanding_work::  tracked_t) const;
  constexpr trackable_executor require(asio::execution::outstanding_work::untracked_t) const;
};

如果我们想使用 prefer,它看起来如下所示

struct trackable_executor : minimal_executor
{
  trackable_executor() noexcept;

  constexpr trackable_executor prefer(asio::execution::outstanding_work::  tracked_t) const;
  constexpr trackable_executor prefer(asio::execution::outstanding_work::untracked_t) const;
};

总结

如您所见,属性系统并非微不足道,但功能非常强大。实现自定义执行器本身就是一个问题类别,这就是为什么本文档不这样做。相反,有一个关于如何在执行器中包装 python 事件循环的示例。

以下是一些阅读建议。

无栈

C++20 协程是无栈的,这意味着它们没有自己的堆栈。

C++ 中的堆栈描述了调用堆栈,即所有堆叠的函数帧。函数帧是函数运行所需的内存,即存储其变量和返回地址等信息的内存片。

函数帧的大小在编译时已知,但在包含其定义的编译单元之外未知。
int bar() {return 0;} // the deepest point of the stack
int foo() {return bar();}

int main()
{
    return foo();
}

在上面的示例中,调用堆栈是

main()
  foo()
    bar()
stackless1

协程可以实现为有栈的,这意味着它会分配固定大小的内存块,并像线程一样堆叠函数帧。C++20 协程是无栈的,即它们只分配自己的帧,并在恢复时使用调用者的堆栈。使用我们之前的例子

fictional_eager_coro_type<int> example()
{
    co_yield 0;
    co_yield 1;
}

void nested_resume(fictional_eager_coro_type<int>& f)
{
    f.resume();
}

int main()
{
    auto f = example();
    nested_resume(f);
    f.reenter();
    return 0;
}

这将产生类似于这样的调用堆栈

main()
  f$example()
  nested_resume()
    f$example()
  f$example()
stackless2

如果协程在线程之间移动,情况也是如此。

惰性 & 急切

协程是惰性的,如果它们只在恢复后才开始执行其代码,而 eager 协程将立即执行,直到其第一个挂起点(即 co_awaitco_yieldco_return 表达式)。

lazy_coro co_example()
{
    printf("Entered coro\n");
    co_yield 0;
    printf("Coro done\n");
}

int main()
{
    printf("enter main\n");
    auto lazy = co_example();
    printf("constructed coro\n");
    lazy.resume();
    printf("resumed once\n");
    lazy.resume();
    printf("resumed twice\n");
    return 0;
}

这将产生如下输出

enter main
constructed coro
Entered coro
resumed once
Coro Done
resumed twice
lazy eager1

而 eager 协程看起来会是这样

eager_coro co_example()
{
    printf("Entered coro\n");
    co_yield 0;
    printf("Coro done\n");
}

int main()
{
    printf("enter main\n");
    auto lazy = co_example();
    printf("constructed coro\n");
    lazy.resume();
    printf("resumed once\n");
    return 0;
}

这将产生如下输出

enter main
Entered coro
constructed coro
resume once
Coro Done
lazy eager2

基准测试

在第 11 代 Intel® Core™ i7-1185G7 @ 3.00GHz 上运行

发布到执行器

该基准测试运行以下代码,使用了 cobalt 的任务、asio::awaitable 和基于 `asio` 的有栈协程 (boost.context)。

cobalt::task<void> atest()
{
  for (std::size_t i = 0u; i < n; i++)
    co_await asio::post(cobalt::use_op);
}
表 6. 5000 万次运行的结果,单位为毫秒
gcc 12 clang 16

cobalt

2472

2098

可等待对象

2432

2253

有栈

3655

3725

并行运行 noop 协程

此基准测试使用大小为零的 asio::experimental::channel,以便并行地读取和写入。它使用带有 cobalt 的 gatherasio::awaitable 中的 awaitable_operator

cobalt::task<void> atest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
  for (std::size_t i = 0u; i < n; i++)
    co_await cobalt::gather(
              chan.async_send(system::error_code{}, cobalt::use_task),
              chan.async_receive(cobalt::use_task));
}

asio::awaitable<void> awtest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
  using boost::asio::experimental::awaitable_operators::operator&&;
  for (std::size_t i = 0u; i < n; i++)
    co_await (
        chan.async_send(system::error_code{}, asio::use_awaitable)
        &&
        chan.async_receive(asio::use_awaitable));
}
表 7. 300 万次运行的结果,单位为毫秒
gcc 12 clang 16

cobalt

1563

1468

可等待对象

2800

2805

立即

此基准测试通过使用大小为 1 的通道来利用立即完成,以便每次操作都是立即的。

cobalt::task<void> atest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 1u};
  for (std::size_t i = 0u; i < n; i++)
  {
    co_await chan.async_send(system::error_code{}, cobalt::use_op);
    co_await chan.async_receive(cobalt::use_op);
  }
}
表 8. 1000 万次运行的结果,单位为毫秒
gcc 12 clang 16

cobalt

1810

1864

可等待对象

3109

4110

有栈

3922

4705

通道

在此基准测试中,将比较 asio::experimental::channel 和 cobalt::channel。

这与并行测试类似,但使用了 cobalt::channel 代替。

表 9. 运行测试 300 万次的结果,单位为毫秒
gcc clang

cobalt

500

350

可等待对象

790

770

有栈

867

907

操作分配

此基准测试比较了异步操作的相关分配器的不同可能解决方案

表 10. 运行测试 200 万次的结果,单位为毫秒
gcc clang

std::allocator

1136

1139

cobalt::monotonic

1149

1270

pmr::monotonic

1164

1173

cobalt::sbo

1021

1060

后一种方法在 cobalt 内部使用。

要求

Boost.cobalt 需要 C++20 编译器,并直接依赖以下 boost 库

  • boost.asio

  • boost.system

  • boost.circular_buffer

  • boost.intrusive

  • boost.smart_ptr

  • boost.container(对于 clang < 16)

编译器

该库自 Clang 14、Gcc 10 和 MSVC 19.30 (Visual Studio 2022) 起受支持。

Gcc 版本 12.1 和 12.2 似乎存在协程的错误,如[此处](https://godbolt.org/z/6adGcqP1z)所示,应避免用于协程。

Clang 仅在 16 中添加了 std::pmr 支持,因此较旧的 clang 版本使用 boost::contianer::pmr 作为替代品。

某些(如果不是全部)MSVC 版本存在协程实现的错误,此库需要解决此问题。这可能会导致不确定的行为和开销。

协程的延续可以在从 final_suspend 返回的 awaitable 中完成,如下所示

// in promise
auto final_suspend() noexcept
{
    struct final_awaitable
    {
      std::coroutine_handle<void> continuation{std::noop_coroutine()}; (1)
      bool await_ready() const noexcept;
      std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> h) noexcept
      {
        auto cc = continuation;
        h.destroy(); (2)
        return cc;
      }

      void await_resume() noexcept {}
    };
    return final_awaitable{my_continuation};
};
1 延续
2 在延续之前自销毁协程

final_suspend 在 MSVC 上无法正确挂起协程,因此 h.destroy() 将导致协程帧上的元素被双重销毁。因此,msvc 需要发布销毁,以便在外部执行。这将导致开销,并使内存的实际释放不确定。

致谢

如果没有 CppAlliance 及其创始人 Vinnie Falco,这个库就不可能实现。Vinnie 非常信任我,让我在这个项目上工作,尽管他对这个库的设计方式有非常不同的看法。

还要感谢 Ruben Perez 和 Richard Hodges,他们听取了我的设计问题并给了我建议和用例。此外,如果没有 Chris Kohlhoff 出色的 boost.asio,这个库也不可能实现。