Boost C++ 库

...世界上备受推崇和精心设计的 C++ 库项目之一。 Herb SutterAndrei Alexandrescu, C++ 编码标准

文档 boost.cobalt - Boost C++ 函数库

概述

以下是 cobalt 中相关功能的列表

表 1. 协程类型

promise

返回单个结果的急切协程 - 视为默认值

generator

可以产生多个值的急切协程。

task

promise 的惰性版本,可以派生到其他执行器。

detached

类似于 promise 的协程,没有句柄

表 2. 同步函数

race

一个函数,以伪随机方式等待一组协程中就绪的一个,以避免饥饿。

join

一个函数,等待一组协程并将其全部作为值返回,如果任何 awaitable 抛出异常,则抛出异常。

gather

一个函数,等待一组协程并将其全部作为 result 返回,单独捕获所有异常。

left_race

一个确定性的 race,从左到右评估。

表 3. 实用工具

channel

一个线程局部实用工具,用于在协程之间发送值。

带有

一个异步 RAII 辅助工具,允许在发生异常时进行异步拆卸

表 4. 阅读指南

协程入门

C++ 协程简介

如果您以前从未使用过协程,请阅读

导览

功能和概念的简要高级概述

如果您熟悉 asio 和协程,并想大致了解此库提供的功能,请阅读

教程

低级用法视图

如果您想快速编码,请阅读

参考

API 参考

编码时查找详细信息

深入

一些实现细节

如果您不够困惑,请阅读

动机

许多编程语言,如 node.js 和 python,提供了易于使用的单线程并发框架。虽然比同步代码更复杂,但单线程异步性避免了多线程的许多陷阱和开销。

也就是说,一个协程可以工作,而其他协程等待事件(例如,来自服务器的响应)。这允许编写在单个线程同时执行多项任务的应用程序。

该库旨在为 C++ 提供此功能:简单的单线程异步性,类似于 node.js 和 python 中的 asyncio,并与现有库(如 boost.beastboost.mysqlboost.redis)配合使用。它基于 boost.asio

它从其他语言中提取了一系列概念,并基于 C++20 协程提供了它们。

asio::awaitableasio::experimental::coro 不同,cobalt 协程是开放的。也就是说,asio::awaitable 只能 await 和被其他 asio::awaitable await,并且不提供协程特定的同步机制。

另一方面,cobalt 提供了一个协程特定的 channel 和不同的等待类型(racegather 等),这些类型经过优化,可以与协程和 awaitable 配合使用。

协程入门

异步编程

异步编程通常指一种编程风格,它允许任务在后台运行,同时执行其他工作。

想象一下,如果您有一个 get-request 函数,它执行完整的 http 请求,包括连接和 ssl 握手等。

std::string http_get(std:string_view url);

int main(int argc, char * argv[])
{
    auto res = http_get("https://boost.ac.cn");
    printf("%s", res.c_str());
    return 0;
}

上述代码将是传统的同步编程。如果我们要并行执行两个请求,我们将需要创建另一个线程来运行另一个带有同步代码的线程。

std::string http_get(std:string_view url);

int main(int argc, char * argv[])
{
    std::string other_res;

    std::thread thr{[&]{ other_res = http_get("https://cppalliance.org"); }};
    auto res = http_get("https://boost.ac.cn");
    thr.join();

    printf("%s", res.c_str());
    printf("%s", other_res.c_str());
    return 0;
}

这可行,但我们的程序大部分时间将花费在等待输入上。操作系统提供允许异步执行 IO 的 API,而 boost.asio 等库提供了管理异步操作的便携式方法。Asio 本身不规定处理完成的方式。该库 (boost.cobalt) 提供了一种通过协程/awaitable 管理所有这些的方法。

cobalt::promise<std::string> http_cobalt_get(std:string_view url);

cobalt::main co_main(int argc, char * argv[])
{
    auto [res, other_res] =
            cobalt::join(
                http_cobalt_get("https://boost.ac.cn"),
                http_cobalt_get("https://cppalliance.org")
            );

    printf("%s", res.c_str());
    printf("%s", other_res.c_str());
    return 0;
}

在上述代码中,执行请求的异步函数利用了操作系统 API,因此实际的 IO 不会阻塞。这意味着,当我们等待两个函数完成时,操作是交错且非阻塞的。同时,cobalt 提供了协程原语,使我们摆脱了回调地狱。

协程

协程是可恢复的函数。可恢复意味着函数可以暂停,即将控制多次返回给调用者。

常规函数使用 return 函数将控制权返回给调用者,并在那里也返回该值。

另一方面,协程可能会将控制权交给调用者并多次恢复。

协程有三个控制关键字,类似于 co_return(其中只有 co_return 必须受支持)。

  • co_return

  • co_yield

  • co_await

co_return

这类似于 return,但将函数标记为协程。

co_await

co_await 表达式暂停等待一个 Awaitable,即停止执行直到 awaitable 恢复它。

例如:

cobalt::promise<void> delay(std::chrono::milliseconds);

cobalt::task<void> example()
{
  co_await delay(std::chrono::milliseconds(50));
}

co_await 表达式可以产生一个值,具体取决于它正在等待什么。

cobalt::promise<std::string> read_some();

cobalt::task<void> example()
{
  std::string res = co_await read_some();
}
cobalt 中,大多数协程原语也是 Awaitable

co_yield

co_yield 表达式类似于 co_await,但它将控制权交给调用者并带有一个值。

例如:

cobalt::generator<int> iota(int max)
{
  int i = 0;
  while (i < max)
    co_yield i++;

  co_return i;
}

co_yield 表达式也可以产生一个值,这允许产生值的协程的用户向其推送值。

cobalt::generator<int> iota()
{
  int i = 0;
  bool more = false;
  do
  {
    more = co_yield i++;
  }
  while(more);
  co_return -1;
}
无栈

C++ 协程是无栈的,这意味着它们只分配自己的函数帧。

有关详细信息,请参阅无栈

可等待对象

Awaitable 是可以在 co_await 表达式中使用的类型。

struct awaitable_prototype
{
    bool await_ready();

    template<typename T>
    see_below await_suspend(std::coroutine_handle<T>);

    return_type  await_resume();
};
如果存在 operator co_await 调用,则类型将隐式转换为 awaitable。本文档将使用 awaitable 来包含这些类型,并使用 "actual_awaitable" 来指代符合上述原型的类型。
awaitables

co_await 表达式中,等待协程将首先调用 await_ready 以检查协程是否需要暂停。准备好后,它直接进入 await_resume 以获取值,因为不需要暂停。否则,它将暂停自身并使用 std::coroutine_handle 调用 await_suspend 到其自己的 promise。

std::coroutine_handle<void> 可用于类型擦除。

return_typeco_await expression 的结果类型,例如 int

int i = co_await awaitable_with_int_result();

await_suspend 的返回类型可以是三种情况

  • void

  • bool

  • std::coroutine_handle<U>

如果它是 void,则等待协程保持暂停。如果它是 bool,则将检查该值,如果为 false,则等待协程将立即恢复。

如果返回 std::coroutine_handle,则此协程将恢复。后者允许 await_suspend 返回传入的句柄,这实际上与返回 false 相同。

如果等待协程立即重新恢复,即在调用 await_resume 后,它在该库中被称为“立即完成”。这不要与非暂停 awaitable 混淆,即从 await_ready 返回 true 的 awaitable。

事件循环

由于 cobalt 中的协程可以 co_await 事件,因此它们需要在事件循环上运行。也就是说,另一段代码负责跟踪未完成的事件并恢复正在等待它们的协程。这种模式非常常见,并以类似的方式被 node.js 或 python 的 asyncio 使用。

cobalt 使用 asio::io_context 作为其默认事件循环。也就是说,类 threadmainrun 函数在内部使用它。

您可以使用任何可以生成 asio::any_io_executor 的事件循环与该库一起使用。实现这一点的最简单方法是使用 spawn

事件循环通过执行器(遵循 asio 术语)访问,并且可以使用 set_executor 手动设置。

导览

进入 cobalt 环境

为了使用 awaitable,我们需要能够 co_await 它们,即在协程内。

我们有四种方法可以实现这一点

cobalt/main.hpp

用协程替换 int main

cobalt::main co_main(int argc, char* argv[])
{
    // co_await things here
    co_return 0;
}
cobalt/thread.hpp

为异步环境创建线程

cobalt::thread my_thread()
{
    // co_await things here
    co_return;
}

int main(int argc, char ** argv[])
{
    auto t = my_thread();
    t.join();
    return 0;
}
cobalt/task.hpp

创建一个任务并运行或派生它

cobalt::task<void> my_thread()
{
   // co_await things here
   co_return;
}

int main(int argc, char ** argv[])
{
    cobalt::run(my_task()); // sync
    asio::io_context ctx;
    cobalt::spawn(ctx, my_task(), asio::detached);
    ctx.run();
    return 0;
}

Promise

Promise 是推荐的默认协程类型。它们是急切的,因此易于用于临时并发。

cobalt::promise<int> my_promise()
{
   co_await do_the_thing();
   co_return 0;
}

cobalt::main co_main(int argc, char * argv[])
{
    // start the promise here
    auto p = my_promise();
    // do something else here
    co_await do_the_other_thing();
    // wait for the promise to complete
    auto res = co_await p;

    co_return res;
}

Task

Task 是惰性的,这意味着它们在被 await 或派生之前不会做任何事情。

cobalt::task<int> my_task()
{
   co_await do_the_thing();
   co_return 0;
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the task here
    auto t = my_task();
    // do something else here first
    co_await do_the_other_thing();
    // start and wait for the task to complete
    auto res = co_await t;
    co_return res;
}

Generator

一个 generator 是 cobalt 中唯一可以 co_yield 值的类型。

Generator 默认是急切的。与 std::generator 不同,cobalt::generator 可以 co_await,因此是异步的。

cobalt::generator<int> my_generator()
{
   for (int i = 0; i < 10; i++)
    co_yield i;
   co_return 10;
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_generator();
    while (g)
        printf("Generator %d\n", co_await g);
    co_return 0;
}

值可以推送到生成器中,这些值将从 co_yield 返回。

一个急切的生成器将在被等待之前产生第一个结果。也就是说,当我们调用 co_await g(4) 时,初始 co_yield 的结果已就绪。然后生成器接收作为 co_yield 返回传入的值 4,并处理它。

cobalt::generator<std::string, int> my_eager_push_generator(int value)
{
   while (value != 0)
       value = co_yield std::to_string(value);
   co_return "";
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_generator(5);

    assert("5" == co_await g(4)); // result of 5
    assert("4" == co_await g(3)); // result of 4
    assert("3" == co_await g(2)); // result of 3
    assert("2" == co_await g(1)); // result of 2
    assert("1" == co_await g(0)); // result of 1

    // we let the coroutine go out of scope while suspended
    // no need for another co_await of `g`

    co_return 0;
}

协程也可以使用 this_coro::initial 变为惰性。

一个惰性生成器确实等待第一次 co_await 开始工作,即当 initial 被等待时它会暂停。数据处理不是提前一步,而是在 co_await 时发生。

cobalt::generator<std::string, int> my_eager_push_generator()
{
    auto value = co_await this_coro::initial;
    while (value != 0)
       value = co_yield std::to_string(value);
    co_return "";
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_generator(); // lazy, so the generator waits for the first pushed value
    assert("5" == co_await g(5)); // result of 5
    assert("4" == co_await g(4)); // result of 4
    assert("3" == co_await g(3)); // result of 3
    assert("2" == co_await g(2)); // result of 2
    assert("1" == co_await g(1)); // result of 1

    // we let the coroutine go out of scope while suspended
    // no need for another co_await of `g`

    co_return 0;
}

join

如果多个 awaitable 并行工作,它们可以使用 join 同时等待。

cobalt::promise<int> some_work();
cobalt::promise<double> more_work();

cobalt::main co_main(int argc, char * argv[])
{
    std::tuple<int, double> res = cobalt::join(some_work(), more_work());
    co_return 0;
}

race

如果多个 awaitable 并行工作,但我们希望在其中任何一个完成时收到通知,我们应该使用 race

cobalt::generator<int> some_data_source();
cobalt::generator<double> another_data_source();

cobalt::main co_main(int argc, char * argv[])
{
    auto g1 = some_data_source();
    auto g2 = another_data_source();

    int res1    = co_await g1;
    double res2 = co_await g2;

    printf("Result: %f", res1 * res2);

    while (g1 && g2)
    {
        switch(variant2::variant<int, double> nx = co_await cobalt::race(g1, g2))
        {
            case 0:
                res1 = variant2::get<0>(nx);
                break;
            case 1:
                res2 = variant2::get<1>(nx);
                break;
        }
        printf("New result: %f", res1 * res2);
    }

    co_return 0;
}
在这种情况下,race 不会造成任何数据丢失。

教程

delay

让我们从最简单的示例开始:一个简单的延迟。

example/delay.cpp
cobalt::main co_main(int argc, char * argv[]) (1)
{
  asio::steady_timer tim{co_await asio::this_coro::executor, (2)
                         std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
  co_await tim.async_wait(cobalt::use_op); (4)
  co_return 0; (5)
}
1 co_main 函数在使用时定义了一个隐式的 main,是设置运行异步代码环境的最简单方法。
2 从当前协程 promise 中获取执行器。
3 使用参数设置超时
4 使用 cobalt::use_op 执行等待。
5 返回一个从隐式 main 返回的值。

在此示例中,我们使用 cobalt/main.hpp 头文件,如果如上定义 co_main,它将为我们提供一个 main 协程。这有几个优点

  • 环境设置正确(executormemory

  • asio 被告知上下文是单线程的

  • 一个带有 SIGINTSIGTERMasio::signal_set 会自动连接到取消(即 Ctrl+C 会导致取消)

此协程的 promise 中有一个执行器(promise 是协程状态的 C++ 名称。不要与 cobalt/promise.hpp 混淆),我们可以通过 this_coro 命名空间中的虚拟 awaitable 获得它。

然后我们可以构造一个计时器并使用 use_op 启动 async_waitcobalt 提供了多种方式来 co_await 与 asio 交互,其中 use_op 是最简单的一种。

echo 服务器

我们将到处使用 use_op (asio 完成) 令牌,因此我们使用 默认完成令牌,这样我们就可以跳过最后一个参数。

example/echo_server.cpp 声明
namespace cobalt = boost::cobalt;
namespace this_coro = boost::cobalt::this_coro;

我们将 echo 函数编写为 promise 协程。它是一个急切的协程,推荐作为默认值;如果需要惰性协程,可以使用 task

example/echo_server.cpp echo 函数
cobalt::promise<void> echo(cobalt::io::stream_socket socket)
{
  try (1)
  {
    char data[4096];
    while (socket.is_open()) (2)
    {
      std::size_t n = co_await socket.read_some(boost::asio::buffer(data)); (3)
      co_await cobalt::io::write(socket, boost::asio::buffer(data, n)); (4)
    }
  }
  catch (std::exception& e)
  {
    std::printf("echo: exception: %s\n", e.what());
  }
}
1 使用 use_op 完成令牌时,I/O 错误会转换为 C++ 异常。此外,如果协程被取消(例如,因为用户按下了 Ctrl-C),也会引发异常。在这些情况下,我们打印错误并退出循环。
2 我们运行循环直到被取消(异常)或用户关闭连接。
3 尽可能多地阅读。
4 写入所有读取的字节。

请注意,promise 是急切的。调用 echo 将立即执行代码直到 async_read_some,然后将控制权返回给调用者。

接下来,我们还需要一个接受函数。在这里,我们使用 generator 来管理接受器状态。这是一个可以多次 co_awaited 的协程,直到达到 co_return 表达式。

example/echo_server.cpp 监听函数
cobalt::generator<cobalt::io::stream_socket> listen()
{
  cobalt::io::acceptor acceptor(cobalt::io::endpoint{cobalt::io::tcp_v4, "0.0.0.0", 55555});
  for (;;) (1)
  {
    cobalt::io::stream_socket sock = co_await acceptor.accept(); (2)
    co_yield std::move(sock); (3)
  }
  co_return cobalt::io::stream_socket{}; (4)
}
1 取消也会导致 co_await 抛出异常
2 异步接受连接
3 将其 yield 给等待协程
4 co_return 一个值以符合 C++ 标准。

有了这两个函数,我们现在可以编写服务器了

example/echo_server.cpp run_server 函数
cobalt::promise<void> run_server(cobalt::wait_group & workers)
{
  auto l = listen(); (1)
  while (true)
  {
    if (workers.size() == 10u)
      co_await workers.wait_one();  (2)
    else
      workers.push_back(echo(co_await l)); (3)
  }
}
1 构造监听器生成器协程。当对象被销毁时,协程将被取消,执行所有必要的清理工作。
2 当工作器数量超过 10 个时,我们等待一个工作器完成
3 接受新连接并启动它。

wait_group 用于管理正在运行的 echo 函数。此类别将取消并等待正在运行的 echo 协程。

我们不需要对 listener 进行同样的操作,因为它在 l 被销毁时会自行停止。生成器的析构函数会取消它。

由于 promise 是急切的,只需调用它即可启动。然后我们将这些 promises 放入 wait_group,这将允许我们在作用域退出时销毁所有工作器。

example/echo_server.cpp co_main 函数
cobalt::main co_main(int argc, char ** argv)
{
  co_await cobalt::with(cobalt::wait_group(), &run_server); (1)
  co_return 0u;
}
1 使用异步作用域运行 run_server

上面所示的 with 函数将运行一个带资源(例如 wait_group)的函数。在作用域退出时,with 将调用并 co_await 一个异步拆卸函数。这将导致所有连接在 co_main 存在之前正确关闭。

价格行情

为了演示 channels 和其他工具,我们需要一定的复杂性。为此,我们的项目是一个价格行情应用程序,它连接到 https://blockchain.info。用户可以连接到 localhost 查询给定的货币对,如下所示

wscat -c localhost:8080/btc/usd

首先,我们进行与 echo 服务器相同的声明。

example/ticker.cpp 声明
using executor_type = cobalt::use_op_t::executor_with_default<cobalt::executor>;
using socket_type   = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;

下一步是编写一个函数来连接 ssl 流,以连接上游

example/ticker.cpp connect
cobalt::promise<asio::ssl::stream<socket_type>> connect(
        std::string host, boost::asio::ssl::context & ctx)
{
    asio::ip::tcp::resolver res{cobalt::this_thread::get_executor()};
    auto ep = co_await res.async_resolve(host, "https", cobalt::use_op); (1)

    asio::ssl::stream<socket_type> sock{cobalt::this_thread::get_executor(), ctx};
    co_await sock.next_layer().async_connect(*ep.begin()); (2)
    co_await sock.async_handshake(asio::ssl::stream_base::client); (3)

    co_return sock; (4)
}
1 查找主机
2 连接到端点
3 执行 SSL 握手
4 将套接字返回给调用者

接下来,我们需要一个函数来对现有 ssl 流进行 websocket 升级。

example/ticker.cpp connect_to_blockchain_info
cobalt::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
 ws.set_option(beast::websocket::stream_base::decorator(
     [](beast::websocket::request_type& req)
     {
       req.set(http::field::user_agent,
               std::string(BOOST_BEAST_VERSION_STRING) + " cobalt-ticker");
       req.set(http::field::origin,
               "https://exchange.blockchain.com"); (1)
     }));

 co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
1 blockchain.info 要求设置此头文件。
2 执行 websocket 握手。

一旦 websocket 连接,我们希望连续接收 json 消息,为此生成器是一个不错的选择。

example/ticker.cpp json_read
cobalt::generator<json::object> json_reader(websocket_type & ws)
try
{
    beast::flat_buffer buf;
    while (ws.is_open()) (1)
    {
        auto sz = co_await ws.async_read(buf); (2)
        json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
        auto obj = json::parse(data);
        co_yield obj.as_object(); (3)
        buf.consume(sz);
    }
    co_return {};
}
catch (std::exception & e)
{
  std::cerr << "Error reading: " << e.what() << std::endl;
  throw;
}
1 只要套接字保持打开状态,就一直运行
2 从 websocket 读取帧
3 解析并 co_yield 它作为对象。

然后需要将其连接到订阅者,为此我们将利用通道传递原始 json。为了便于生命周期管理,订阅者将持有 shared_ptr,生产者将持有 weak_ptr

example/ticker.cpp 订阅类型
using subscription = std::pair<std::string, std::weak_ptr<cobalt::channel<json::object>>>;
using subscription_channel = std::weak_ptr<cobalt::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;

运行区块链连接器的主要函数操作两个输入:来自 websocket 的数据和用于处理新订阅的通道。

example/ticker.cpp 运行 blockchain_info
cobalt::promise<void> run_blockchain_info(cobalt::channel<subscription> & subc)
try
{
    asio::ssl::context ctx{asio::ssl::context_base::tls_client};
    websocket_type ws{co_await connect("blockchain.info", ctx)};
    co_await connect_to_blockchain_info(ws); (1)

    subscription_map subs;
    std::list<std::string> unconfirmed;

    auto rd = json_reader(ws); (2)
    while (ws.is_open()) (3)
    {
      switch (auto msg = co_await cobalt::race(rd, subc.read()); msg.index()) (4)
      {
        case 0: (5)
          if (auto ms = get<0>(msg);
              ms.at("event") == "rejected") // invalid sub, cancel however subbed
            co_await handle_rejections(unconfirmed, subs, ms);
          else
            co_await handle_update(unconfirmed, subs, ms, ws);
        break;
        case 1: // (6)
            co_await handle_new_subscription(
                unconfirmed, subs,
                std::move(get<1>(msg)), ws);
        break;
      }
    }

    for (auto & [k ,c] : subs)
    {
        if (auto ptr = c.lock())
            ptr->close();
    }
}
catch(std::exception & e)
{
  std::cerr << "Exception: " << e.what() << std::endl;
  throw;
}
1 初始化连接
2 实例化 json_reader
3 只要 websocket 打开就运行
4 选择,即等待新的 json 消息或订阅
5 当是 json 时,处理更新或拒绝
6 处理新订阅消息

handle_* 函数的内容对于 cobalt 功能而言不那么重要,因此在本教程中省略了它。

handle_new_subscription 函数向 blockchain.info 发送消息,后者会发回确认或拒绝。handle_rejectionhandle_update 将获取 json 值并将其转发到订阅通道。

在消费者端,我们的服务器将只将数据转发给客户端。如果客户端输入数据,我们将立即关闭 websocket。我们使用 as_tuple 来忽略潜在的错误。

example/ticker.cpp read and close
cobalt::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
    system::error_code ec;
    co_await st.async_read(buf, asio::as_tuple(cobalt::use_op));
    co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(cobalt::use_op));
    st.next_layer().close(ec);
}

接下来,我们运行用户发送的会话

example/ticker.cpp run_session
cobalt::promise<void> run_session(beast::websocket::stream<socket_type> st,
                                 cobalt::channel<subscription> & subc)
try
{
    http::request<http::empty_body> req;
    beast::flat_buffer buf;
    co_await http::async_read(st.next_layer(), buf, req); (1)
    // check the target
    auto r = urls::parse_uri_reference(req.target());
    if (r.has_error() || (r->segments().size() != 2u)) (2)
    {
        http::response<http::string_body> res{http::status::bad_request, 11};
        res.body() = r.has_error() ? r.error().message() :
                    "url needs two segments, e.g. /btc/usd";
        co_await http::async_write(st.next_layer(), res);
        st.next_layer().close();
        co_return ;
    }

    co_await st.async_accept(req); (3)

    auto sym = std::string(r->segments().front()) + "-" +
               std::string(r->segments().back());
    boost::algorithm::to_upper(sym);
    // close when data gets sent
    auto p = read_and_close(st, std::move(buf)); (4)

    auto ptr = std::make_shared<cobalt::channel<json::object>>(1u); (5)
    co_await subc.write(subscription{sym, ptr}); (6)

    while (ptr->is_open() && st.is_open()) (7)
    {
      auto bb = json::serialize(co_await ptr->read());
      co_await st.async_write(asio::buffer(bb));
    }

    co_await st.async_close(beast::websocket::close_code::going_away,
                            asio::as_tuple(cobalt::use_op)); (8)
    st.next_layer().close();
    co_await p; (9)

}
catch(std::exception & e)
{
    std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
1 读取 http 请求,因为我们想要路径
2 检查路径,例如 /btc/usd
3 接受 WebSocket
4 开始读取,如果消费者发送了内容,则关闭
5 创建通道以接收更新
6 run_blockchain_info 发送订阅请求
7 当通道和 websocket 打开时,我们正在转发数据。
8 关闭套接字并忽略错误
9 由于 websocket 现在肯定已关闭,等待 read_and_close 关闭。

在编写完 run_sessionrun_blockchain_info 之后,我们现在可以转到 main

example/ticker.cpp main
cobalt::main co_main(int argc, char * argv[])
{
    acceptor_type acc{co_await cobalt::this_coro::executor,
                      asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
    std::cout << "Listening on localhost:8080" << std::endl;

    constexpr int limit = 10; // allow 10 ongoing sessions
    cobalt::channel<subscription> sub_manager; (1)

    co_await join( (2)
      run_blockchain_info(sub_manager),
      cobalt::with( (3)
        cobalt::wait_group(
            asio::cancellation_type::all,
            asio::cancellation_type::all),
        [&](cobalt::wait_group & sessions) -> cobalt::promise<void>
        {
          while (!co_await cobalt::this_coro::cancelled) (4)
          {
            if (sessions.size() >= limit) (5)
              co_await sessions.wait_one();

            auto conn = co_await acc.async_accept(); (6)
            sessions.push_back( (7)
                run_session(
                    beast::websocket::stream<socket_type>{std::move(conn)},
                    sub_manager));
          }
        })
      );

    co_return 0;
}
1 创建通道以管理订阅
2 使用 join 并行运行两个任务。
3 使用 cobalt 作用域提供 wait_group
4 运行直到被取消。
5 达到 limit 后,我们等待一个任务完成。
6 等待新连接。
7 将会话插入 wait_group

Main 使用 join 是因为一个任务失败应该取消另一个任务。

延迟操作

到目前为止,我们已经使用 use_op 来使用基于 asio 完成令牌机制的隐式操作。

然而,我们可以实现自己的操作,这些操作也可以利用 await_ready 优化。与立即完成不同,当 await_ready 返回 true 时,协程永远不会暂停。

为了利用此协程功能,cobalt 提供了一种创建可跳过操作的简单方法

example/delay_op.cpp
struct wait_op final : cobalt::op<system::error_code> (1)
{
  asio::steady_timer & tim;
  wait_op(asio::steady_timer & tim) : tim(tim) {}
  void ready(cobalt::handler<system::error_code> h ) override (2)
  {
    if (tim.expiry() < std::chrono::steady_clock::now())
      h(system::error_code{});
  }
  void initiate(cobalt::completion_handler<system::error_code> complete) override (3)
  {
    tim.async_wait(std::move(complete));
  }
};


cobalt::main co_main(int argc, char * argv[])
{
  asio::steady_timer tim{co_await asio::this_coro::executor,
                         std::chrono::milliseconds(std::stoi(argv[1]))};
  co_await wait_op(tim); (4)
  co_return 0; //
}
1 声明操作。我们继承 op 以使其可等待。
2 预暂停检查在此处实现
3 如果需要,执行等待
4 像使用任何其他 awaitable 一样使用 op

通过这种方式,我们可以最小化协程暂停的数量。

虽然上述方法与 asio 一起使用,但您也可以将这些处理程序与任何其他基于回调的代码一起使用。

带推送值的生成器

带推送值的协程并不常见,但可以显著简化某些问题。

由于我们已经在前面的示例中有了 json_reader,这里是如何编写一个接收推送值的 json_writer。

使用生成器的优点是内部状态管理。

cobalt::generator<system::error_code, json::object>
    json_writer(websocket_type & ws)
try
{
    char buffer[4096];
    json::serializer ser;

    while (ws.is_open()) (1)
    {
        auto val = co_yield system::error_code{}; (2)

        while (!ser.done())
        {
            auto sv = ser.read(buffer);
            co_await ws.cobalt_write({sv.data(), sv.size()}); (3)
        }

    }
    co_return {};
}
catch (system::system_error& e)
{
    co_return e.code();
}
catch (std::exception & e)
{
    std::cerr << "Error reading: " << e.what() << std::endl;
    throw;
}
1 只要套接字保持打开状态,就一直运行
2 co_yield 当前错误并检索新值。
3 向 websocket 写入一个帧

现在我们可以这样使用生成器

auto g = json_writer(my_ws);

extern std::vector<json::value> to_write;

for (auto && tw : std::move(to_write))
{
    if (auto ec = co_await g(std::move(tw)))
        return ec; // yield error
}

高级示例

更多示例仅以代码形式在仓库中提供。所有示例均列在下方。

表 5. 所有示例

example/http.cpp

一个执行单个 http get 请求的 http 客户端。

example/outcome.cpp

使用 boost.outcome 协程类型。

example/python.cpp & example/python.py

使用 nanobind 将 cobalt 与 python 集成。它使用 python 的 asyncio 作为执行器,并允许 C++ co_await python 函数,反之亦然。

example/signals.cpp

boost.signals2 转换为 awaitable 类型(单线程)。

example/spsc.cpp

创建基于 boost.lockfree 且可等待的 spsc_queue(多线程)。

example/thread.cpp

使用带有 asioconcurrent_channel 的工作线程。

example/thread_pool.cpp

使用 asio::thread_pool 并将 任务 派生到其上。

example/delay.cpp

延迟 部分使用的示例

example/delay_op.cpp

延迟操作 部分使用的示例

example/echo_server.cpp

echo 服务器 部分使用的示例

example/ticker.cpp

价格行情 部分使用的示例

example/channel.cpp

通道 引用使用的示例

设计

概念

该库有两个基本概念

awaitable 是可以在协程内部与 co_await 一起使用的表达式,例如:

co_await delay(50ms);

但是,协程 promise 可以定义一个 await_transform,即实际上什么可以与 co_await 表达式一起使用取决于协程。

因此,我们应该重新定义 awaitable 是什么:一个 awaitable 是一种类型,可以在协程内部 co_await,其 promise 不定义 await_transform

伪关键字是一种可以在协程中使用的类型,由于其 promise 的 await_transform,它为其添加了特殊功能。

this_coro 命名空间中的所有动词都是此类伪关键字。

auto exec = co_await this_coro::executor;
该库公开了一组 enable_* 协程基类,以便于创建自定义协程。这包括 enable_awaitables,它提供了一个 await_transform,只转发 awaitable

本文档中协程指的是异步协程,即不考虑 std::generator 等同步协程。

除了 main 之外的所有协程也是实际的 awaitable

执行器

由于一切都是异步的,库需要使用事件循环。因为一切都是单线程的,可以假设每个线程只有一个执行器,这足以满足 97% 的用例。因此,有一个 thread_local 执行器,它被协程对象用作默认值(尽管以副本形式存储在协程 promise 中)。

同样,库使用一种 executor 类型,默认为 asio::any_io_executor

如果您编写自己的协程,它应该持有执行器的副本,并有一个 get_executor 函数,以 const 引用返回它。

使用 Strands

虽然可以使用 strands,但它们与 thread_local 执行器不兼容。这是因为它们可能会切换线程,因此它们不能是 thread_local

如果您希望使用 strands(例如通过 spawn),则任何 promisegeneratorchannel 的执行器都必须手动分配。

通道 的情况下,这是一个构造函数参数,但对于其他协程类型,需要使用 asio::executor_arg。这是通过在参数列表(某个位置)中直接后跟要在协程参数列表中使用的执行器来完成的,例如:

cobalt::promise<void> example_with_executor(int some_arg, asio::executor_arg_t, cobalt::executor);

通过这种方式,协程 promise 可以从第三个参数中获取执行器,而不是默认使用 thread_local 执行器。

当然,参数可以设置为默认值,以减少不便,如果它们有时与 thread_local 执行器一起使用。

cobalt::promise<void> example_with_executor(int some_arg,
                                           asio::executor_arg_t = asio::executor_arg,
                                           cobalt::executor = cobalt::this_thread::get_executor());

如果在 strand 上省略此操作,则会抛出 asio::bad_allocator 类型的异常,或者——更糟糕的是——使用了错误的执行器。

多态内存资源

类似地,该库使用 thread_local pmr::memory_resource 来分配协程帧并用作异步操作中的分配器。

原因是用户可能希望自定义分配,例如避免锁、限制内存使用或监控使用情况。pmr 允许我们在不引入不必要的模板参数(即没有 promise 复杂性)的情况下实现这一点。然而,使用 pmr 会引入一些最小的开销,因此用户可以选择通过定义 BOOST_COBALT_NO_PMR 来禁用它。

op 使用内部资源,该资源针对 asio 的分配器用法进行了优化,而 gatherracejoin 使用单调资源来最小化分配。两者在定义 BOOST_COBALT_NO_PMR 的情况下仍然有效,在这种情况下,它们将使用 new/delete 作为上游分配。

启用 PMR 后,mainthread 为每个线程提供一个 pmr::unsynchronized_pool_resource

如果你编写自己的协程,它应该有一个返回 pmr::polymorphic_allocator 的 get_allocator 函数。

取消

cobalt 使用基于 asio::cancellation_signal 的隐式取消。这主要用于隐式(例如与 race 一起使用),因此示例中很少有显式使用。

如果您编写自定义协程,它必须从 get_cancellation_slot 函数返回一个 cancellation_slot,以便能够取消其他操作。
如果您编写自定义 awaitable,它可以在 await_suspend 中使用该函数来接收取消信号。

Promise

主要的协程类型是 promise,它是急切的。默认使用它的原因是可以优化不暂停的 promise,例如:

cobalt::promise<void> noop()
{
  co_return;
}

理论上,等待上述操作是一个空操作,但实际上,截至 2023 年,编译器尚未达到这一水平。

Race

最重要的同步机制是 race 函数。

它以伪随机顺序等待多个 awaitable,并在第一个完成时返回结果,然后忽略其余的。

也就是说,它以伪随机顺序启动 co_await,并在找到一个 awaitable 准备就绪或立即完成时停止。

cobalt::generator<int> gen1();
cobalt::generator<double> gen2();

cobalt::promise<void> p()
{
  auto g1 = gen1();
  auto g2 = gen2();
  while (!co_await cobalt::this_coro::cancelled)
  {
    switch(auto v = co_await race(g1, g2); v.index())
    {
    case 0:
      printf("Got int %d\n", get<0>(v));
      break;
    case 1:
      printf("Got double %f\n", get<1>(v));
      break;
    }
  }
}

然而,race 必须在内部等待所有 awaitable 在其启动 co_await 后完成。因此,一旦第一个 awaitable 完成,它就会尝试 中断 其余的,如果失败则取消它们。

race 是触发取消的首选方式,例如:

cobalt::promise<void> timeout();
cobalt::promise<void> work();

race(timeout(), work());

interrupt_await

如果它天真地取消,然而会丢失数据。因此,引入了 interrupt_await 的概念,它告诉 awaitable(如果支持)立即恢复 awaiter 并返回或抛出被忽略的值。

可中断 awaitable 的示例
struct awaitable
{
   bool await_ready() const;

   template<typename Promise>
   std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);

   T await_resume();

   void interrupt_await() &;
};

如果 interrupt_await 未导致立即恢复 (h),则 race 将发送取消信号。

race 以正确的引用限定符应用这些

auto g = gen1();
race(g, gen2());

如果可用,上述代码将为 g1 调用 interrupt_await() & 函数,为 g2 调用 interrupt_await() && 函数。

一般来说,cobalt 中的协程支持左值中断,即 interrupt_await() &channel 操作是非限定的,即在两种情况下都有效。

joingather 将转发中断,即只有当 gen2() 首先完成时,这才会中断 g1g2

关联器

cobalt 使用 asio 的 associator 概念,但对其进行了简化。也就是说,它有三个 associator,它们是等待 promise 的成员函数。

  • const executor_type & get_executor()(始终是 executor,必须按 const 引用返回)

  • allocator_type get_allocator()(始终是 pmr::polymorphic_allocator

  • cancellation_slot_type get_cancellation_slot()(必须与 asio::cancellation_slot 具有相同的接口)

cobalt 使用概念在其 await_suspend 函数中检查这些是否存在。

通过这种方式,自定义协程可以支持取消、执行器和分配器。

在自定义 awaitable 中,您可以这样获取它们

struct my_awaitable
{
    bool await_ready();
    template<typename T>
    void await_suspend(std::corutine_handle<P> h)
    {
        if constexpr (requires  (Promise p) {p.get_executor();})
            handle_executor(h.promise().get_executor();

        if constexpr (requires (Promise p) {p.get_cancellation_slot();})
            if ((cl = h.promise().get_cancellation_slot()).is_connected())
                cl.emplace<my_cancellation>();
    }

    void await_resume();
};

取消在 co_await 表达式中连接(如果协程和 awaitable 支持),包括 race 等同步机制。

线程

该库设计为单线程,因为这简化了恢复,从而更高效地处理 race 等同步。 race 需要锁定每个 race 的 awaitable 以避免数据丢失,这将是阻塞的,并且随着每个额外元素的增加而恶化。

除了 task(例如使用 spawn),任何协程都不能在与创建它的线程不同的线程上恢复。

主要的技术原因是,切换协程最有效的方式是通过 await_suspend 返回新协程的句柄,如下所示

struct my_awaitable
{
    bool await_ready();
    std::coroutine_handle<T> await_suspend(std::coroutine_handle<U>);
    void await_resume();
};

在这种情况下,等待协程将在调用 await_suspend 之前暂停,并恢复返回的协程。当然,如果需要通过执行器,这就不起作用。

这不仅适用于等待的协程,也适用于通道。此库中的通道使用 awaitable 的侵入式列表,并且可能会从写入操作的 await_suspend 返回读取(因此暂停)协程的句柄。

I/O

cobalt::io 命名空间为 boost.asio 提供了一个包装库。它被简化和编译以加快开发和编译时间。

参考

cobalt/main.hpp

开始使用 cobalt 应用程序最简单的方法是使用具有以下签名的 co_main 函数

cobalt::main co_main(int argc, char *argv[]);

声明 co_main 将添加一个 main 函数,该函数执行在事件循环上运行协程所需的所有步骤。这使我们能够编写非常简单的异步程序。

cobalt::main co_main(int argc, char *argv[])
{
  auto exec = co_await cobalt::this_coro::executor;             (1)
  asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
  co_await tim.async_wait(cobalt::use_op);                      (3)
  co_return 0;
}
1 获取 main 运行的执行器
2 与 asio 对象一起使用
3 co_await 一个 cobalt 操作

主 promise 将创建一个 asio::signal_set 并将其用于取消。SIGINT 变为完全取消,而 SIGTERM 变为终端取消。

取消将不会转发到 detached 协程。用户需要注意在取消时结束它们,否则程序将无法正常终止。

执行器

它还将创建一个 asio::io_context 来运行,您可以通过 this_coro::executor 获取它。它将被分配给 cobalt::this_thread::get_executor()

内存资源

它还创建了一个内存资源,该资源将用作内部内存分配的默认值。它将被分配给 thread_localcobalt::this_thread::get_default_resource()

Promise

每个协程都有一个内部状态,称为 promise(不要与 cobalt::promise 混淆)。根据协程属性的不同,可以 co_await 不同的事物,就像我们在上面的示例中使用的一样。

它们通过继承实现,并在不同的 promise 类型之间共享

主要的 promise 具有以下属性。

规范

  1. 声明 co_main 将隐式声明一个 main 函数

  2. main 仅在定义了 co_main 时才存在。

  3. SIGINTSIGTERM 将导致内部任务被取消。

cobalt/promise.hpp

Promise 是一个急切的协程,可以 co_awaitco_return 值。也就是说,它不能使用 co_yield

cobalt::promise<void> delay(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
}

cobalt::main co_main(int argc, char *argv[])
{
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}

Promise 默认是附加的。这意味着当 promise 句柄超出范围时会发送取消。

Promise 可以通过调用 detach 或使用前缀 + 运算符来分离。这是使用 detached 的运行时替代方案。分离的 Promise 在销毁时不会发送取消。

cobalt::promise<void> my_task();

cobalt::main co_main(int argc, char *argv[])
{
  +my_task(); (1)
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}
1 通过使用 +,任务将被分离。否则,编译器将生成 nodiscard 警告。

执行器

除非在任何位置使用后跟执行器参数的 asio::executor_arg,否则执行器将从 thread_local get_executor 函数获取。

cobalt::promise<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

内存资源

内存资源从 thread_local get_default_resource 函数获取,除非在任何位置使用后跟 polymorphic_allocator 参数的 std::allocator_arg

cobalt::promise<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概述

template<typename Return>
struct [[nodiscard]] promise
{
    promise(promise &&lhs) noexcept;
    promise& operator=(promise && lhs) noexcept;

    // enable `co_await`. (1)
    auto operator co_await ();

    // Ignore the return value, i.e. detach it. (2)
    void operator +() &&;

    // Cancel the promise.
    void cancel(asio::cancellation_type ct = asio::cancellation_type::all);

    // Check if the result is ready
    bool ready() const;
    // Check if the promise can be awaited.
    explicit operator bool () const; (3)

    // Detach or attach
    bool attached() const;
    void detach();
    void attach();
    // Create an already completed promimse

    static promise

    // Get the return value. If !ready() this function has undefined behaviour.
    Return get();
};
1 支持 中断等待
2 这允许使用简单的 +my_task() 表达式创建并行运行的 promise。
3 这允许类似 while (p) co_await p; 的代码

Promise

协程 promise (promise::promise_type) 具有以下属性。

Generator 是一个急切的协程,可以 co_awaitco_yield 值给调用者。

cobalt::generator<int> example()
{
  printf("In coro 1\n");
  co_yield 2;
  printf("In coro 3\n");
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main 1\n");
  printf("In main %d\n", co_await f);
  printf("In main %d\n", co_await f);
  return 0;
}

这将生成以下输出

In main 0
In coro 1
In main 1
In main 2
In coro 3
In main 4
generators1

Push(第二个模板参数)设置为非 void 时,可以将值推送到生成器中

cobalt::generator<int, int> example()
{
  printf("In coro 1\n");
  int i =  co_yield 2;
  printf("In coro %d\n", i);
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main %d\n", co_await f(3)); (1)
  co_return 0;
}
1 推送的值通过 operator() 传递给 co_yield 的结果。

这将生成以下输出

In main 0
In coro 1
In main 2
In coro 3

惰性

通过等待 initial 可以使生成器变为惰性。这个 co_await 表达式将产生 Push 值。这意味着生成器将等到第一次被等待时才开始工作,然后处理新推送的值并在下一个 co_yield 处恢复。

cobalt::generator<int, int> example()
{
  int v = co_await cobalt::this_coro::initial;
  printf("In coro %d\n", v);
  co_yield 2;
  printf("In coro %d\n", v);
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main 1\n"); // < this is now before the co_await initial
  printf("In main %d\n", co_await f(1));
  printf("In main %d\n", co_await f(3));
  return 0;
}

这将生成以下输出

In main 0
In main 1
In coro 1
In main 2
In coro 3
In main 4
generators2

执行器

除非在任何位置使用后跟执行器参数的 asio::executor_arg,否则执行器将从 thread_local get_executor 函数获取。

cobalt::generator<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

内存资源

内存资源从 thread_local get_default_resource 函数获取,除非在任何位置使用后跟 polymorphic_allocator 参数的 std::allocator_arg

cobalt::generator<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概述

template<typename Yield, typename Push = void>
struct [[nodiscard]] generator
{
  // Movable

  generator(generator &&lhs) noexcept = default;
  generator& operator=(generator &&) noexcept;

  // True until it co_returns & is co_awaited after (1)
  explicit operator bool() const;

  // Cancel the generator. (3)
  void cancel(asio::cancellation_type ct = asio::cancellation_type::all);

  // Check if a value is available
  bool ready() const;

  // Get the returned value. If !ready() this function has undefined behaviour.
  Yield get();

  // Cancel & detach the generator.
  ~generator();

  // an awaitable that results in value of Yield.
  using generator_awaitable = unspecified;

  // Present when Push != void
  generator_awaitable operator()(      Push && push);
  generator_awaitable operator()(const Push &  push);

  // Present when Push == void, i.e. can co_await the generator directly.
  generator_awaitable operator co_await (); (2)

};
1 这允许类似 while (gen) co_await gen: 的代码
2 支持 中断等待
3 取消的生成器可能可恢复
急切生成器的销毁将被延迟,惰性生成器将立即销毁。

cobalt/task.hpp

任务是一个惰性协程,可以 co_awaitco_return 值。也就是说,它不能使用 co_yield

cobalt::task<void> delay(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
}

cobalt::main co_main(int argc, char *argv[])
{
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}

promise 不同,任务可以在创建它的执行器之外的另一个执行器上被等待或派生。

执行器

由于 task 是惰性的,它在构造时不需要执行器。它会尝试从调用者或等待者(如果存在)获取执行器。否则,它将默认为 thread_local 执行器。

内存资源

内存资源不是thread_local get_default_resource 函数获取的,而是 pmr::get_default_resource(),除非在任何位置使用后跟 polymorphic_allocator 参数的 std::allocator_arg

cobalt::task<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概述

template<typename Return>
struct [[nodiscard]] task
{
    task(task &&lhs) noexcept = default;
    task& operator=(task &&) noexcept = default;

    // enable `co_await`
    auto operator co_await ();

};
通过调用 run(my_task()),任务可以从同步函数中同步使用。

use_task

use_task 完成令牌可用于从 cobalt_ 函数创建任务。这比 use_op 效率低,因为它需要分配协程帧,但返回类型更简单并支持 中断等待

cobalt/detached.hpp

Detached 是一个急切的协程,可以 co_await 但不能 co_return 值。也就是说,它不能被恢复,并且通常不会被等待。

cobalt::detached delayed_print(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
  printf("Hello world\n");
}

cobalt::main co_main(int argc, char *argv[])
{
  delayed_print();
  co_return 0;
}

Detached 用于轻松地在后台运行协程。

cobalt::detached my_task();

cobalt::main co_main(int argc, char *argv[])
{
  my_task(); (1)
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}
1 派生 detached 协程。

一个 detached 可以这样给自己分配一个新的取消源

cobalt::detached my_task(asio::cancellation_slot sl)
{
   co_await this_coro::reset_cancellation_source(sl);
   // do somework
}

cobalt::main co_main(int argc, char *argv[])
{
  asio::cancellation_signal sig;
  my_task(sig.slot()); (1)
  co_await delay(std::chrono::milliseconds(50));
  sig.emit(asio::cancellation_type::all);
  co_return 0;
}

执行器

除非在任何位置使用后跟执行器参数的 asio::executor_arg,否则执行器将从 thread_local get_executor 函数获取。

cobalt::detached my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

内存资源

内存资源从 thread_local get_default_resource 函数获取,除非在任何位置使用后跟 polymorphic_allocator 参数的 std::allocator_arg

cobalt::detached my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概述

struct detached {};
1 支持 中断等待

Promise

线程 detached 具有以下属性。

cobalt 中的操作是包装 asio 操作的 awaitable

use_op

use_op 令牌是创建 op 的直接方式,即使用 cobalt::use_op 作为完成令牌将创建所需的 awaitable。

auto tim = cobalt::use_op.as_default_on(asio::steady_timer{co_await cobalt::this_coro::executor});
co_await tim.async_wait();

根据完成签名,co_await 表达式可能会抛出异常。

签名 返回类型 异常

void()

void

无异常

void(T)

T

无异常

void(T…​)

std::tuple

无异常

void(system::error_code, T)

T

system::system_error

void(system::error_code, T…​)

std::tuple

system::system_error

void(std::exception_ptr, T)

T

任何异常

void(std::exception_ptr, T…​)

std::tuple

任何异常

use_op 永远不会立即完成,即 await_ready 总是返回 false,但总是暂停协程。

手动编码操作

操作是 [cobalt_operation] 功能的更高级实现。

该库使创建具有早期完成条件的异步操作变得容易,即完全避免协程暂停的条件。

例如,我们可以创建一个 wait_op,如果计时器已经过期,它什么也不做。

struct wait_op : cobalt::op<system::error_code> (1)
{
  asio::steady_timer & tim;

  wait_op(asio::steady_timer & tim) : tim(tim) {}

  bool ready(cobalt::handler<system::error_code> ) (2)
  {
    if (tim.expiry() < std::chrono::steady_clock::now())
        h(system::error_code{});
  }
  void initiate(cobalt::completion_handler<system::error_code> complete) (3)
  {
    tim.async_wait(std::move(complete));
  }
};
1 继承带有匹配签名的 opawait_transform 会将其提取
2 检查操作是否就绪 - 从 await_ready 调用
3 如果操作未就绪,则启动它。

cobalt/concepts.hpp

Awaitable

Awaitable 是可以在 co_await 中使用的表达式。

template<typename Awaitable, typename Promise = void>
concept awaitable_type = requires (Awaitable aw, std::coroutine_handle<Promise> h)
{
    {aw.await_ready()} -> std::convertible_to<bool>;
    {aw.await_suspend(h)};
    {aw.await_resume()};
};

template<typename Awaitable, typename Promise = void>
concept awaitable =
        awaitable_type<Awaitable, Promise>
    || requires (Awaitable && aw) { {std::forward<Awaitable>(aw).operator co_await()} -> awaitable_type<Promise>;}
    || requires (Awaitable && aw) { {operator co_await(std::forward<Awaitable>(aw))} -> awaitable_type<Promise>;};
该库中的 awaitable 要求协程 promise 如果提供执行器,则以 const 引用返回执行器。否则它将使用 this_thread::get_executor()

启用 awaitable

继承 enable_awaitables 将使协程能够通过 await_transform co_await 任何在没有 await_transform 的情况下可以 co_await 的东西。

cobalt/this_coro.hpp

this_coro 命名空间提供了访问协程 promise 内部状态的实用工具。

伪 awaitable

// Awaitable type that returns the executor of the current coroutine.
struct executor_t {}
constexpr executor_t executor;

// Awaitable type that returns the cancellation state of the current coroutine.
struct cancellation_state_t {};
constexpr cancellation_state_t cancellation_state;

// Reset the cancellation state with custom or default filters.
constexpr unspecified reset_cancellation_state();
template<typename Filter>
constexpr unspecified reset_cancellation_state(
    Filter && filter);
template<typename InFilter, typename OutFilter>
constexpr unspecified reset_cancellation_state(
    InFilter && in_filter,
    OutFilter && out_filter);

// get & set the throw_if_cancelled setting.
unspecified throw_if_cancelled();
unspecified throw_if_cancelled(bool value);

// Set the cancellation source in a detached.
unspecified reset_cancellation_source();
unspecified reset_cancellation_source(asio::cancellation_slot slot);


// get the allocator the promise
struct allocator_t {};
constexpr allocator_t allocator;

// get the current cancellation state-type
struct cancelled_t {};
constexpr cancelled_t cancelled;

// set the over-eager mode of a generator
struct initial_t {};
constexpr initial_t initial;

Await 分配器

支持 enable_await_allocator 的协程的分配器可以通过以下方式获取

co_await cobalt::this_coro::allocator;

为了为您自己的协程启用此功能,您可以使用 CRTP 模式继承 enable_await_allocator

struct my_promise : cobalt::enable_await_allocator<my_promise>
{
  using allocator_type = __your_allocator_type__;
  allocator_type get_allocator();
};
如果可用,分配器将由 use_op 使用

Await 执行器

支持 enable_await_executor 的协程的执行器可以通过以下方式获取

co_await cobalt::this_coro::executor;

为了为您自己的协程启用此功能,您可以使用 CRTP 模式继承 enable_await_executor

struct my_promise : cobalt::enable_await_executor<my_promise>
{
  using executor_type = __your_executor_type__;
  executor_type get_executor();
};
如果可用,执行器将由 use_op 使用

Await 延迟

您的协程 promise 可以继承 enable_await_deferred,以便在 co_await 表达式中支持单个签名 asio::deferred

由于 asio::deferred 现在是默认完成令牌,因此这允许以下代码,而无需指定任何完成令牌或其他特化。

asio::steady_timer t{co_await cobalt::this_coro::executor};
co_await t.async_wait();

内存资源基类

promise 的 promise_memory_resource_base 基类将提供一个 get_allocator,该分配器从默认资源或遵循 std::allocator_arg 参数传递的资源中获取。同样,它将添加 operator new 重载,以便协程为其帧分配使用相同的内存资源。

如果取消则抛出

promise_throw_if_cancelled_base 提供了基本选项,允许操作使协程在等待另一个实际的 awaitable 时抛出异常。

co_await cobalt::this_coro::throw_if_cancelled;

取消状态

promise_cancellation_base 提供了基本选项,允许操作启用协程具有可通过 reset_cancellation_state 重置的 cancellation_state。

co_await cobalt::this_coro::reset_cancellation_state();

为了方便起见,还有一个快捷方式可以检查当前的取消状态

asio::cancellation_type ct = (co_await cobalt::this_coro::cancellation_state).cancelled();
asio::cancellation_type ct = co_await cobalt::this_coro::cancelled; // same as above

cobalt/this_thread.hpp

由于一切都是单线程的,此库为每个线程提供了一个执行器和默认内存资源。

namespace boost::cobalt::this_thread
{

pmr::memory_resource* get_default_resource() noexcept; (1)
pmr::memory_resource* set_default_resource(pmr::memory_resource* r) noexcept; (2)
pmr::polymorphic_allocator<void> get_allocator(); (3)

typename asio::io_context::executor_type & get_executor(); (4)
void set_executor(asio::io_context::executor_type exec) noexcept; (5)

}
1 获取默认资源 - 除非设置,否则将为 pmr::get_default_resource
2 设置默认资源 - 返回之前设置的资源
3 获取一个包装 (1) 的分配器
4 获取线程的执行器 - 如果未设置则抛出
5 设置当前线程的执行器。

协程将使用这些作为默认值,但会保留一个副本以防万一。

唯一的例外是 cobalt 操作的初始化,它将使用 this_thread::executor 进行重新抛出。

cobalt/channel.hpp

通道可用于在单个线程上的不同协程之间交换数据。

概述

通道概述
template<typename T>
struct channel
{
  // create a channel with a buffer limit, executor & resource.
  explicit
  channel(std::size_t limit = 0u,
          executor executor = this_thread::get_executor(),
          pmr::memory_resource * resource = this_thread::get_default_resource());
  // not movable.
  channel(channel && rhs) noexcept = delete;
  channel & operator=(channel && lhs) noexcept = delete;

  using executor_type = executor;
  const executor_type & get_executor();

  // Closes the channel
  ~channel();
  bool is_open() const;
  // close the operation, will cancel all pending ops, too
  void close();

  // an awaitable that yields T
  using read_op = unspecified;

  // an awaitable that yields void
  using write_op = unspecified;

  // read a value to a channel
  read_op  read();

  // write a value to the channel
  write_op write(const T  && value);
  write_op write(const T  &  value);
  write_op write(      T &&  value);
  write_op write(      T  &  value);

  // write a value to the channel if T is void

};

描述

通道是两个协程通信和同步的工具。

const std::size_t buffer_size = 2;
channel<int> ch{exec, buffer_size};

// in coroutine (1)
co_await ch.write(42);

// in coroutine (2)
auto val = co_await ch.read();
1 向通道发送一个值 - 将阻塞直到可以发送
2 从通道读取一个值 - 将阻塞直到有一个值可等待。

两个操作都可能阻塞,具体取决于通道缓冲区大小。

如果缓冲区大小为零,则 readwrite 需要同时发生,即充当集合点。

如果缓冲区未满,则写操作不会暂停协程;同样,如果缓冲区不为空,则读操作不会暂停。

如果两个操作同时完成(空缓冲区总是如此),则第二个操作将被发布到执行器以供稍后完成。

通道类型可以是 void,在这种情况下 write 不需要参数。

通道操作可以在不丢失数据的情况下被取消。这使得它们可以与 race 一起使用。

generator<variant2::variant<int, double>> merge(
    channel<int> & c1,
    channel<double> & c2)
{
    while (c1 && c2)
       co_yield co_await race(c1, c2);
}

示例

cobalt::promise<void> producer(cobalt::channel<int> & chan)
{
  for (int i = 0; i < 4; i++)
    co_await chan.write(i);

  chan.close();
}

cobalt::main co_main(int argc, char * argv[])
{
  cobalt::channel<int> c;

  auto p = producer(c);
  while (c.is_open())
    std::cout << co_await c.read() << std::endl;

  co_await p;
  co_return 0;
}

此外,还提供了 channel_reader,以使读取通道更方便且可与 BOOST_COBALT_FOR 一起使用。

cobalt::main co_main(int argc, char * argv[])
{
  cobalt::channel<int> c;

  auto p = producer(c);
  BOOST_COBALT_FOR(int value, cobalt::channel_reader(c))
    std::cout << value << std::endl;

  co_await p;
  co_return 0;
}

cobalt/with.hpp

with 设施提供了一种执行协程异步拆卸的方法。也就是说,它就像一个异步析构函数调用。

struct my_resource
{
  cobalt::promise<void> await_exit(std::exception_ptr e);
};

cobalt::promise<void> work(my_resource & res);

cobalt::promise<void> outer()
{
  co_await cobalt::with(my_resource(), &work);
}

拆卸可以通过提供 await_exit 成员函数或返回 awaitabletag_invoke 函数来完成,或者通过将拆卸作为 with 的第三个参数提供。

using ws_stream = beast::websocket::stream<asio::ip::tcp::socket>>;
cobalt::promise<ws_stream> connect(urls::url); (1)
cobalt::promise<void>   disconnect(ws_stream &ws); (2)

auto teardown(const boost::cobalt::with_exit_tag & wet , ws_stream & ws, std::exception_ptr e)
{
  return disconnect(ws);
}

cobalt::promise<void> run_session(ws_stream & ws);

cobalt::main co_main(int argc, char * argv[])
{
  co_await cobalt::with(co_await connect(argv[1]), &run_session, &teardown);
  co_return 0;
}
1 实现 websocket 连接和 websocket 初始化
2 实现有序关机。
如果作用域在没有异常的情况下退出,则 std::exception_ptr 为空。注意:exit 函数通过引用获取 exception_ptr 并修改它是合法的。

cobalt/race.hpp

race 函数可用于从一组 awaitableco_await 一个 awaitable

它可以作为带有多个 awaitable 的可变参数函数调用,也可以作为 awaitable 范围调用。

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_wait()
{
  co_await cobalt::race(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::race(aws); (2)
}
1 等待一组可变参数的 awaitable
2 等待向量 awaitable

race 的第一个参数可以是 uniform random bit generator

race 的签名
extern promise<void> pv1, pv2;
std::vector<promise<void>> pvv;

std::default_random_engine rdm{1};
// if everything returns void race returns the index
std::size_t r1 = co_await race(pv1, pv2);
std::size_t r2 = co_await race(rdm, pv1, pv2);
std::size_t r3 = co_await race(pvv);
std::size_t r4 = co_await race(rdm, pvv);

// variant if not everything is void. void become monostate
extern promise<int> pi1, pi2;
variant2::variant<monostate, int, int> r5 = co_await race(pv1, pi1, pi2);
variant2::variant<monostate, int, int> r6 = co_await race(rdm, pv1, pi1, pi2);

// a range returns a pair of the index and the result if non-void
std::vector<promise<int>> piv;
std::pair<std::size_t, int> r7 = co_await race(piv);
std::pair<std::size_t, int> r8 = co_await race(rdm, piv);

中断等待

当参数作为右值引用传递时,race 将尝试在 awaitable 上使用 .interrupt_await 来通知 awaitable 立即完成,并且结果将被忽略。如果支持,Awaitable 必须在 interrupt_await 返回之前恢复等待协程。如果 race 未检测到该函数,它将发送取消。

这意味着你可以像这样重复使用 race

cobalt::promise<void> do_wait()
{
  auto t1 = task1();
  auto t2 = task2();
  co_await cobalt::race(t1, t2); (1)
  co_await cobalt::race(t1, t2); (2)
}
1 等待第一个任务完成
2 等待另一个任务完成

这受 promisegeneratorgather 支持。

race 将调用 awaitable 的函数,就像在 co_await 表达式中使用一样,或者根本不评估它们。

left_race

left_race 函数类似于 race,但遵循严格的从左到右扫描。这可能导致饥饿问题,这就是为什么它不是推荐的默认值,但如果采取适当的措施,它对优先级排序很有用。

概述

// Concept for the random number generator.
template<typename G>
  concept uniform_random_bit_generator =
    requires ( G & g)
    {
      {typename std::decay_t<G>::result_type() } -> std::unsigned_integral; // is an unsigned integer type
      // T	Returns the smallest value that G's operator() may return. The value is strictly less than G::max(). The function must be constexpr.
      {std::decay_t<G>::min()} -> std::same_as<typename std::decay_t<G>::result_type>;
      // T	Returns the largest value that G's operator() may return. The value is strictly greater than G::min(). The function must be constexpr.
      {std::decay_t<G>::max()} -> std::same_as<typename std::decay_t<G>::result_type>;
      {g()} -> std::same_as<typename std::decay_t<G>::result_type>;
    } && (std::decay_t<G>::max() > std::decay_t<G>::min());


// Variadic race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
         uniform_random_bit_generator URBG, awaitable ... Promise>
awaitable race(URBG && g, Promise && ... p);

// Ranged race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
         uniform_random_bit_generator URBG, range<awaitable> PromiseRange>
awaitable race(URBG && g, PromiseRange && p);

// Variadic race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable race(Promise && ... p);

// Ranged race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable race(PromiseRange && p);

// Variadic left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable left_race(Promise && ... p);

// Ranged left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable left_race(PromiseRange && p);
选择空范围将导致抛出异常。

cobalt/gather.hpp

gather 函数可用于同时 co_await 多个 awaitable,并传递取消。

该函数将收集所有完成并以 system::result 的形式返回它们,即以值形式捕获概念。一个 awaitable 抛出异常不会取消其他 awaitable。

它可以作为带有多个 Awaitable 的可变参数函数调用,也可以作为 awaitable 范围调用。

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_gather()
{
  co_await cobalt::gather(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::gather(aws); (2)
}
1 等待一组可变参数的 awaitable
2 等待一个 awaitable 向量

gather 将调用 awaitable 的函数,就像在 co_await 表达式中使用一样。

gather 的签名
extern promise<void> pv1, pv2;
std::tuple<system::result<int>, system::result<int>> r1 = co_await gather(pv1, pv2);

std::vector<promise<void>> pvv;
pmr::vector<system::result<void>> r2 =  co_await gather(pvv);

extern promise<int> pi1, pi2;
std::tuple<system::result<monostate>,
           system::result<monostate>,
           system::result<int>,
           system::result<int>> r3 = co_await gather(pv1, pv2, pi1, pi2);

std::vector<promise<int>> piv;
pmr::vector<system::result<int>> r4 = co_await gather(piv);

概述

// Variadic gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable gather(Promise && ... p);

// Ranged gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable gather(PromiseRange && p);

cobalt/join.hpp

join 函数可用于同时 co_await 多个 awaitable,并正确连接取消。

该函数将收集所有完成并将其作为值返回,除非抛出异常。如果抛出异常,所有未完成的操作都将被取消(如果可能,则中断),并重新抛出第一个异常。

void 将作为 variant2::monostate 在元组中返回,除非所有 awaitable 都产生 void。

它可以作为带有多个 Awaitable 的可变参数函数调用,也可以作为 awaitable 范围调用。

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_join()
{
  co_await cobalt::join(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::join(aws); (2)
}
1 等待一组可变参数的 awaitable
2 等待一个 awaitable 向量

join 将调用 awaitable 的函数,就像在 co_await 表达式中使用一样。

join 的签名
extern promise<void> pv1, pv2;
/* void */ co_await join(pv1, pv2);

std::vector<promise<void>> pvv;
/* void */ co_await join(pvv);

extern promise<int> pi1, pi2;
std::tuple<monostate, monostate, int, int> r1 = co_await join(pv1, pv2, pi1, pi2);

std::vector<promise<int>> piv;
pmr::vector<int> r2 = co_await join(piv);

概述

// Variadic join
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable join(Promise && ... p);

// Ranged join
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable join(PromiseRange && p);
选择空范围将导致异常。

cobalt/wait_group.hpp

wait_group 函数可用于管理多个 promise 类型的协程。它与 cobalt/with.hpp 开箱即用,因为它具有匹配的 await_exit 成员。

本质上,wait_group 是一个动态的 promise 列表,它有一个 race 函数 (wait_one),一个 gather 函数 (wait_all),并在作用域退出时进行清理。

struct wait_group
{
    // create a wait_group
    explicit
    wait_group(asio::cancellation_type normal_cancel = asio::cancellation_type::none,
               asio::cancellation_type exception_cancel = asio::cancellation_type::all);

    // insert a task into the group
    void push_back(promise<void> p);

    // the number of tasks in the group
    std::size_t size() const;
    // remove completed tasks without waiting (i.e. zombie tasks)
    std::size_t reap();
    // cancel all tasks
    void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
    // wait for one task to complete.
    wait_one_op wait_one();
    // wait for all tasks to complete
    wait_op wait();
    // wait for all tasks to complete
    wait_op operator co_await ();
    // when used with with , this will receive the exception
    // and wait for the completion
    // if ep is set, this will use the exception_cancel level,
    // otherwise the normal_cancel to cancel all promises.
    wait_op await_exit(std::exception_ptr ep);
};

cobalt/spawn.hpp

spawn 函数允许在 asio executor/execution_context 上运行 task,并使用 完成令牌 消费结果。

auto spawn(Context & context, task<T> && t, CompletionToken&& token);
auto spawn(Executor executor, task<T> && t, CompletionToken&& token);

Spawn 将调度其启动并发布完成。这使得使用 task 在另一个执行器上运行任务并在当前执行器上使用 use_op 消费结果是安全的。也就是说,spawn 可用于跨线程。

示例

cobalt::task<int> work();

int main(int argc, char *argv[])
{
  asio::io_context ctx{BOOST_ASIO_CONCURRENCY_HINT_1};
  auto f = spawn(ctx, work(), asio::use_future);
  ctx.run();

  return f.get();
}
调用者需要确保执行器不会同时在多个线程上运行,例如,通过使用单线程 asio::io_context

cobalt/run.hpp

run 函数类似于 spawn,但同步运行。它将在内部设置执行上下文和内存资源。

这在将一段 cobalt 代码集成到同步应用程序中时非常有用。

概述

// Run the task and return it's value or rethrow any exception.
T run(task<T> t);

示例

cobalt::task<int> work();

int main(int argc, char *argv[])
{
  return run(work());
}

cobalt/thread.hpp

线程类型是创建类似于 main 但不使用 signal_set 的环境的另一种方式。

cobalt::thread my_thread()
{
  auto exec = co_await cobalt::this_coro::executor;             (1)
  asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
  co_await tim.async_wait(cobalt::use_op);                      (3)
  co_return 0;
}
1 获取线程运行的执行器
2 与 asio 对象一起使用
3 co_await 一个 cobalt 操作

要使用线程,您可以像使用 std::thread 一样使用它

int main(int argc, char * argv[])
{
  auto thr = my_thread();
  thr.join();
  return 0;
}

线程也是一个 awaitable(包括取消)。

cobalt::main co_main(int argc, char * argv[])
{
  auto thr = my_thread();
  co_await thr;
  co_return 0;
}
销毁一个 detached 线程将导致硬停止(io_context::stop)并加入线程。
除了等待 cobalt/thread.hppcobalt/spawn.hpp 之外,该库中没有任何东西是线程安全的。如果您需要在线程之间传输数据,您将需要一个线程安全的实用工具,例如 asio::concurrent_channel。您不能在线程之间共享任何 cobalt 原语,唯一的例外是能够将 task 派生 到另一个线程的执行器上。

执行器

它还将创建一个 asio::io_context 来运行,您可以通过 this_coro::executor 获取它。它将被分配给 cobalt::this_thread::get_executor()

内存资源

它还创建了一个内存资源,该资源将用作内部内存分配的默认值。它将被分配给 thread_localcobalt::this_thread::get_default_resource()

概述

struct thread
{
  // Send a cancellation signal
  void cancel(asio::cancellation_type type = asio::cancellation_type::all);


  // Allow the thread to be awaited. NOOP if the thread is invalid.
  auto operator co_await() &-> detail::thread_awaitable; (1)
  auto operator co_await() && -> detail::thread_awaitable; (2)

  // Stops the io_context & joins the executor
  ~thread();
  /// Move constructible
  thread(thread &&) noexcept = default;

  using executor_type = executor;

  using id = std::thread::id;
  id get_id() const noexcept;

  // Add the functions similar to `std::thread`
  void join();
  bool joinable() const;
  void detach();

  executor_type get_executor() const;
};
1 支持 中断等待
2 始终转发取消

cobalt/result.hpp

Awaitable 可以修改为返回 system::resultstd::tuple,而不是使用异常。

// value only
T res = co_await foo();

// as result
system::result<T, std::exception_ptr> res = co_await cobalt::as_result(foo());

// as tuple
std::tuple<std::exception_ptr, T> res = co_await cobalt::as_tuple(foo());

Awaitable 还可以通过提供使用 cobalt::as_result_tagcobalt::as_tuple_tagawait_resume 重载来提供处理结果和元组的自定义方式。

your_result_type await_resume(cobalt::as_result_tag);
your_tuple_type  await_resume(cobalt::as_tuple_tag);

这允许 awaitable 提供除 std::exception_ptr 之外的其他错误类型,例如 system::error_code。这由 opchannel 完成。

// example of an op with result system::error_code, std::size_t
system::result<std::size_t>                 await_resume(cobalt::as_result_tag);
std::tuple<system::error_code, std::size_t> await_resume(cobalt::as_tuple_tag);
Awaitable 仍然允许抛出异常,例如对于 OOM 等关键异常。

cobalt/async_for.hpp

对于像生成器这样的类型,提供了 BOOST_COBALT_FOR 宏,以模拟 for co_await 循环。

cobalt::generator<int> gen();

cobalt::main co_main(int argc, char * argv[])
{
    BOOST_COBALT_FOR(auto i, gen())
        printf("Generated value %d\n", i);

    co_return 0;
}

要求是用于 for 循环的 awaitable 具有 operator bool 来检查它是否可以再次被等待。对于 generatorpromise 来说是这种情况。

cobalt/error.hpp

为了更方便地管理错误,cobalt 提供了一个 error_categoryboost::system::error_code 一起使用。

enum class error
{
  moved_from,
  detached,
  completed_unexpected,
  wait_not_ready,
  already_awaited,
  allocation_failed
};

system::error_category & cobalt_category();
system::error_code make_error_code(error e);

cobalt/composition.hpp

包含 cobalt/composition.hpp 提供了将任何函数(最后一个参数为 cobalt::completion_handler)转换为组合操作的定义。这允许简单地创建组合操作。

co_await 语句的结果会自动转换为元组(类似于使用的 as_tuple)以避免异常。

结果需要 co_return。传递给协程的 Handler 不得修改。

struct echo_op final : op<system::error_code>
{
  echo_op(cobalt::io::stream & str) : stream(str) {}
  void initiate(completion_handler<system::error_code>) final;
  cobalt::io::stream & str;
};

void echo_op::initiate(completion_handler<system::error_code, std::size_t>)
{
  char buf[4096];
  auto [ec, n] = co_await str.read_some(buf);

  auto buf = asio::buffer(buf, n);

  while (!ec && !buf.empty() && !!co_await this_coro::cancelled)
  {
    std::tie(ec, n) = co_await str.write_some(buf);
    buf += n;
  }

  if (!!co_await this_coro::cancelled)
    co_return {asio::error::operation_aborted, m};
  else
    co_return {system::error_code{}, m};
}

cobalt/config.hpp

配置添加器允许配置 boost.cobalt 的一些实现细节。

executor_type

执行器类型默认为 boost::asio::any_io_executor

您可以通过定义 BOOST_COBALT_CUSTOM_EXECUTOR 并自行添加 boost::cobalt::executor 类型来将其设置为 boost::asio::any_io_executor

或者,可以定义 BOOST_COBALT_USE_IO_CONTEXT 来将执行器设置为 boost::asio::io_context::executor_type

pmr

Boost.cobalt 可以与不同的 pmr 实现一起使用,默认为 std::pmr

可以使用以下宏进行配置

  • BOOST_COBALT_USE_STD_PMR

  • BOOST_COBALT_USE_BOOST_CONTAINER_PMR

  • BOOST_COBALT_USE_CUSTOM_PMR

如果您定义 BOOST_COBALT_USE_CUSTOM_PMR,您将需要提供一个 boost::cobalt::pmr 命名空间,它是 std::pmr 的直接替代品。

或者,可以使用以下方式禁用 pmr

  • BOOST_COBALT_NO_PMR.

在这种情况下,cobalt 将为同步函数(racegatherjoin)使用非 pmr 单调资源。

use_op 使用一个小型缓冲区优化资源,其大小可以通过定义 BOOST_COBALT_SBO_BUFFER_SIZE 来设置,默认为 4096 字节。

cobalt/io/buffer.hpp

缓冲区头文件提供了用于 IO 函数的通用缓冲区序列表示。它实现为一个单个缓冲区后跟一个缓冲区跨度。这允许从前面丢弃字节,也允许使用单个 buffer,一个 registered_buffer

const_buffer_sequence 实现 ConstBufferSequencemutable_buffer_sequence 实现 MutableBufferSequence

namespace io
{

// Aliases for the asio functions.
using asio::buffer;
using asio::buffer_copy;
using asio::buffer_size;

}

mutable_buffer_sequence

namespace io
{

using asio::buffer;
using asio::mutable_buffer;


struct mutable_buffer_sequence
{
  std::size_t buffer_count() const;
  mutable_buffer_sequence(asio::mutable_registered_buffer buffer = {});
  mutable_buffer_sequence(asio::mutable_buffer head);
  template<typename T>
    requires (std::constructible_from<std::span<const asio::mutable_buffer>, T&&>)
  mutable_buffer_sequence(T && value);
  mutable_buffer_sequence(std::span<const asio::mutable_buffer> spn);

  // drop n bytes from the buffer sequence
  mutable_buffer_sequence & operator+=(std::size_t n);
  // a random access iterator over the buffers
  struct const_iterator;

  const_iterator begin() const;
  const_iterator   end() const;

  // is this a registered buffer
  bool is_registered() const;
};

// Invokes Func either with the mutable_buffer_sequence, an `asio::mutable_registered_buffer` or a `asio::mutable_buffer`.
template<typename Func>
auto visit(const mutable_buffer_sequence & seq, Func && func);

const_buffer_sequence

using asio::const_buffer;

struct const_buffer_sequence
{
  std::size_t buffer_count();
  const_buffer_sequence(asio::const_registered_buffer buffer);
  const_buffer_sequence(asio::mutable_registered_buffer buffer);

  const_buffer_sequence(asio::const_buffer head)  { this->head_ = head; }
  const_buffer_sequence(asio::mutable_buffer head)  { this->head_ = head; }

  template<typename T>
    requires (std::constructible_from<std::span<const asio::const_buffer>, T&&>)
  const_buffer_sequence(T && value);
  const_buffer_sequence(std::span<const asio::const_buffer> spn);

  // drop bytes from the front, i.e. advance the buffer
  const_buffer_sequence & operator+=(std::size_t n);

  // a random access iterator over the buffers
  struct const_iterator;

  // Access the sequence as a range.
  const_iterator begin() const;
  const_iterator   end() const;

  // is this a registered buffer
  bool is_registered() const;
};

// Invokes Func either with the const_buffer_sequence, an `asio::const_registered_buffer` or a `asio::const_buffer.
template<typename Func>
auto visit(const const_buffer_sequence & seq, Func && func);

}

cobalt/io/ops.hpp

此包装器中的大多数功能都是通过操作实现的。

它们是类型擦除的,但不是通过使用 virtual。这使得去虚拟化更容易。必须提供 implementation 函数,而 try_implementation_t 是可选的,并将在 ready 函数中使用。两者都将以 void *this 作为第一个参数调用。

struct [[nodiscard]] write_op final : op<system::error_code, std::size_t>
{
  const_buffer_sequence buffer;

  using     implementation_t = void(void*, const_buffer_sequence, completion_handler<system::error_code, std::size_t>);
  using try_implementation_t = void(void*, const_buffer_sequence,            handler<system::error_code, std::size_t>);

  write_op(const_buffer_sequence buffer,
           void * this_,
           implementation_t *implementation,
           try_implementation_t * try_implementation = nullptr);


  void initiate(completion_handler<system::error_code, std::size_t> handler) final;
  void ready(handler<system::error_code, std::size_t> handler) final;
};



struct [[nodiscard]] read_op final : op<system::error_code, std::size_t>
{
  mutable_buffer_sequence buffer;

  using     implementation_t = void(void*, mutable_buffer_sequence, completion_handler<system::error_code, std::size_t>);
  using try_implementation_t = void(void*, mutable_buffer_sequence,            handler<system::error_code, std::size_t>);

  read_op(mutable_buffer_sequence buffer,
           void * this_,
           implementation_t *implementation,
           try_implementation_t * try_implementation = nullptr);

  void initiate(completion_handler<system::error_code, std::size_t> handler) final;
  void ready(handler<system::error_code, std::size_t> handler) final;
};


struct [[nodiscard]] write_at_op final : op<system::error_code, std::size_t>
{
  std::uint64_t offset;
  const_buffer_sequence buffer;

  using     implementation_t = void(void*, std::uint64_t, const_buffer_sequence, completion_handler<system::error_code, std::size_t>);
  using try_implementation_t = void(void*, std::uint64_t, const_buffer_sequence,            handler<system::error_code, std::size_t>);

  write_at_op(std::uint64_t offset,
           const_buffer_sequence buffer,
           void * this_,
           implementation_t *implementation,
           try_implementation_t * try_implementation = nullptr);

  void initiate(completion_handler<system::error_code, std::size_t> handler) final;
  void ready(handler<system::error_code, std::size_t> handler) final;
};


struct [[nodiscard]] read_at_op final : op<system::error_code, std::size_t>
{
  std::uint64_t offset;
  mutable_buffer_sequence buffer;

  using     implementation_t = void(void*, std::uint64_t, mutable_buffer_sequence, completion_handler<system::error_code, std::size_t>);
  using try_implementation_t = void(void*, std::uint64_t, mutable_buffer_sequence,            handler<system::error_code, std::size_t>);

  read_at_op(std::uint64_t offset,
          mutable_buffer_sequence buffer,
          void * this_,
          implementation_t *implementation,
          try_implementation_t * try_implementation = nullptr);


  void initiate(completion_handler<system::error_code, std::size_t> handler) final;
  void ready(handler<system::error_code, std::size_t> handler) final;
};


struct [[nodiscard]] wait_op final : op<system::error_code>
{
  using     implementation_t = void(void*, completion_handler<system::error_code>);
  using try_implementation_t = void(void*,            handler<system::error_code>);

  wait_op(void * this_,
          implementation_t *implementation,
          try_implementation_t * try_implementation = nullptr);

  void initiate(completion_handler<system::error_code> handler) final;
  void ready(handler<system::error_code> handler) final;
};

当操作未被等待时,操作可以重用,并且公共成员(例如缓冲区)可以修改。例如:

write_op wo = do_w;
auto sz = co_await wo;
while (sz > 0)
{
    wo.buffer += sz;
    sz = co_await wo;
}

cobalt/io/steady_timer.hpp

steady_timer 是 asio::steady_timer 的一个简单包装器。

如果计时器已经过期,co_await t.wait() 不会暂停。
struct steady_timer
{
  /// The clock type.
  typedef std::chrono::steady_clock clock_type;

  /// The duration type of the clock.
  typedef typename clock_type::duration duration;

  /// The time point type of the clock.
  typedef typename clock_type::time_point time_point;

  steady_timer(const cobalt::executor & executor = this_thread::get_executor());
  steady_timer(const time_point& expiry_time, const cobalt::executor & executor = this_thread::get_executor());
  steady_timer(const duration& expiry_time,   const cobalt::executor & executor = this_thread::get_executor());

  // cancel the timer. This is safe to call from another thread
  void cancel();

  // The current expiration time.
  time_point expiry() const;
  // Rest the expiry time, either with an absolute time or a duration.
  void reset(const time_point& expiry_time);
  void reset(const duration& expiry_time);

  // Check if the timer is already expired.
  bool expired() const;
  // Get the wait operation.
  [[nodiscard]] wait_op wait();


};

cobalt/io/system_timer.hpp

system_timer 是 asio::system_timer 的一个简单包装器。

如果计时器已经过期,co_await t.wait() 不会暂停。
struct system_timer
{
  /// The clock type.
  typedef std::chrono::system_clock clock_type;

  /// The duration type of the clock.
  typedef typename clock_type::duration duration;

  /// The time point type of the clock.
  typedef typename clock_type::time_point time_point;

  system_timer(const cobalt::executor & executor = this_thread::get_executor());
  system_timer(const time_point& expiry_time, const cobalt::executor & executor = this_thread::get_executor());
  system_timer(const duration& expiry_time,   const cobalt::executor & executor = this_thread::get_executor());

  // cancel the timer. This is safe to call from another thread
  void cancel();

  // The current expiration time.
  time_point expiry() const;
  // Rest the expiry time, either with an absolute time or a duration.
  void reset(const time_point& expiry_time);
  void reset(const duration& expiry_time);

  // Check if the timer is already expired.
  bool expired() const;
  // Get the wait operation.
  [[nodiscard]] wait_op wait();


};

cobalt/io/sleep.hpp

sleep 操作是计时器的便利包装器。每个 sleep 操作都会创建一个计时器,因此重用计时器通常更高效。

// Waits using a steady_timer.
template<typename Duration>
inline auto sleep(const std::chrono::time_point<std::chrono::steady_clock, Duration> & tp);

// Waits using a system_timer.
template<typename Duration>
inline auto  sleep(const std::chrono::time_point<std::chrono::system_clock, Duration> & tp);

// Waits using a steady_timer.
template<typename Rep, typename Period>
inline auto sleep(const std::chrono::duration<Rep, Period> & dur);

cobalt/io/signal_set.hpp

signal_set 是 signal_set 的包装器。

概述

struct signal_set
{
  signal_set(const cobalt::executor & executor = this_thread::get_executor());
  signal_set(std::initializer_list<int> sigs, const cobalt::executor & executor = this_thread::get_executor());

  // Cancel all operations associated with the signal set.
  [[nodiscard]] system::result<void> cancel();
  // Remove all signals from a signal_set.
  [[nodiscard]] system::result<void> clear();
  // Add a signal to a signal_set.
  [[nodiscard]] system::result<void> add(int signal_number);
  // Remove a signal from a signal_set.
  [[nodiscard]] system::result<void> remove(int signal_number);
};

示例

io::signal_set s = {SIGINT, SIGTERM};
int sig = co_await s.wait();
if (s == SIGINT)
  interrupt();
else if (s == SIGTERM)
  terminate();

cobalt/io/stream.hpp

流是一个 IO 对象,允许读写,例如 tcp 套接字。

struct BOOST_SYMBOL_VISIBLE write_stream
{
  virtual ~write_stream() = default;
  [[nodiscard]] virtual write_op write_some(const_buffer_sequence buffer) = 0;
};

struct BOOST_SYMBOL_VISIBLE read_stream
{
  virtual ~read_stream() = default;
  [[nodiscard]] virtual read_op read_some(mutable_buffer_sequence buffer) = 0;
};

struct stream : read_stream, write_stream
{
};

cobalt/io/random_access_device.hpp

random_access_device 是一种 IO 对象,允许在随机位置进行读写。例如,文件。

struct BOOST_SYMBOL_VISIBLE random_access_write_device
{
  virtual ~random_access_write_device() = default;
  [[nodiscard]] virtual write_at_op write_some_at(std::uint64_t offset, const_buffer_sequence buffer) = 0;
};

struct BOOST_SYMBOL_VISIBLE random_access_read_device
{
  virtual ~random_access_read_device() = default;
  [[nodiscard]] virtual read_at_op read_some_at(std::uint64_t offset, mutable_buffer_sequence buffer) = 0;
};

struct random_access_device : random_access_read_device, random_access_write_device
{
};

cobalt/io/read.hpp

readread_at 函数读取直到缓冲区满。

struct read_all final : op<system::error_code, std::size_t>
{
  read_op step;
  read_all(read_op op);
};
read_all read(stream & str, mutable_buffer_sequence buffer);


struct read_all_at final : op<system::error_code, std::size_t>
{
  read_at_op step;
  read_all_at(read_at_op op);
};
read_all_at read_at(stream & str, std::uint64_t offset, mutable_buffer_sequence buffer);

cobalt/io/read.hpp

writewrite_at 将整个缓冲区写入流。

struct write_all final : op<system::error_code, std::size_t>
{
  read_op step;
  write_all(read_op op);
};
write_all write(stream & str, const_buffer_sequence buffer);


struct write_all_at final : op<system::error_code, std::size_t>
{
  write_at_op step;
  write_all_at(write op);
};
write_all_at write_at(stream & str, std::uint64_t offset, const_buffer_sequence buffer);

cobalt/io/file.hpp

文件对象提供文件 IO,这可能是异步的,具体取决于 asio 的配置方式。如果 asio 不支持异步文件 IO,cobalt 将回退到同步操作。

struct file
{

  enum flags
  {
    read_only = O_RDONLY,
    write_only = O_WRONLY,
    read_write = O_RDWR,
    append = O_APPEND,
    create = O_CREAT,
    exclusive = O_EXCL,
    truncate = O_TRUNC,
    sync_all_on_write = O_SYNC
  };

  // Implement bitmask operations as shown in C++ Std [lib.bitmask.types].
  friend flags operator&(flags x, flags y);
  friend flags operator|(flags x, flags y);
  friend flags operator^(flags x, flags y);

  friend flags operator~(flags x);
  friend flags& operator&=(flags& x, flags y);
  friend flags& operator|=(flags& x, flags y);
  friend flags& operator^=(flags& x, flags y);

  /// Basis for seeking in a file.
  enum seek_basis
  {
    seek_set = SEEK_SET,
    seek_cur = SEEK_CUR,
    seek_end = SEEK_END
  };


  using native_handle_type = __unspecified__;

  system::result<void> assign(const native_handle_type & native_file);
  system::result<void> cancel();

  executor get_executor();
  bool is_open() const;

  system::result<void> close();
  native_handle_type native_handle();

  system::result<void> open(const char * path,        flags open_flags);
  system::result<void> open(const std::string & path, flags open_flags);

  system::result<native_handle_type> release();
  system::result<void> resize(std::uint64_t n);

  system::result<std::uint64_t> size() const;
  system::result<void> sync_all();
  system::result<void> sync_data();

  explicit file(executor exec);
  file(executor exec, native_handle_type fd);

};

cobalt/io/stream_file.hpp

stream_file 提供对文件的流访问。如果 asio 不支持异步文件 IO,cobalt 将回退到同步操作。

struct stream_file : file, stream
{
  stream_file(const executor & executor = this_thread::get_executor());
  stream_file(const char * path, file::flags open_flags,
              const executor & executor = this_thread::get_executor());
  stream_file(const std::string & path, file::flags open_flags,
              const executor & executor = this_thread::get_executor());
  stream_file(const native_handle_type & native_file,
              const executor & executor = this_thread::get_executor());
  stream_file(stream_file && sf) noexcept;

  write_op write_some(const_buffer_sequence buffer);
  read_op read_some(mutable_buffer_sequence buffer);

  // advance the position in the file
  system::result<std::uint64_t> seek(
      std::int64_t offset,
      seek_basis whence);

};

cobalt/io/random_access_file.hpp

stream_file 提供对文件的随机访问。如果 asio 不支持异步文件 IO,cobalt 将回退到同步操作。

struct random_access_file : file, random_access_device
{
  using native_handle_type = file::native_handle_type;

  random_access_file(const executor & executor = this_thread::get_executor());
  random_access_file(const char * path, file::flags open_flags,
                     const executor & executor = this_thread::get_executor());
  random_access_file(const std::string & path, file::flags open_flags,
                     const executor & executor = this_thread::get_executor());
  random_access_file(const native_handle_type & native_file,
                     const executor & executor = this_thread::get_executor());
  random_access_file(random_access_file && sf) noexcept;

  write_at_op write_some_at(std::uint64_t offset, const_buffer_sequence buffer);
  read_at_op read_some_at(std::uint64_t offset, mutable_buffer_sequence buffer);
};

cobalt/io/serial_port.hpp

serial_port 是 asio::serial_port 的一个简单包装器。

struct serial_port
{
  // The underlying handle
  using native_handle_type = typename asio::basic_serial_port<executor>::native_handle_type;
  native_handle_type native_handle();


  serial_port(const cobalt::executor & executor = this_thread::get_executor());
  serial_port(serial_port && lhs) = default;
  serial_port(std::string_view device,         const cobalt::executor & executor = this_thread::get_executor());
  serial_port(native_handle_type native_handle, const cobalt::executor & executor = this_thread::get_executor());


  system::result<void> close();
  system::result<void> cancel();
  bool is_open() const;

  // Send a break sequence to the serial port.
  system::result<void> send_break();

  [[nodiscard]] system::result<void>     set_baud_rate(unsigned rate);
  [[nodiscard]] system::result<unsigned> get_baud_rate();

  [[nodiscard]] system::result<void>     set_character_size(unsigned rate);
  [[nodiscard]] system::result<unsigned> get_character_size();

  // An enumerator {none, software, hardware}
  using flow_control = asio::serial_port_base::flow_control::type;

  [[nodiscard]] system::result<void>         set_flow_control(flow_control rate);
  [[nodiscard]] system::result<flow_control> get_flow_control();

  // An enumerator {none, odd, even}
  using parity = asio::serial_port_base::parity::type;

  [[nodiscard]] system::result<void>   set_parity(parity rate);
  [[nodiscard]] system::result<parity> get_parity();

  [[nodiscard]] system::result<void> assign(native_handle_type native_handle);

  [[nodiscard]] system::result<void> open(std::string_view device);

  // read & write some data. Yields the number of bytes transferred.
  [[nodiscard]] write_op write_some(const_buffer_sequence buffer);
  [[nodiscard]]  read_op read_some (mutable_buffer_sequence buffer);

};

cobalt/io/endpoint.hpp

端点头文件提供了通用 asio 兼容的端点类型和协议。

protocol_type

struct protocol_type
{
  using family_t   = __unspecified__;
  using type_t     = __unspecified__;
  using protocol_t = __unspecified__;

  constexpr family_t     family() const noexcept;
  constexpr type_t         type() const noexcept;
  constexpr protocol_t protocol() const noexcept;

  // explicitly construct protocol type.
  constexpr explicit
  protocol_type(family_t family     = static_cast<family_t>(0),
                type_t type         = static_cast<type_t>(0),
                protocol_t protocol = static_cast<protocol_t>(0)) noexcept;

  // allows construction from other procols, e.g. asio
  template<typename OtherProtocol>
    requires requires (const OtherProtocol & op)
      {
        {static_cast<family_t>(op.family())};
        {static_cast<type_t>(op.type())};
        {static_cast<protocol_t>(op.protocol())};
      }
  constexpr protocol_type(const OtherProtocol & op) noexcept

  // make the orderable
  friend
  constexpr auto operator<=>(const protocol_type & , const protocol_type &) noexcept = default;

};
示例
protocol_type tcp_v4{AF_INET, SOCK_STREAM, IPPROTO_TCP};

// from asio
protocol_type tcp_v6 = boost::asio::ip::tcp::v6();

static_protocol

静态协议为编译时使用提供协议定义。

template<protocol_type::family_t   Family   = static_cast<protocol_type::family_t>(0),
         protocol_type::type_t     Type     = static_cast<protocol_type::type_t>(0),
         protocol_type::protocol_t Protocol = static_cast<protocol_type::protocol_t>(0)>
struct static_protocol
{
  using family_t   = protocol_type::family_t  ;
  using type_t     = protocol_type::type_t    ;
  using protocol_t = protocol_type::protocol_t;

  constexpr family_t   family()   const noexcept {return Family;};
  constexpr type_t     type()     const noexcept {return Type;};
  constexpr protocol_t protocol() const noexcept {return Protocol;};

  using endpoint = io::endpoint;
};


constexpr static_protocol<...> ip;
constexpr static_protocol<...> ip_v4;
constexpr static_protocol<...> ip_v6;
constexpr static_protocol<...> tcp;
constexpr static_protocol<...> tcp_v4;
constexpr static_protocol<...> tcp_v6;
constexpr static_protocol<...> udp;
constexpr static_protocol<...> udp_v4;
constexpr static_protocol<...> udp_v6;
constexpr static_protocol<...> icmp;
constexpr static_protocol<...> local_stream;
constexpr static_protocol<...> local_datagram;
constexpr static_protocol<...> local_seqpacket;
constexpr static_protocol<...> local_protocol;

ip_address

struct ip_address
{
  bool is_ipv6() const;
  bool is_ipv4() const;

  std::uint16_t port() const;

  std::array<std::uint8_t, 16u> addr() const;
  boost::static_string<45> addr_str() const;
};

struct ip_address_v4
{
    std::uint16_t port() const;
    std::uint32_t addr() const;
    boost::static_string<15> addr_str() const;
};

struct ip_address_v6
{
  std::uint16_t port() const;
  std::array<std::uint8_t, 16u> addr() const;
  boost::static_string<45> addr_str() const;
};

make_endpoint / get_endpoint

make_endpoint_tagmake_endpoint 函数的 tag_invoke 一起使用。用户可以定义其他 tag_invoke 函数来支持自定义端点。

template<protocol_type::family_t Family>
struct make_endpoint_tag {};

template<protocol_type::family_t Family>
struct get_endpoint_tag {};

提供以下功能

std::size_t tag_invoke(make_endpoint_tag<AF_UNIX>,
                       asio::detail::socket_addr_type* base,
                       std::string_view sv);

const local_endpoint* tag_invoke(get_endpoint_tag<AF_UNIX>,
                                 protocol_type actual,
                                 const endpoint::addr_type * addr);

std::size_t tag_invoke(make_endpoint_tag<AF_INET>,
                       asio::detail::socket_addr_type* base,
                       std::uint32_t address,
                       std::uint16_t port);

std::size_t tag_invoke(make_endpoint_tag<AF_INET>,
                       asio::detail::socket_addr_type* base,
                       std::string_view address,
                       std::uint16_t port);

const ip_address_v4* tag_invoke(get_endpoint_tag<AF_INET>,
                                 protocol_type actual,
                                 const endpoint::addr_type * addr);

std::size_t tag_invoke(make_endpoint_tag<AF_INET6>,
                       asio::detail::socket_addr_type* base,
                       std::span<std::uint8_t, 16> address,
                       std::uint16_t port);

std::size_t tag_invoke(make_endpoint_tag<AF_INET6>,
                       asio::detail::socket_addr_type* base,
                       std::string_view address,
                       std::uint16_t port);

const ip_address_v6* tag_invoke(get_endpoint_tag<AF_INET6>,
                                protocol_type actual,
                                const endpoint::addr_type * addr);

std::size_t tag_invoke(make_endpoint_tag<AF_UNSPEC>,
                       asio::detail::socket_addr_type* base,
                       std::string_view address,
                       std::uint16_t port);

const ip_address* tag_invoke(get_endpoint_tag<AF_UNSPEC>,
                                protocol_type actual,
                                const endpoint::addr_type * addr);

endpoint

端点函数持有并

struct endpoint
{
  using storage_type = asio::detail::sockaddr_storage_type;
  using addr_type = asio::detail::socket_addr_type;
  void resize(std::size_t size);

        void * data()       {return &storage_; }
  const void * data() const {return &storage_; }
  std::size_t size() const {return size_;}
  std::size_t capacity() const {return sizeof(storage_);}

  void set_type    (protocol_type::type_t type)         { type_ = type;}
  void set_protocol(protocol_type::protocol_t protocol) { protocol_ = protocol;}

  protocol_type protocol() const;

  endpoint() = default;
  endpoint(const endpoint & ep);

  // Construct a endpoint using make_endpoint_tag
  template<protocol_type::family_t   Family,
           protocol_type::type_t     Type,
           protocol_type::protocol_t Protocol,
           typename ... Args>
      requires requires (make_endpoint_tag<Family> proto,
                         addr_type* addr, Args && ... args)
      {
        {tag_invoke(proto, addr, std::forward<Args>(args)...)} -> std::convertible_to<std::size_t>;
      }
  endpoint(static_protocol<Family, Type, Protocol> proto, Args && ... args);

  // allows constructing an endpoint from an asio type
  template<typename OtherEndpoint>
  requires requires (OtherEndpoint oe)
      {
        {oe.protocol()} -> std::convertible_to<protocol_type>;
        {oe.data()} -> std::convertible_to<void*>;
        {oe.size()} -> std::convertible_to<std::size_t>;
      }
  endpoint(OtherEndpoint && oe);
};


class bad_endpoint_access : public std::exception
{
 public:
  bad_endpoint_access() noexcept = default;
  char const * what() const noexcept;
};

// Uses `get_endpoint_tag<Protocol> to return a `pointer` to the result.
// Throws `bad_endpoint_access` if it's the wrong type
auto get(const endpoint & ep);

// Uses `get_endpoint_tag<Protocol> to return a `pointer` to the result.
template<static_protocol Protocol>
friend auto get_if(const endpoint * ep);
示例
cobalt::io::endpoint ep{cobalt::io::tcp_v4, "127.0.0.1", 8080};

cobalt/io/socket.hpp

socket 类是所有 socket 的基类。

struct socket
{
  [[nodiscard]] system::result<void> open(protocol_type prot = protocol_type {});
  [[nodiscard]] system::result<void> close();
  [[nodiscard]] system::result<void> cancel();
  [[nodiscard]] bool is_open() const;

  // asio acceptor compatibility
  template<typename T>
  struct rebind_executor {using other = socket;};

  using shutdown_type      = asio::socket_base::shutdown_type;
  using wait_type          = asio::socket_base::wait_type;
  using message_flags      = asio::socket_base::message_flags;
  constexpr static int message_peek          = asio::socket_base::message_peek;
  constexpr static int message_out_of_band   = asio::socket_base::message_out_of_band;
  constexpr static int message_do_not_route  = asio::socket_base::message_do_not_route;
  constexpr static int message_end_of_record = asio::socket_base::message_end_of_record;

  using native_handle_type = asio::basic_socket<protocol_type, executor>::native_handle_type;
  native_handle_type native_handle();

  // Drop the connection
  [[nodiscard]] system::result<void> shutdown(shutdown_type = shutdown_type::shutdown_both);

  // endpoint of a connected endpiotn
  [[nodiscard]] system::result<endpoint> local_endpoint() const;
  [[nodiscard]] system::result<endpoint> remote_endpoint() const;


  system::result<void> assign(protocol_type protocol, native_handle_type native_handle);
  system::result<native_handle_type> release();

  /// socket options
  [[nodiscard]] system::result<std::size_t> bytes_readable();

  [[nodiscard]] system::result<void> set_debug(bool debug);
  [[nodiscard]] system::result<bool> get_debug() const;

  [[nodiscard]] system::result<void> set_do_not_route(bool do_not_route);
  [[nodiscard]] system::result<bool> get_do_not_route() const;

  [[nodiscard]] system::result<void> set_enable_connection_aborted(bool enable_connection_aborted);
  [[nodiscard]] system::result<bool> get_enable_connection_aborted() const;

  [[nodiscard]] system::result<void> set_keep_alive(bool keep_alive);
  [[nodiscard]] system::result<bool> get_keep_alive() const;

  [[nodiscard]] system::result<void> set_linger(bool linger, int timeout);
  [[nodiscard]] system::result<std::pair<bool, int>> get_linger() const;

  [[nodiscard]] system::result<void>        set_receive_buffer_size(std::size_t receive_buffer_size);
  [[nodiscard]] system::result<std::size_t> get_receive_buffer_size() const;

  [[nodiscard]] system::result<void>        set_send_buffer_size(std::size_t send_buffer_size);
  [[nodiscard]] system::result<std::size_t> get_send_buffer_size() const;

  [[nodiscard]] system::result<void>        set_receive_low_watermark(std::size_t receive_low_watermark);
  [[nodiscard]] system::result<std::size_t> get_receive_low_watermark() const;

  [[nodiscard]] system::result<void>        set_send_low_watermark(std::size_t send_low_watermark);
  [[nodiscard]] system::result<std::size_t> get_send_low_watermark() const;

  [[nodiscard]] system::result<void> set_reuse_address(bool reuse_address);
  [[nodiscard]] system::result<bool> get_reuse_address() const;

  [[nodiscard]] system::result<void> set_no_delay(bool reuse_address);
  [[nodiscard]] system::result<bool> get_no_delay() const;


  wait_op     wait(wait_type wt = wait_type::wait_read);
  // Connect to a specific endpoint
  connect_op connect(endpoint ep);
  // connect to one of the given endpoints. Returns the one connected to.
  ranged_connect_op connect(endpoint_sequence ep);

 protected:
  // Adopt the under-specified endpoint. E.g. to tcp from an endpoint specified as ip_address
  virtual void adopt_endpoint_(endpoint & ) {}

};

// Connect to sockets using the given protocol
system::result<void> connect_pair(protocol_type protocol, socket & socket1, socket & socket2);

cobalt/io/socket.hpp

用于流套接字(例如 TCP)的套接字类。

struct [[nodiscard]] stream_socket final : socket, stream
{

  stream_socket(const cobalt::executor & executor = this_thread::get_executor());
  stream_socket(stream_socket && lhs);
  stream_socket(native_handle_type h, protocol_type protocol = protocol_type(),
                const cobalt::executor & executor = this_thread::get_executor());
  stream_socket(endpoint ep,
                const cobalt::executor & executor = this_thread::get_executor());

  write_op write_some(const_buffer_sequence buffer);
  read_op read_some(mutable_buffer_sequence buffer);};

// Connect to sockets using the given protocol
inline system::result<std::pair<stream_socket, stream_socket>> make_pair(decltype(local_stream) protocol);

cobalt/io/socket.hpp

此套接字类实现数据报套接字,例如 UDP。

struct [[nodiscard]] datagram_socket final : socket
{
  datagram_socket(const cobalt::executor & executor = this_thread::get_executor());
  datagram_socket(datagram_socket && lhs);
  datagram_socket(native_handle_type h, protocol_type protocol = protocol_type(),
                  const cobalt::executor & executor = this_thread::get_executor());
  datagram_socket(endpoint ep,
                  const cobalt::executor & executor = this_thread::get_executor());

  write_op send(const_buffer_sequence buffer);
  read_op receive(mutable_buffer_sequence buffer);
};


// Create a pair of connected sockets.
inline system::result<std::pair<datagram_socket, datagram_socket>> make_pair(decltype(local_datagram) protocol);

cobalt/io/socket.hpp

用于 seq packet socket(例如 SCTP)的套接字类。

struct seq_packet_socket : socket
{
  seq_packet_socket(const cobalt::executor & executor = this_thread::get_executor());
  seq_packet_socket(seq_packet_socket && lhs);
  seq_packet_socket(native_handle_type h, protocol_type protocol = protocol_type(),
                    const executor & executor = this_thread::get_executor());
  seq_packet_socket(endpoint ep, const executor & executor = this_thread::get_executor());

  receive_op receive(message_flags in_flags, message_flags& out_flags, mutable_buffer_sequence buffer);
  send_op send(message_flags in_flags, const_buffer_sequence buffer);
  receive_op receive(message_flags in_flags, mutable_buffer_sequence buffer);
};

// Connect to sockets using the given protocol
system::result<void> make_pair(decltype(local_seqpacket) protocol);

cobalt/io/resolver.hpp

解析器允许查找给定地址和服务的端点。

struct resolver
{
  resolver(const executor & exec = this_thread::get_executor());
  resolver(resolver && ) = delete;

  void cancel();

  // Produces a endpoint_sequence
  [[nodiscard]] auto resolve(std::string_view host, std::string_view service);
};


// Short hand for performing a single lookup.
auto lookup(std::string_view host, std::string_view service,
         const executor & exec = this_thread::get_executor());

cobalt/io/acceptor.hpp

接受器可用于接受连接。

struct acceptor
{
  using wait_type          = asio::socket_base::wait_type;
  constexpr static std::size_t max_listen_connections = asio::socket_base::max_listen_connections;

  acceptor(const cobalt::executor & executor = this_thread::get_executor());
  acceptor(endpoint ep, const cobalt::executor & executor = this_thread::get_executor());
  system::result<void> bind(endpoint ep);
  system::result<void> listen(int backlog = max_listen_connections); // int backlog = net::max_backlog()
  endpoint local_endpoint();

  // accept any connection and assign it to `sock`
  accept_op accept(socket & sock);

  // Accept a connection of a stream_socket
  template<protocol_type::family_t F = tcp.family(), protocol_type::protocol_t P = tcp.protocol()>
  accept_op accept(static_protocol<F, tcp.type(), P> stream_proto = tcp);

  // Accept a connection of a seq_packet
  template<protocol_type::family_t F, protocol_type::protocol_t P>
  accept_op accept(static_protocol<F, local_seqpacket.type(), P> stream_proto = tcp);

  // For a connection to be ready
  wait_op     wait(wait_type wt = wait_type::wait_read);
};

cobalt/io/ssl.hpp

SSL 流是一种可以升级到 SSL 的流套接字。

namespace ssl
{

enum class verify
{
  none = asio::ssl::verify_none,
  peer = asio::ssl::verify_peer,
  fail_if_no_peer_cert = asio::ssl::verify_fail_if_no_peer_cert,
  client_once = asio::ssl::verify_client_once
};


using context = asio::ssl::context;
using verify_mode = asio::ssl::verify_mode;

struct stream final : socket, cobalt::io::stream, asio::ssl::stream_base
{
  stream(context & ctx, const cobalt::executor & executor = this_thread::get_executor());
  stream(context & ctx, native_handle_type h, protocol_type protocol = protocol_type(),
             const cobalt::executor & executor = this_thread::get_executor());
  stream(context & ctx, endpoint ep,
             const cobalt::executor & executor = this_thread::get_executor());

  write_op write_some(const_buffer_sequence buffer) override;
  read_op read_some(mutable_buffer_sequence buffer) override;

  // Indicates whether or not an ssl upgrade has been performed
  [[nodiscard]] bool secure() const {return upgraded_;}

  template<typename VerifyCallback>
  system::result<void> set_verify_callback(VerifyCallback vc);

  system::result<void> set_verify_depth(int depth);

  system::result<void> set_verify_mode(verify depth);

  [[nodiscard]] auto handshake(handshake_type type);
  [[nodiscard]] auto handshake(handshake_type type, const_buffer_sequence buffer);
  [[nodiscard]] auto shutdown();
};

}
构建脚本为该类创建了一个单独的库 (boost_cobalt_io_ssl),以便 boost_cobalt_io 可以在没有 OpenSSL 的情况下使用。

cobalt/experimental/context.hpp

这是未定义行为,因为它违反了标准中的前置条件。

此头文件提供 experimental 支持,用于将基于 boost.fiber 的有栈协程用作 C20 协程。也就是说,它们可以通过放入 coroutine_handle 来使用 awaitables。同样,实现使用 C20 协程 promise,并像 C++20 协程一样运行。

//
void delay(experimental::context<promise<void>> h, std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  h.await(tim.async_wait(cobalt::use_op)); // instead of co_await.
}

cobalt::main co_main(int argc, char *argv[])
{
  cobalt::promise<void> dl = cobalt::experimental::make_context(&delay, 50);
  co_await dl;
  co_return 0;
}

参考

// The internal coroutine context.
/// Args are the function arguments after the handle.
template<typename Return, typename ... Args>
struct context
{
  // Get a handle to the promise
        promise_type & promise();
  const promise_type & promise() const;

  // Convert it to any context if the underlying promise is the same
  template<typename Return_, typename ... Args_>
  constexpr operator context<Return_, Args_...>() const;

  // Await something. Uses await_transform automatically.
  template<typename Awaitable>
  auto await(Awaitable && aw);
  // Yield a value, if supported by the promise.
  template<typename Yield>
  auto yield(Yield && value);
};


// Create a fiber with a custom stack allocator (see boost.fiber for details) and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc  && salloc, Args && ... args);

// Create a fiber with the default allocator and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func>
auto make_context(Func && func, Args && ... args);

// Create a fiber with a custom stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc  && salloc, Args && ... args);

// Create a fiber with the default stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func>
auto make_context(Func && func, Args && ... args);

深入

自定义执行器

cobalt 默认使用 asio::any_io_executor 的原因之一是它是一种类型擦除的执行器,即您可以提供自己的事件循环而无需重新编译 cobalt

然而,在 Executor TS 的开发过程中,执行器概念变得有点反直觉,说得轻巧一些。

Ruben Perez 写了一篇很棒的 博客文章,我将从中无耻地借鉴。

定义

执行器是一种指向实际事件循环的类型,它(廉价地)可复制,支持属性(见下文),可进行相等比较,并具有 execute 函数。

execute
struct example_executor
{
  template<typename Fn>
  void execute(Fn && fn) const;
};

上述函数根据其属性执行 fn

属性

一个属性可以被查询、首选或必需,例如:

struct example_executor
{
  // get a property by querying it.
  asio::execution::relationship_t &query(asio::execution::relationship_t) const
  {
    return asio::execution::relationship.fork;
  }

  // require an executor with a new property
  never_blocking_executor require(const execution::blocking_t::never_t);

  // prefer an executor with a new property. the executor may or may not support it.
  never_blocking_executor prefer(const execution::blocking_t::never_t);
  // not supported
  example_executor prefer(const execution::blocking_t::always_t);
};
asio::any_io_executor 的属性

为了将执行器包装在 asio::any_io_executor 中,需要两个属性

  • execution::context_t

  • execution::blocking_t::never_t

这意味着我们需要使它们可请求(这对上下文没有意义)或从 query 返回预期值。

execution::context_t 查询应返回 asio::execution_context&,如下所示

struct example_executor
{
  asio::execution_context &query(asio::execution::context_t) const;
};

执行上下文用于管理管理 IO 对象的服务(如 asio 的计时器和套接字)的生命周期。也就是说,通过提供此上下文,asio 的所有 IO 都将与它一起工作。

执行上下文在执行器销毁后必须保持活动状态。

以下内容可能更受青睐

  • execution::blocking_t::possibly_t

  • execution::outstanding_work_t::tracked_t

  • execution::outstanding_work_t::untracked_t

  • execution::relationship_t::fork_t

  • execution::relationship_t::continuation_

这意味着您可能希望在执行器中支持它们以进行优化。

blocking 属性

如前所述,此属性控制传递给 execute() 的函数是立即运行(作为 execute() 的一部分),还是必须排队等待稍后执行。可能的值有:

  • asio::execution::blocking.never:永远不会将函数作为 execute() 的一部分运行。这就是 asio::post() 所做的。

  • asio::execution::blocking.possibly:函数可能会或可能不会作为 execute() 的一部分运行。这是默认值(调用 io_context::get_executor 时获得的值)。

  • asio::execution::blocking.always:函数总是作为 execute() 的一部分运行。io_context::executor 不支持此功能。

relationship 属性

relationship 可以取两个值

  • asio::execution::relationship.continuation:表示传递给 execute() 的函数是调用 execute() 的函数的延续。

  • asio::execution::relationship.fork:与上述相反。这是默认值(调用 io_context::get_executor() 时获得的值)。

将此属性设置为 continuation 可以优化函数的调度方式。它只在函数排队(而不是立即运行)时才有效。对于 io_context,设置后,函数将调度在更快的线程局部队列中运行,而不是上下文全局队列。

outstanding_work_t 属性

outstanding_work 可以取两个值

  • asio::execution::outstanding_work.tracked:表示在执行器存在期间,仍然有工作要做。

  • asio::execution::outstanding_work.untracked:与上述相反。这是默认值(调用io_context::get_executor()时获得的值)。

将此属性设置为tracked意味着只要executor存活,事件循环就不会返回。

最小执行器

有了这个,我们来看看一个最小执行器的接口。

struct minimal_executor
{
  minimal_executor() noexcept;

  asio::execution_context &query(asio::execution::context_t) const;

  static constexpr asio::execution::blocking_t
  query(asio::execution::blocking_t) noexcept
  {
    return asio::execution::blocking.never;
  }

  template<class F>
  void execute(F && f) const;

  bool operator==(minimal_executor const &other) const noexcept;
  bool operator!=(minimal_executor const &other) const noexcept;
};
有关使用 Python asyncio 事件循环的实现,请参阅example/python.cpp

添加工作守卫。

现在,让我们为outstanding_work属性添加一个require函数,它使用多种类型。

struct untracked_executor : minimal_executor
{
  untracked_executor() noexcept;

  constexpr   tracked_executor require(asio::execution::outstanding_work::  tracked_t) const;
  constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const {return *this; }
};

struct untracked_executor : minimal_executor
{
  untracked_executor() noexcept;

  constexpr   tracked_executor require(asio::execution::outstanding_work::  tracked_t) const {return *this;}
  constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const;
};

请注意,不必从require函数返回不同的类型,也可以这样做

struct trackable_executor : minimal_executor
{
  trackable_executor() noexcept;

  constexpr trackable_executor require(asio::execution::outstanding_work::  tracked_t) const;
  constexpr trackable_executor require(asio::execution::outstanding_work::untracked_t) const;
};

如果我们要使用prefer,它将如下所示

struct trackable_executor : minimal_executor
{
  trackable_executor() noexcept;

  constexpr trackable_executor prefer(asio::execution::outstanding_work::  tracked_t) const;
  constexpr trackable_executor prefer(asio::execution::outstanding_work::untracked_t) const;
};

总结

如您所见,属性系统并非微不足道,但功能强大。实现自定义执行器是一个它自己的问题类别,这就是本文档不这样做。相反,有一个关于如何将 Python 事件循环包装到执行器中的示例。

以下是一些阅读建议。

无栈

C++20 协程是无栈的,这意味着它们没有自己的栈。

C++ 中的栈描述了调用栈,即所有堆叠的函数帧。函数帧是函数操作所需的内存,即一块内存,用于存储其变量和诸如返回地址之类的信息。

函数帧的大小在编译时已知,但在包含其定义的编译单元之外则未知。
int bar() {return 0;} // the deepest point of the stack
int foo() {return bar();}

int main()
{
    return foo();
}

上述示例中的调用栈是

main()
  foo()
    bar()
stackless1

协程可以实现为有栈的,这意味着它会分配一个固定大小的内存块,并像线程一样堆叠函数帧。C++20 协程是无栈的,即它们只分配自己的帧,并在恢复时使用调用者的栈。使用我们之前的示例

fictional_eager_coro_type<int> example()
{
    co_yield 0;
    co_yield 1;
}

void nested_resume(fictional_eager_coro_type<int>& f)
{
    f.resume();
}

int main()
{
    auto f = example();
    nested_resume(f);
    f.reenter();
    return 0;
}

这将产生一个类似于这样的调用栈

main()
  f$example()
  nested_resume()
    f$example()
  f$example()
stackless2

如果协程跨线程移动,也适用同样的情况。

惰性 & 急切

如果协程仅在恢复后才开始执行其代码,则它是惰性的,而急切的协程会立即执行,直到它的第一个挂起点(即co_awaitco_yieldco_return表达式)。

lazy_coro co_example()
{
    printf("Entered coro\n");
    co_yield 0;
    printf("Coro done\n");
}

int main()
{
    printf("enter main\n");
    auto lazy = co_example();
    printf("constructed coro\n");
    lazy.resume();
    printf("resumed once\n");
    lazy.resume();
    printf("resumed twice\n");
    return 0;
}

这将产生如下输出

enter main
constructed coro
Entered coro
resumed once
Coro Done
resumed twice
lazy eager1

而一个急切的协程会是这样的

eager_coro co_example()
{
    printf("Entered coro\n");
    co_yield 0;
    printf("Coro done\n");
}

int main()
{
    printf("enter main\n");
    auto lazy = co_example();
    printf("constructed coro\n");
    lazy.resume();
    printf("resumed once\n");
    return 0;
}

这将产生如下输出

enter main
Entered coro
constructed coro
resume once
Coro Done
lazy eager2

基准测试

在第 11 代 Intel® Core™ i7-1185G7 @ 3.00GHz 上运行

发布到执行器

基准测试正在运行以下代码,使用 cobalt 的任务,asio::awaitable和 `asio 的有栈协程(boost.context)。

cobalt::task<void> atest()
{
  for (std::size_t i = 0u; i < n; i++)
    co_await asio::post(cobalt::use_op);
}
表 6. 50M 次结果(毫秒)
gcc 12 clang 16

2472

2098

awaitable

2432

2253

有栈

3655

3725

并行运行空操作协程

此基准测试使用大小为零的asio::experimental::channel来并行读写。它使用 cobalt 的gatherasio::awaitable中的awaitable_operator

cobalt::task<void> atest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
  for (std::size_t i = 0u; i < n; i++)
    co_await cobalt::gather(
              chan.async_send(system::error_code{}, cobalt::use_task),
              chan.async_receive(cobalt::use_task));
}

asio::awaitable<void> awtest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
  using boost::asio::experimental::awaitable_operators::operator&&;
  for (std::size_t i = 0u; i < n; i++)
    co_await (
        chan.async_send(system::error_code{}, asio::use_awaitable)
        &&
        chan.async_receive(asio::use_awaitable));
}
表 7. 3M 次结果(毫秒)
gcc 12 clang 16

1563

1468

awaitable

2800

2805

立即

此基准测试通过使用大小为 1 的通道来利用即时完成,以便每个操作都是即时的。

cobalt::task<void> atest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 1u};
  for (std::size_t i = 0u; i < n; i++)
  {
    co_await chan.async_send(system::error_code{}, cobalt::use_op);
    co_await chan.async_receive(cobalt::use_op);
  }
}
表 8. 10M 次结果(毫秒)
gcc 12 clang 16

1810

1864

awaitable

3109

4110

有栈

3922

4705

通道

在此基准测试中,比较了 asio::experimental::channel 和 cobalt::channel。

这与并行测试类似,但使用cobalt::channel代替。

表 9. 运行测试 3M 次的结果(毫秒)
gcc clang

500

350

awaitable

790

770

有栈

867

907

操作分配

此基准测试比较了异步操作关联分配器的不同可能解决方案

表 10. 运行测试 2M 次的结果(毫秒)
gcc clang

std::allocator

1136

1139

cobalt::monotonic

1149

1270

pmr::monotonic

1164

1173

cobalt::sbo

1021

1060

后者方法由 cobalt 内部使用。

要求

Boost.cobalt 需要 C++20 编译器,并直接依赖以下 Boost 库

  • boost.asio

  • boost.system

  • boost.circular_buffer

  • boost.intrusive

  • boost.smart_ptr

  • boost.container (用于 clang < 16)

编译器

此库自 Clang 14、Gcc 10 和 MSVC 19.30 (Visual Studio 2022) 起受支持。

Gcc 12.1 和 12.2 版本似乎对没有栈变量的协程存在一个错误,如[这里](https://godbolt.org/z/6adGcqP1z)所示,应避免在协程中使用。

Clang 只在 16 版中添加了std::pmr支持,因此较旧的 Clang 版本使用boost::container::pmr作为即时替换。

部分(如果不是全部)MSVC 版本有一个损坏的协程实现,此库需要进行变通。这可能会导致非确定性行为和开销。

协程的继续可以在从final_suspend返回的 awaitable 中完成,如下所示

// in promise
auto final_suspend() noexcept
{
    struct final_awaitable
    {
      std::coroutine_handle<void> continuation{std::noop_coroutine()}; (1)
      bool await_ready() const noexcept;
      std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> h) noexcept
      {
        auto cc = continuation;
        h.destroy(); (2)
        return cc;
      }

      void await_resume() noexcept {}
    };
    return final_awaitable{my_continuation};
};
1 延续
2 在继续之前自毁协程

在 MSVC 上,final_suspend 无法正确挂起协程,因此h.destroy()将导致协程帧上的元素双重销毁。因此,MSVC 需要推迟销毁,使其离线进行。这将导致开销,并使实际内存释放变得非确定性。

致谢

如果没有 CppAlliance 及其创始人 Vinnie Falco,这个库是不可能实现的。Vinnie 信任我,让我从事这个项目,尽管他对这样的库应该如何设计有着非常不同的看法。

还要感谢 Ruben Perez 和 Richard Hodges 倾听我的设计问题并给我建议和用例。此外,如果没有 Chris Kohlhoff 优秀的 boost.asio,这个库也是不可能实现的。