概述
以下是 cobalt 中相关功能的列表
返回单个结果的急切协程 - 视为默认值 |
|
可以产生多个值的急切协程。 |
|
promise 的惰性版本,可以派生到其他执行器。 |
|
类似于 promise 的协程,没有句柄 |
一个函数,以伪随机方式等待一组协程中就绪的一个,以避免饥饿。 |
|
一个函数,等待一组协程并将其全部作为值返回,如果任何 awaitable 抛出异常,则抛出异常。 |
|
一个函数,等待一组协程并将其全部作为 |
|
一个确定性的 |
一个线程局部实用工具,用于在协程之间发送值。 |
|
一个异步 RAII 辅助工具,允许在发生异常时进行异步拆卸 |
C++ 协程简介 |
如果您以前从未使用过协程,请阅读 |
|
功能和概念的简要高级概述 |
如果您熟悉 asio 和协程,并想大致了解此库提供的功能,请阅读 |
|
低级用法视图 |
如果您想快速编码,请阅读 |
|
API 参考 |
编码时查找详细信息 |
|
一些实现细节 |
如果您不够困惑,请阅读 |
动机
许多编程语言,如 node.js 和 python,提供了易于使用的单线程并发框架。虽然比同步代码更复杂,但单线程异步性避免了多线程的许多陷阱和开销。
也就是说,一个协程可以工作,而其他协程等待事件(例如,来自服务器的响应)。这允许编写在单个线程上同时执行多项任务的应用程序。
该库旨在为 C++ 提供此功能:简单的单线程异步性,类似于 node.js 和 python 中的 asyncio,并与现有库(如 boost.beast
、boost.mysql
或 boost.redis
)配合使用。它基于 boost.asio
。
它从其他语言中提取了一系列概念,并基于 C++20 协程提供了它们。
与 asio::awaitable
和 asio::experimental::coro
不同,cobalt
协程是开放的。也就是说,asio::awaitable
只能 await 和被其他 asio::awaitable
await,并且不提供协程特定的同步机制。
另一方面,cobalt
提供了一个协程特定的 channel
和不同的等待类型(race
、gather
等),这些类型经过优化,可以与协程和 awaitable 配合使用。
协程入门
异步编程
异步编程通常指一种编程风格,它允许任务在后台运行,同时执行其他工作。
想象一下,如果您有一个 get-request 函数,它执行完整的 http 请求,包括连接和 ssl 握手等。
std::string http_get(std:string_view url);
int main(int argc, char * argv[])
{
auto res = http_get("https://boost.ac.cn");
printf("%s", res.c_str());
return 0;
}
上述代码将是传统的同步编程。如果我们要并行执行两个请求,我们将需要创建另一个线程来运行另一个带有同步代码的线程。
std::string http_get(std:string_view url);
int main(int argc, char * argv[])
{
std::string other_res;
std::thread thr{[&]{ other_res = http_get("https://cppalliance.org"); }};
auto res = http_get("https://boost.ac.cn");
thr.join();
printf("%s", res.c_str());
printf("%s", other_res.c_str());
return 0;
}
这可行,但我们的程序大部分时间将花费在等待输入上。操作系统提供允许异步执行 IO 的 API,而 boost.asio 等库提供了管理异步操作的便携式方法。Asio 本身不规定处理完成的方式。该库 (boost.cobalt) 提供了一种通过协程/awaitable 管理所有这些的方法。
cobalt::promise<std::string> http_cobalt_get(std:string_view url);
cobalt::main co_main(int argc, char * argv[])
{
auto [res, other_res] =
cobalt::join(
http_cobalt_get("https://boost.ac.cn"),
http_cobalt_get("https://cppalliance.org")
);
printf("%s", res.c_str());
printf("%s", other_res.c_str());
return 0;
}
在上述代码中,执行请求的异步函数利用了操作系统 API,因此实际的 IO 不会阻塞。这意味着,当我们等待两个函数完成时,操作是交错且非阻塞的。同时,cobalt 提供了协程原语,使我们摆脱了回调地狱。
协程
协程是可恢复的函数。可恢复意味着函数可以暂停,即将控制多次返回给调用者。
常规函数使用 return
函数将控制权返回给调用者,并在那里也返回该值。
另一方面,协程可能会将控制权交给调用者并多次恢复。
协程有三个控制关键字,类似于 co_return(其中只有 co_return
必须受支持)。
-
co_return
-
co_yield
-
co_await
co_return
这类似于 return
,但将函数标记为协程。
co_await
co_await
表达式暂停等待一个 Awaitable,即停止执行直到 awaitable
恢复它。
例如:
cobalt::promise<void> delay(std::chrono::milliseconds);
cobalt::task<void> example()
{
co_await delay(std::chrono::milliseconds(50));
}
co_await
表达式可以产生一个值,具体取决于它正在等待什么。
cobalt::promise<std::string> read_some();
cobalt::task<void> example()
{
std::string res = co_await read_some();
}
在 cobalt 中,大多数协程原语也是 Awaitable。 |
co_yield
co_yield
表达式类似于 co_await
,但它将控制权交给调用者并带有一个值。
例如:
cobalt::generator<int> iota(int max)
{
int i = 0;
while (i < max)
co_yield i++;
co_return i;
}
co_yield
表达式也可以产生一个值,这允许产生值的协程的用户向其推送值。
cobalt::generator<int> iota()
{
int i = 0;
bool more = false;
do
{
more = co_yield i++;
}
while(more);
co_return -1;
}
可等待对象
Awaitable 是可以在 co_await
表达式中使用的类型。
struct awaitable_prototype
{
bool await_ready();
template<typename T>
see_below await_suspend(std::coroutine_handle<T>);
return_type await_resume();
};
如果存在 operator co_await 调用,则类型将隐式转换为 awaitable。本文档将使用 awaitable 来包含这些类型,并使用 "actual_awaitable" 来指代符合上述原型的类型。 |

在 co_await
表达式中,等待协程将首先调用 await_ready
以检查协程是否需要暂停。准备好后,它直接进入 await_resume
以获取值,因为不需要暂停。否则,它将暂停自身并使用 std::coroutine_handle
调用 await_suspend
到其自己的 promise。
std::coroutine_handle<void> 可用于类型擦除。 |
return_type 是 co_await expression
的结果类型,例如 int
int i = co_await awaitable_with_int_result();
await_suspend
的返回类型可以是三种情况
-
void
-
bool
-
std::coroutine_handle<U>
如果它是 void,则等待协程保持暂停。如果它是 bool
,则将检查该值,如果为 false,则等待协程将立即恢复。
如果返回 std::coroutine_handle
,则此协程将恢复。后者允许 await_suspend
返回传入的句柄,这实际上与返回 false
相同。
如果等待协程立即重新恢复,即在调用 await_resume 后,它在该库中被称为“立即完成”。这不要与非暂停 awaitable 混淆,即从 await_ready
返回 true
的 awaitable。
事件循环
由于 cobalt
中的协程可以 co_await
事件,因此它们需要在事件循环上运行。也就是说,另一段代码负责跟踪未完成的事件并恢复正在等待它们的协程。这种模式非常常见,并以类似的方式被 node.js 或 python 的 asyncio
使用。
cobalt
使用 asio::io_context
作为其默认事件循环。也就是说,类 thread、main 和 run 函数在内部使用它。
您可以使用任何可以生成 asio::any_io_executor
的事件循环与该库一起使用。实现这一点的最简单方法是使用 spawn。
事件循环通过执行器(遵循 asio 术语)访问,并且可以使用 set_executor 手动设置。
导览
进入 cobalt 环境
为了使用 awaitable,我们需要能够 co_await
它们,即在协程内。
我们有四种方法可以实现这一点
- cobalt/main.hpp
-
用协程替换
int main
cobalt::main co_main(int argc, char* argv[])
{
// co_await things here
co_return 0;
}
- cobalt/thread.hpp
-
为异步环境创建线程
cobalt::thread my_thread()
{
// co_await things here
co_return;
}
int main(int argc, char ** argv[])
{
auto t = my_thread();
t.join();
return 0;
}
- cobalt/task.hpp
-
创建一个任务并运行或派生它
cobalt::task<void> my_thread()
{
// co_await things here
co_return;
}
int main(int argc, char ** argv[])
{
cobalt::run(my_task()); // sync
asio::io_context ctx;
cobalt::spawn(ctx, my_task(), asio::detached);
ctx.run();
return 0;
}
Promise
Promise 是推荐的默认协程类型。它们是急切的,因此易于用于临时并发。
cobalt::promise<int> my_promise()
{
co_await do_the_thing();
co_return 0;
}
cobalt::main co_main(int argc, char * argv[])
{
// start the promise here
auto p = my_promise();
// do something else here
co_await do_the_other_thing();
// wait for the promise to complete
auto res = co_await p;
co_return res;
}
Task
Task 是惰性的,这意味着它们在被 await 或派生之前不会做任何事情。
cobalt::task<int> my_task()
{
co_await do_the_thing();
co_return 0;
}
cobalt::main co_main(int argc, char * argv[])
{
// create the task here
auto t = my_task();
// do something else here first
co_await do_the_other_thing();
// start and wait for the task to complete
auto res = co_await t;
co_return res;
}
Generator
一个 generator 是 cobalt 中唯一可以 co_yield
值的类型。
Generator 默认是急切的。与 std::generator 不同,cobalt::generator
可以 co_await
,因此是异步的。
cobalt::generator<int> my_generator()
{
for (int i = 0; i < 10; i++)
co_yield i;
co_return 10;
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator();
while (g)
printf("Generator %d\n", co_await g);
co_return 0;
}
值可以推送到生成器中,这些值将从 co_yield
返回。
一个急切的生成器将在被等待之前产生第一个结果。也就是说,当我们调用 co_await g(4)
时,初始 co_yield 的结果已就绪。然后生成器接收作为 co_yield 返回传入的值 4
,并处理它。
cobalt::generator<std::string, int> my_eager_push_generator(int value)
{
while (value != 0)
value = co_yield std::to_string(value);
co_return "";
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator(5);
assert("5" == co_await g(4)); // result of 5
assert("4" == co_await g(3)); // result of 4
assert("3" == co_await g(2)); // result of 3
assert("2" == co_await g(1)); // result of 2
assert("1" == co_await g(0)); // result of 1
// we let the coroutine go out of scope while suspended
// no need for another co_await of `g`
co_return 0;
}
协程也可以使用 this_coro::initial
变为惰性。
一个惰性生成器确实等待第一次 co_await
开始工作,即当 initial
被等待时它会暂停。数据处理不是提前一步,而是在 co_await
时发生。
cobalt::generator<std::string, int> my_eager_push_generator()
{
auto value = co_await this_coro::initial;
while (value != 0)
value = co_yield std::to_string(value);
co_return "";
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator(); // lazy, so the generator waits for the first pushed value
assert("5" == co_await g(5)); // result of 5
assert("4" == co_await g(4)); // result of 4
assert("3" == co_await g(3)); // result of 3
assert("2" == co_await g(2)); // result of 2
assert("1" == co_await g(1)); // result of 1
// we let the coroutine go out of scope while suspended
// no need for another co_await of `g`
co_return 0;
}
join
cobalt::promise<int> some_work();
cobalt::promise<double> more_work();
cobalt::main co_main(int argc, char * argv[])
{
std::tuple<int, double> res = cobalt::join(some_work(), more_work());
co_return 0;
}
race
cobalt::generator<int> some_data_source();
cobalt::generator<double> another_data_source();
cobalt::main co_main(int argc, char * argv[])
{
auto g1 = some_data_source();
auto g2 = another_data_source();
int res1 = co_await g1;
double res2 = co_await g2;
printf("Result: %f", res1 * res2);
while (g1 && g2)
{
switch(variant2::variant<int, double> nx = co_await cobalt::race(g1, g2))
{
case 0:
res1 = variant2::get<0>(nx);
break;
case 1:
res2 = variant2::get<1>(nx);
break;
}
printf("New result: %f", res1 * res2);
}
co_return 0;
}
在这种情况下,race 不会造成任何数据丢失。 |
教程
delay
让我们从最简单的示例开始:一个简单的延迟。
cobalt::main co_main(int argc, char * argv[]) (1)
{
asio::steady_timer tim{co_await asio::this_coro::executor, (2)
std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
co_await tim.async_wait(cobalt::use_op); (4)
co_return 0; (5)
}
1 | co_main 函数在使用时定义了一个隐式的 main ,是设置运行异步代码环境的最简单方法。 |
2 | 从当前协程 promise 中获取执行器。 |
3 | 使用参数设置超时 |
4 | 使用 cobalt::use_op 执行等待。 |
5 | 返回一个从隐式 main 返回的值。 |
在此示例中,我们使用 cobalt/main.hpp 头文件,如果如上定义 co_main
,它将为我们提供一个 main 协程。这有几个优点
-
环境设置正确(
executor
和memory
) -
asio 被告知上下文是单线程的
-
一个带有
SIGINT
和SIGTERM
的asio::signal_set
会自动连接到取消(即Ctrl+C
会导致取消)
此协程的 promise 中有一个执行器(promise 是协程状态的 C++ 名称。不要与 cobalt/promise.hpp 混淆),我们可以通过 this_coro 命名空间中的虚拟 awaitable 获得它。
echo 服务器
namespace cobalt = boost::cobalt;
namespace this_coro = boost::cobalt::this_coro;
cobalt::promise<void> echo(cobalt::io::stream_socket socket)
{
try (1)
{
char data[4096];
while (socket.is_open()) (2)
{
std::size_t n = co_await socket.read_some(boost::asio::buffer(data)); (3)
co_await cobalt::io::write(socket, boost::asio::buffer(data, n)); (4)
}
}
catch (std::exception& e)
{
std::printf("echo: exception: %s\n", e.what());
}
}
1 | 使用 use_op 完成令牌时,I/O 错误会转换为 C++ 异常。此外,如果协程被取消(例如,因为用户按下了 Ctrl-C),也会引发异常。在这些情况下,我们打印错误并退出循环。 |
2 | 我们运行循环直到被取消(异常)或用户关闭连接。 |
3 | 尽可能多地阅读。 |
4 | 写入所有读取的字节。 |
请注意,promise 是急切的。调用 echo
将立即执行代码直到 async_read_some
,然后将控制权返回给调用者。
接下来,我们还需要一个接受函数。在这里,我们使用 generator 来管理接受器状态。这是一个可以多次 co_awaited 的协程,直到达到 co_return
表达式。
cobalt::generator<cobalt::io::stream_socket> listen()
{
cobalt::io::acceptor acceptor(cobalt::io::endpoint{cobalt::io::tcp_v4, "0.0.0.0", 55555});
for (;;) (1)
{
cobalt::io::stream_socket sock = co_await acceptor.accept(); (2)
co_yield std::move(sock); (3)
}
co_return cobalt::io::stream_socket{}; (4)
}
1 | 取消也会导致 co_await 抛出异常 |
2 | 异步接受连接 |
3 | 将其 yield 给等待协程 |
4 | co_return 一个值以符合 C++ 标准。 |
有了这两个函数,我们现在可以编写服务器了
cobalt::promise<void> run_server(cobalt::wait_group & workers)
{
auto l = listen(); (1)
while (true)
{
if (workers.size() == 10u)
co_await workers.wait_one(); (2)
else
workers.push_back(echo(co_await l)); (3)
}
}
1 | 构造监听器生成器协程。当对象被销毁时,协程将被取消,执行所有必要的清理工作。 |
2 | 当工作器数量超过 10 个时,我们等待一个工作器完成 |
3 | 接受新连接并启动它。 |
wait_group 用于管理正在运行的 echo 函数。此类别将取消并等待正在运行的 echo
协程。
我们不需要对 listener
进行同样的操作,因为它在 l
被销毁时会自行停止。生成器的析构函数会取消它。
由于 promise
是急切的,只需调用它即可启动。然后我们将这些 promises 放入 wait_group,这将允许我们在作用域退出时销毁所有工作器。
cobalt::main co_main(int argc, char ** argv)
{
co_await cobalt::with(cobalt::wait_group(), &run_server); (1)
co_return 0u;
}
1 | 使用异步作用域运行 run_server 。 |
上面所示的 with 函数将运行一个带资源(例如 wait_group)的函数。在作用域退出时,with
将调用并 co_await
一个异步拆卸函数。这将导致所有连接在 co_main
存在之前正确关闭。
价格行情
为了演示 channels
和其他工具,我们需要一定的复杂性。为此,我们的项目是一个价格行情应用程序,它连接到 https://blockchain.info。用户可以连接到 localhost 查询给定的货币对,如下所示
wscat -c localhost:8080/btc/usd
首先,我们进行与 echo 服务器相同的声明。
using executor_type = cobalt::use_op_t::executor_with_default<cobalt::executor>;
using socket_type = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;
下一步是编写一个函数来连接 ssl 流,以连接上游
cobalt::promise<asio::ssl::stream<socket_type>> connect(
std::string host, boost::asio::ssl::context & ctx)
{
asio::ip::tcp::resolver res{cobalt::this_thread::get_executor()};
auto ep = co_await res.async_resolve(host, "https", cobalt::use_op); (1)
asio::ssl::stream<socket_type> sock{cobalt::this_thread::get_executor(), ctx};
co_await sock.next_layer().async_connect(*ep.begin()); (2)
co_await sock.async_handshake(asio::ssl::stream_base::client); (3)
co_return sock; (4)
}
1 | 查找主机 |
2 | 连接到端点 |
3 | 执行 SSL 握手 |
4 | 将套接字返回给调用者 |
接下来,我们需要一个函数来对现有 ssl 流进行 websocket 升级。
cobalt::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
ws.set_option(beast::websocket::stream_base::decorator(
[](beast::websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " cobalt-ticker");
req.set(http::field::origin,
"https://exchange.blockchain.com"); (1)
}));
co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
1 | blockchain.info 要求设置此头文件。 |
2 | 执行 websocket 握手。 |
一旦 websocket 连接,我们希望连续接收 json 消息,为此生成器是一个不错的选择。
cobalt::generator<json::object> json_reader(websocket_type & ws)
try
{
beast::flat_buffer buf;
while (ws.is_open()) (1)
{
auto sz = co_await ws.async_read(buf); (2)
json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
auto obj = json::parse(data);
co_yield obj.as_object(); (3)
buf.consume(sz);
}
co_return {};
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
1 | 只要套接字保持打开状态,就一直运行 |
2 | 从 websocket 读取帧 |
3 | 解析并 co_yield 它作为对象。 |
然后需要将其连接到订阅者,为此我们将利用通道传递原始 json。为了便于生命周期管理,订阅者将持有 shared_ptr
,生产者将持有 weak_ptr
。
using subscription = std::pair<std::string, std::weak_ptr<cobalt::channel<json::object>>>;
using subscription_channel = std::weak_ptr<cobalt::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;
运行区块链连接器的主要函数操作两个输入:来自 websocket 的数据和用于处理新订阅的通道。
cobalt::promise<void> run_blockchain_info(cobalt::channel<subscription> & subc)
try
{
asio::ssl::context ctx{asio::ssl::context_base::tls_client};
websocket_type ws{co_await connect("blockchain.info", ctx)};
co_await connect_to_blockchain_info(ws); (1)
subscription_map subs;
std::list<std::string> unconfirmed;
auto rd = json_reader(ws); (2)
while (ws.is_open()) (3)
{
switch (auto msg = co_await cobalt::race(rd, subc.read()); msg.index()) (4)
{
case 0: (5)
if (auto ms = get<0>(msg);
ms.at("event") == "rejected") // invalid sub, cancel however subbed
co_await handle_rejections(unconfirmed, subs, ms);
else
co_await handle_update(unconfirmed, subs, ms, ws);
break;
case 1: // (6)
co_await handle_new_subscription(
unconfirmed, subs,
std::move(get<1>(msg)), ws);
break;
}
}
for (auto & [k ,c] : subs)
{
if (auto ptr = c.lock())
ptr->close();
}
}
catch(std::exception & e)
{
std::cerr << "Exception: " << e.what() << std::endl;
throw;
}
1 | 初始化连接 |
2 | 实例化 json_reader |
3 | 只要 websocket 打开就运行 |
4 | 选择,即等待新的 json 消息或订阅 |
5 | 当是 json 时,处理更新或拒绝 |
6 | 处理新订阅消息 |
handle_*
函数的内容对于 cobalt
功能而言不那么重要,因此在本教程中省略了它。
handle_new_subscription
函数向 blockchain.info
发送消息,后者会发回确认或拒绝。handle_rejection
和 handle_update
将获取 json 值并将其转发到订阅通道。
在消费者端,我们的服务器将只将数据转发给客户端。如果客户端输入数据,我们将立即关闭 websocket。我们使用 as_tuple
来忽略潜在的错误。
cobalt::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
system::error_code ec;
co_await st.async_read(buf, asio::as_tuple(cobalt::use_op));
co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(cobalt::use_op));
st.next_layer().close(ec);
}
接下来,我们运行用户发送的会话
cobalt::promise<void> run_session(beast::websocket::stream<socket_type> st,
cobalt::channel<subscription> & subc)
try
{
http::request<http::empty_body> req;
beast::flat_buffer buf;
co_await http::async_read(st.next_layer(), buf, req); (1)
// check the target
auto r = urls::parse_uri_reference(req.target());
if (r.has_error() || (r->segments().size() != 2u)) (2)
{
http::response<http::string_body> res{http::status::bad_request, 11};
res.body() = r.has_error() ? r.error().message() :
"url needs two segments, e.g. /btc/usd";
co_await http::async_write(st.next_layer(), res);
st.next_layer().close();
co_return ;
}
co_await st.async_accept(req); (3)
auto sym = std::string(r->segments().front()) + "-" +
std::string(r->segments().back());
boost::algorithm::to_upper(sym);
// close when data gets sent
auto p = read_and_close(st, std::move(buf)); (4)
auto ptr = std::make_shared<cobalt::channel<json::object>>(1u); (5)
co_await subc.write(subscription{sym, ptr}); (6)
while (ptr->is_open() && st.is_open()) (7)
{
auto bb = json::serialize(co_await ptr->read());
co_await st.async_write(asio::buffer(bb));
}
co_await st.async_close(beast::websocket::close_code::going_away,
asio::as_tuple(cobalt::use_op)); (8)
st.next_layer().close();
co_await p; (9)
}
catch(std::exception & e)
{
std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
1 | 读取 http 请求,因为我们想要路径 |
2 | 检查路径,例如 /btc/usd 。 |
3 | 接受 WebSocket |
4 | 开始读取,如果消费者发送了内容,则关闭 |
5 | 创建通道以接收更新 |
6 | 向 run_blockchain_info 发送订阅请求 |
7 | 当通道和 websocket 打开时,我们正在转发数据。 |
8 | 关闭套接字并忽略错误 |
9 | 由于 websocket 现在肯定已关闭,等待 read_and_close 关闭。 |
在编写完 run_session
和 run_blockchain_info
之后,我们现在可以转到 main
cobalt::main co_main(int argc, char * argv[])
{
acceptor_type acc{co_await cobalt::this_coro::executor,
asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
std::cout << "Listening on localhost:8080" << std::endl;
constexpr int limit = 10; // allow 10 ongoing sessions
cobalt::channel<subscription> sub_manager; (1)
co_await join( (2)
run_blockchain_info(sub_manager),
cobalt::with( (3)
cobalt::wait_group(
asio::cancellation_type::all,
asio::cancellation_type::all),
[&](cobalt::wait_group & sessions) -> cobalt::promise<void>
{
while (!co_await cobalt::this_coro::cancelled) (4)
{
if (sessions.size() >= limit) (5)
co_await sessions.wait_one();
auto conn = co_await acc.async_accept(); (6)
sessions.push_back( (7)
run_session(
beast::websocket::stream<socket_type>{std::move(conn)},
sub_manager));
}
})
);
co_return 0;
}
1 | 创建通道以管理订阅 |
2 | 使用 join 并行运行两个任务。 |
3 | 使用 cobalt 作用域提供 wait_group 。 |
4 | 运行直到被取消。 |
5 | 达到 limit 后,我们等待一个任务完成。 |
6 | 等待新连接。 |
7 | 将会话插入 wait_group 。 |
Main 使用 join
是因为一个任务失败应该取消另一个任务。
延迟操作
到目前为止,我们已经使用 use_op
来使用基于 asio 完成令牌机制的隐式操作。
然而,我们可以实现自己的操作,这些操作也可以利用 await_ready
优化。与立即完成不同,当 await_ready
返回 true 时,协程永远不会暂停。
为了利用此协程功能,cobalt
提供了一种创建可跳过操作的简单方法
struct wait_op final : cobalt::op<system::error_code> (1)
{
asio::steady_timer & tim;
wait_op(asio::steady_timer & tim) : tim(tim) {}
void ready(cobalt::handler<system::error_code> h ) override (2)
{
if (tim.expiry() < std::chrono::steady_clock::now())
h(system::error_code{});
}
void initiate(cobalt::completion_handler<system::error_code> complete) override (3)
{
tim.async_wait(std::move(complete));
}
};
cobalt::main co_main(int argc, char * argv[])
{
asio::steady_timer tim{co_await asio::this_coro::executor,
std::chrono::milliseconds(std::stoi(argv[1]))};
co_await wait_op(tim); (4)
co_return 0; //
}
1 | 声明操作。我们继承 op 以使其可等待。 |
2 | 预暂停检查在此处实现 |
3 | 如果需要,执行等待 |
4 | 像使用任何其他 awaitable 一样使用 op。 |
通过这种方式,我们可以最小化协程暂停的数量。
虽然上述方法与 asio 一起使用,但您也可以将这些处理程序与任何其他基于回调的代码一起使用。
带推送值的生成器
带推送值的协程并不常见,但可以显著简化某些问题。
由于我们已经在前面的示例中有了 json_reader,这里是如何编写一个接收推送值的 json_writer。
使用生成器的优点是内部状态管理。
cobalt::generator<system::error_code, json::object>
json_writer(websocket_type & ws)
try
{
char buffer[4096];
json::serializer ser;
while (ws.is_open()) (1)
{
auto val = co_yield system::error_code{}; (2)
while (!ser.done())
{
auto sv = ser.read(buffer);
co_await ws.cobalt_write({sv.data(), sv.size()}); (3)
}
}
co_return {};
}
catch (system::system_error& e)
{
co_return e.code();
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
1 | 只要套接字保持打开状态,就一直运行 |
2 | co_yield 当前错误并检索新值。 |
3 | 向 websocket 写入一个帧 |
现在我们可以这样使用生成器
auto g = json_writer(my_ws);
extern std::vector<json::value> to_write;
for (auto && tw : std::move(to_write))
{
if (auto ec = co_await g(std::move(tw)))
return ec; // yield error
}
高级示例
更多示例仅以代码形式在仓库中提供。所有示例均列在下方。
一个执行单个 http get 请求的 http 客户端。 |
|
使用 |
|
使用 nanobind 将 cobalt 与 python 集成。它使用 python 的 asyncio 作为执行器,并允许 C++ co_await python 函数,反之亦然。 |
|
将 |
|
创建基于 |
|
使用带有 |
|
使用 |
|
延迟 部分使用的示例 |
|
延迟操作 部分使用的示例 |
|
echo 服务器 部分使用的示例 |
|
价格行情 部分使用的示例 |
|
通道 引用使用的示例 |
设计
概念
该库有两个基本概念
-
协程
awaitable 是可以在协程内部与 co_await
一起使用的表达式,例如:
co_await delay(50ms);
但是,协程 promise 可以定义一个 await_transform
,即实际上什么可以与 co_await
表达式一起使用取决于协程。
因此,我们应该重新定义 awaitable 是什么:一个 awaitable 是一种类型,可以在协程内部 co_await
,其 promise 不定义 await_transform
。
伪关键字是一种可以在协程中使用的类型,由于其 promise 的 await_transform
,它为其添加了特殊功能。
this_coro 命名空间中的所有动词都是此类伪关键字。
auto exec = co_await this_coro::executor;
该库公开了一组 enable_* 协程基类,以便于创建自定义协程。这包括 enable_awaitables,它提供了一个 await_transform ,只转发 awaitable。 |
本文档中协程指的是异步协程,即不考虑 std::generator 等同步协程。
执行器
由于一切都是异步的,库需要使用事件循环。因为一切都是单线程的,可以假设每个线程只有一个执行器,这足以满足 97% 的用例。因此,有一个 thread_local
执行器,它被协程对象用作默认值(尽管以副本形式存储在协程 promise 中)。
同样,库使用一种 executor
类型,默认为 asio::any_io_executor
。
如果您编写自己的协程,它应该持有执行器的副本,并有一个 get_executor 函数,以 const 引用返回它。 |
使用 Strands
虽然可以使用 strands,但它们与 thread_local
执行器不兼容。这是因为它们可能会切换线程,因此它们不能是 thread_local
。
在 通道 的情况下,这是一个构造函数参数,但对于其他协程类型,需要使用 asio::executor_arg
。这是通过在参数列表(某个位置)中直接后跟要在协程参数列表中使用的执行器来完成的,例如:
cobalt::promise<void> example_with_executor(int some_arg, asio::executor_arg_t, cobalt::executor);
通过这种方式,协程 promise 可以从第三个参数中获取执行器,而不是默认使用 thread_local 执行器。
当然,参数可以设置为默认值,以减少不便,如果它们有时与 thread_local 执行器一起使用。
cobalt::promise<void> example_with_executor(int some_arg,
asio::executor_arg_t = asio::executor_arg,
cobalt::executor = cobalt::this_thread::get_executor());
如果在 strand 上省略此操作,则会抛出 asio::bad_allocator
类型的异常,或者——更糟糕的是——使用了错误的执行器。
多态内存资源
类似地,该库使用 thread_local pmr::memory_resource
来分配协程帧并用作异步操作中的分配器。
原因是用户可能希望自定义分配,例如避免锁、限制内存使用或监控使用情况。pmr
允许我们在不引入不必要的模板参数(即没有 promise
复杂性)的情况下实现这一点。然而,使用 pmr
会引入一些最小的开销,因此用户可以选择通过定义 BOOST_COBALT_NO_PMR
来禁用它。
op 使用内部资源,该资源针对 asio 的分配器用法进行了优化,而 gather、race 和 join 使用单调资源来最小化分配。两者在定义 BOOST_COBALT_NO_PMR
的情况下仍然有效,在这种情况下,它们将使用 new/delete
作为上游分配。
如果你编写自己的协程,它应该有一个返回 pmr::polymorphic_allocator 的 get_allocator 函数。 |
取消
cobalt 使用基于 asio::cancellation_signal
的隐式取消。这主要用于隐式(例如与 race 一起使用),因此示例中很少有显式使用。
如果您编写自定义协程,它必须从 get_cancellation_slot 函数返回一个 cancellation_slot ,以便能够取消其他操作。 |
如果您编写自定义 awaitable,它可以在 await_suspend 中使用该函数来接收取消信号。 |
Promise
主要的协程类型是 promise
,它是急切的。默认使用它的原因是可以优化不暂停的 promise,例如:
cobalt::promise<void> noop()
{
co_return;
}
理论上,等待上述操作是一个空操作,但实际上,截至 2023 年,编译器尚未达到这一水平。
Race
最重要的同步机制是 race
函数。
它以伪随机顺序等待多个 awaitable,并在第一个完成时返回结果,然后忽略其余的。
也就是说,它以伪随机顺序启动 co_await
,并在找到一个 awaitable 准备就绪或立即完成时停止。
cobalt::generator<int> gen1();
cobalt::generator<double> gen2();
cobalt::promise<void> p()
{
auto g1 = gen1();
auto g2 = gen2();
while (!co_await cobalt::this_coro::cancelled)
{
switch(auto v = co_await race(g1, g2); v.index())
{
case 0:
printf("Got int %d\n", get<0>(v));
break;
case 1:
printf("Got double %f\n", get<1>(v));
break;
}
}
}
race
是触发取消的首选方式,例如:
cobalt::promise<void> timeout();
cobalt::promise<void> work();
race(timeout(), work());
interrupt_await
如果它天真地取消,然而会丢失数据。因此,引入了 interrupt_await
的概念,它告诉 awaitable(如果支持)立即恢复 awaiter 并返回或抛出被忽略的值。
struct awaitable
{
bool await_ready() const;
template<typename Promise>
std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
T await_resume();
void interrupt_await() &;
};
如果 interrupt_await
未导致立即恢复 (h
),则 race
将发送取消信号。
race
以正确的引用限定符应用这些
auto g = gen1();
race(g, gen2());
如果可用,上述代码将为 g1
调用 interrupt_await() &
函数,为 g2
调用 interrupt_await() &&
函数。
一般来说,cobalt 中的协程支持左值中断,即 interrupt_await() & 。channel 操作是非限定的,即在两种情况下都有效。 |
关联器
cobalt
使用 asio 的 associator
概念,但对其进行了简化。也就是说,它有三个 associator,它们是等待 promise 的成员函数。
-
const executor_type & get_executor()
(始终是executor
,必须按 const 引用返回) -
allocator_type get_allocator()
(始终是pmr::polymorphic_allocator
) -
cancellation_slot_type get_cancellation_slot()
(必须与asio::cancellation_slot
具有相同的接口)
cobalt
使用概念在其 await_suspend
函数中检查这些是否存在。
通过这种方式,自定义协程可以支持取消、执行器和分配器。
在自定义 awaitable 中,您可以这样获取它们
struct my_awaitable
{
bool await_ready();
template<typename T>
void await_suspend(std::corutine_handle<P> h)
{
if constexpr (requires (Promise p) {p.get_executor();})
handle_executor(h.promise().get_executor();
if constexpr (requires (Promise p) {p.get_cancellation_slot();})
if ((cl = h.promise().get_cancellation_slot()).is_connected())
cl.emplace<my_cancellation>();
}
void await_resume();
};
取消在 co_await
表达式中连接(如果协程和 awaitable 支持),包括 race 等同步机制。
线程
主要的技术原因是,切换协程最有效的方式是通过 await_suspend
返回新协程的句柄,如下所示
struct my_awaitable
{
bool await_ready();
std::coroutine_handle<T> await_suspend(std::coroutine_handle<U>);
void await_resume();
};
在这种情况下,等待协程将在调用 await_suspend 之前暂停,并恢复返回的协程。当然,如果需要通过执行器,这就不起作用。
这不仅适用于等待的协程,也适用于通道。此库中的通道使用 awaitable 的侵入式列表,并且可能会从写入操作的 await_suspend
返回读取(因此暂停)协程的句柄。
I/O
cobalt::io
命名空间为 boost.asio 提供了一个包装库。它被简化和编译以加快开发和编译时间。
参考
cobalt/main.hpp
开始使用 cobalt 应用程序最简单的方法是使用具有以下签名的 co_main
函数
cobalt::main co_main(int argc, char *argv[]);
声明 co_main
将添加一个 main
函数,该函数执行在事件循环上运行协程所需的所有步骤。这使我们能够编写非常简单的异步程序。
cobalt::main co_main(int argc, char *argv[])
{
auto exec = co_await cobalt::this_coro::executor; (1)
asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
co_await tim.async_wait(cobalt::use_op); (3)
co_return 0;
}
1 | 获取 main 运行的执行器 |
2 | 与 asio 对象一起使用 |
3 | co_await 一个 cobalt 操作 |
主 promise 将创建一个 asio::signal_set
并将其用于取消。SIGINT
变为完全取消,而 SIGTERM
变为终端取消。
取消将不会转发到 detached 协程。用户需要注意在取消时结束它们,否则程序将无法正常终止。 |
执行器
它还将创建一个 asio::io_context
来运行,您可以通过 this_coro::executor
获取它。它将被分配给 cobalt::this_thread::get_executor()
。
内存资源
它还创建了一个内存资源,该资源将用作内部内存分配的默认值。它将被分配给 thread_local
到 cobalt::this_thread::get_default_resource()
。
Promise
每个协程都有一个内部状态,称为 promise
(不要与 cobalt::promise
混淆)。根据协程属性的不同,可以 co_await
不同的事物,就像我们在上面的示例中使用的一样。
它们通过继承实现,并在不同的 promise 类型之间共享
主要的 promise 具有以下属性。
规范
-
声明
co_main
将隐式声明一个main
函数 -
main
仅在定义了co_main
时才存在。 -
SIGINT
和SIGTERM
将导致内部任务被取消。
cobalt/promise.hpp
Promise 是一个急切的协程,可以 co_await
和 co_return
值。也就是说,它不能使用 co_yield
。
cobalt::promise<void> delay(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
}
cobalt::main co_main(int argc, char *argv[])
{
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
Promise 默认是附加的。这意味着当 promise
句柄超出范围时会发送取消。
Promise 可以通过调用 detach
或使用前缀 +
运算符来分离。这是使用 detached 的运行时替代方案。分离的 Promise 在销毁时不会发送取消。
cobalt::promise<void> my_task();
cobalt::main co_main(int argc, char *argv[])
{
+my_task(); (1)
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
1 | 通过使用 + ,任务将被分离。否则,编译器将生成 nodiscard 警告。 |
执行器
除非在任何位置使用后跟执行器参数的 asio::executor_arg
,否则执行器将从 thread_local
get_executor 函数获取。
cobalt::promise<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
内存资源
内存资源从 thread_local
get_default_resource 函数获取,除非在任何位置使用后跟 polymorphic_allocator
参数的 std::allocator_arg
。
cobalt::promise<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
概述
template<typename Return>
struct [[nodiscard]] promise
{
promise(promise &&lhs) noexcept;
promise& operator=(promise && lhs) noexcept;
// enable `co_await`. (1)
auto operator co_await ();
// Ignore the return value, i.e. detach it. (2)
void operator +() &&;
// Cancel the promise.
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// Check if the result is ready
bool ready() const;
// Check if the promise can be awaited.
explicit operator bool () const; (3)
// Detach or attach
bool attached() const;
void detach();
void attach();
// Create an already completed promimse
static promise
// Get the return value. If !ready() this function has undefined behaviour.
Return get();
};
1 | 支持 中断等待 |
2 | 这允许使用简单的 +my_task() 表达式创建并行运行的 promise。 |
3 | 这允许类似 while (p) co_await p; 的代码 |
Promise
协程 promise (promise::promise_type
) 具有以下属性。
-
Await deferred == cobalt/generator.hpp
Generator 是一个急切的协程,可以 co_await
并 co_yield
值给调用者。
cobalt::generator<int> example()
{
printf("In coro 1\n");
co_yield 2;
printf("In coro 3\n");
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main 1\n");
printf("In main %d\n", co_await f);
printf("In main %d\n", co_await f);
return 0;
}
这将生成以下输出
In main 0 In coro 1 In main 1 In main 2 In coro 3 In main 4

当 Push
(第二个模板参数)设置为非 void 时,可以将值推送到生成器中
cobalt::generator<int, int> example()
{
printf("In coro 1\n");
int i = co_yield 2;
printf("In coro %d\n", i);
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main %d\n", co_await f(3)); (1)
co_return 0;
}
1 | 推送的值通过 operator() 传递给 co_yield 的结果。 |
这将生成以下输出
In main 0 In coro 1 In main 2 In coro 3
惰性
通过等待 initial 可以使生成器变为惰性。这个 co_await
表达式将产生 Push
值。这意味着生成器将等到第一次被等待时才开始工作,然后处理新推送的值并在下一个 co_yield 处恢复。
cobalt::generator<int, int> example()
{
int v = co_await cobalt::this_coro::initial;
printf("In coro %d\n", v);
co_yield 2;
printf("In coro %d\n", v);
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main 1\n"); // < this is now before the co_await initial
printf("In main %d\n", co_await f(1));
printf("In main %d\n", co_await f(3));
return 0;
}
这将生成以下输出
In main 0 In main 1 In coro 1 In main 2 In coro 3 In main 4

执行器
除非在任何位置使用后跟执行器参数的 asio::executor_arg
,否则执行器将从 thread_local
get_executor 函数获取。
cobalt::generator<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
内存资源
内存资源从 thread_local
get_default_resource 函数获取,除非在任何位置使用后跟 polymorphic_allocator
参数的 std::allocator_arg
。
cobalt::generator<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
概述
template<typename Yield, typename Push = void>
struct [[nodiscard]] generator
{
// Movable
generator(generator &&lhs) noexcept = default;
generator& operator=(generator &&) noexcept;
// True until it co_returns & is co_awaited after (1)
explicit operator bool() const;
// Cancel the generator. (3)
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// Check if a value is available
bool ready() const;
// Get the returned value. If !ready() this function has undefined behaviour.
Yield get();
// Cancel & detach the generator.
~generator();
// an awaitable that results in value of Yield
.
using generator_awaitable = unspecified;
// Present when Push
!= void
generator_awaitable operator()( Push && push);
generator_awaitable operator()(const Push & push);
// Present when Push
== void
, i.e. can co_await
the generator directly.
generator_awaitable operator co_await (); (2)
};
1 | 这允许类似 while (gen) co_await gen: 的代码 |
2 | 支持 中断等待 |
3 | 取消的生成器可能可恢复 |
急切生成器的销毁将被延迟,惰性生成器将立即销毁。 |
cobalt/task.hpp
任务是一个惰性协程,可以 co_await
和 co_return
值。也就是说,它不能使用 co_yield
。
cobalt::task<void> delay(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
}
cobalt::main co_main(int argc, char *argv[])
{
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
与 promise 不同,任务可以在创建它的执行器之外的另一个执行器上被等待或派生。
执行器
由于 task
是惰性的,它在构造时不需要执行器。它会尝试从调用者或等待者(如果存在)获取执行器。否则,它将默认为 thread_local 执行器。
内存资源
内存资源不是从 thread_local
get_default_resource 函数获取的,而是 pmr::get_default_resource()
,除非在任何位置使用后跟 polymorphic_allocator
参数的 std::allocator_arg
。
cobalt::task<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
概述
template<typename Return>
struct [[nodiscard]] task
{
task(task &&lhs) noexcept = default;
task& operator=(task &&) noexcept = default;
// enable `co_await`
auto operator co_await ();
};
通过调用 run(my_task()) ,任务可以从同步函数中同步使用。 |
cobalt/detached.hpp
Detached 是一个急切的协程,可以 co_await
但不能 co_return
值。也就是说,它不能被恢复,并且通常不会被等待。
cobalt::detached delayed_print(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
printf("Hello world\n");
}
cobalt::main co_main(int argc, char *argv[])
{
delayed_print();
co_return 0;
}
Detached 用于轻松地在后台运行协程。
cobalt::detached my_task();
cobalt::main co_main(int argc, char *argv[])
{
my_task(); (1)
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
1 | 派生 detached 协程。 |
一个 detached 可以这样给自己分配一个新的取消源
cobalt::detached my_task(asio::cancellation_slot sl)
{
co_await this_coro::reset_cancellation_source(sl);
// do somework
}
cobalt::main co_main(int argc, char *argv[])
{
asio::cancellation_signal sig;
my_task(sig.slot()); (1)
co_await delay(std::chrono::milliseconds(50));
sig.emit(asio::cancellation_type::all);
co_return 0;
}
执行器
除非在任何位置使用后跟执行器参数的 asio::executor_arg
,否则执行器将从 thread_local
get_executor 函数获取。
cobalt::detached my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
内存资源
内存资源从 thread_local
get_default_resource 函数获取,除非在任何位置使用后跟 polymorphic_allocator
参数的 std::allocator_arg
。
cobalt::detached my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
use_op
use_op
令牌是创建 op 的直接方式,即使用 cobalt::use_op
作为完成令牌将创建所需的 awaitable。
auto tim = cobalt::use_op.as_default_on(asio::steady_timer{co_await cobalt::this_coro::executor});
co_await tim.async_wait();
根据完成签名,co_await
表达式可能会抛出异常。
签名 | 返回类型 | 异常 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
任何异常 |
|
|
任何异常 |
use_op 永远不会立即完成,即 await_ready 总是返回 false,但总是暂停协程。 |
手动编码操作
操作是 [cobalt_operation] 功能的更高级实现。
该库使创建具有早期完成条件的异步操作变得容易,即完全避免协程暂停的条件。
例如,我们可以创建一个 wait_op
,如果计时器已经过期,它什么也不做。
struct wait_op : cobalt::op<system::error_code> (1)
{
asio::steady_timer & tim;
wait_op(asio::steady_timer & tim) : tim(tim) {}
bool ready(cobalt::handler<system::error_code> ) (2)
{
if (tim.expiry() < std::chrono::steady_clock::now())
h(system::error_code{});
}
void initiate(cobalt::completion_handler<system::error_code> complete) (3)
{
tim.async_wait(std::move(complete));
}
};
1 | 继承带有匹配签名的 op ,await_transform 会将其提取 |
2 | 检查操作是否就绪 - 从 await_ready 调用 |
3 | 如果操作未就绪,则启动它。 |
cobalt/concepts.hpp
Awaitable
Awaitable 是可以在 co_await
中使用的表达式。
template<typename Awaitable, typename Promise = void>
concept awaitable_type = requires (Awaitable aw, std::coroutine_handle<Promise> h)
{
{aw.await_ready()} -> std::convertible_to<bool>;
{aw.await_suspend(h)};
{aw.await_resume()};
};
template<typename Awaitable, typename Promise = void>
concept awaitable =
awaitable_type<Awaitable, Promise>
|| requires (Awaitable && aw) { {std::forward<Awaitable>(aw).operator co_await()} -> awaitable_type<Promise>;}
|| requires (Awaitable && aw) { {operator co_await(std::forward<Awaitable>(aw))} -> awaitable_type<Promise>;};
该库中的 awaitable 要求协程 promise 如果提供执行器,则以 const 引用返回执行器。否则它将使用 this_thread::get_executor() 。 |
启用 awaitable
继承 enable_awaitables
将使协程能够通过 await_transform
co_await
任何在没有 await_transform
的情况下可以 co_await
的东西。
cobalt/this_coro.hpp
this_coro
命名空间提供了访问协程 promise 内部状态的实用工具。
伪 awaitable
// Awaitable type that returns the executor of the current coroutine.
struct executor_t {}
constexpr executor_t executor;
// Awaitable type that returns the cancellation state of the current coroutine.
struct cancellation_state_t {};
constexpr cancellation_state_t cancellation_state;
// Reset the cancellation state with custom or default filters.
constexpr unspecified reset_cancellation_state();
template<typename Filter>
constexpr unspecified reset_cancellation_state(
Filter && filter);
template<typename InFilter, typename OutFilter>
constexpr unspecified reset_cancellation_state(
InFilter && in_filter,
OutFilter && out_filter);
// get & set the throw_if_cancelled setting.
unspecified throw_if_cancelled();
unspecified throw_if_cancelled(bool value);
// Set the cancellation source in a detached.
unspecified reset_cancellation_source();
unspecified reset_cancellation_source(asio::cancellation_slot slot);
// get the allocator the promise
struct allocator_t {};
constexpr allocator_t allocator;
// get the current cancellation state-type
struct cancelled_t {};
constexpr cancelled_t cancelled;
// set the over-eager mode of a generator
struct initial_t {};
constexpr initial_t initial;
Await 分配器
支持 enable_await_allocator
的协程的分配器可以通过以下方式获取
co_await cobalt::this_coro::allocator;
为了为您自己的协程启用此功能,您可以使用 CRTP 模式继承 enable_await_allocator
struct my_promise : cobalt::enable_await_allocator<my_promise>
{
using allocator_type = __your_allocator_type__;
allocator_type get_allocator();
};
如果可用,分配器将由 use_op 使用 |
Await 执行器
支持 enable_await_executor
的协程的执行器可以通过以下方式获取
co_await cobalt::this_coro::executor;
为了为您自己的协程启用此功能,您可以使用 CRTP 模式继承 enable_await_executor
struct my_promise : cobalt::enable_await_executor<my_promise>
{
using executor_type = __your_executor_type__;
executor_type get_executor();
};
如果可用,执行器将由 use_op 使用 |
Await 延迟
您的协程 promise 可以继承 enable_await_deferred
,以便在 co_await
表达式中支持单个签名 asio::deferred
。
由于 asio::deferred
现在是默认完成令牌,因此这允许以下代码,而无需指定任何完成令牌或其他特化。
asio::steady_timer t{co_await cobalt::this_coro::executor};
co_await t.async_wait();
内存资源基类
promise 的 promise_memory_resource_base
基类将提供一个 get_allocator
,该分配器从默认资源或遵循 std::allocator_arg
参数传递的资源中获取。同样,它将添加 operator new
重载,以便协程为其帧分配使用相同的内存资源。
如果取消则抛出
promise_throw_if_cancelled_base
提供了基本选项,允许操作使协程在等待另一个实际的 awaitable 时抛出异常。
co_await cobalt::this_coro::throw_if_cancelled;
取消状态
promise_cancellation_base
提供了基本选项,允许操作启用协程具有可通过 reset_cancellation_state
重置的 cancellation_state。
co_await cobalt::this_coro::reset_cancellation_state();
为了方便起见,还有一个快捷方式可以检查当前的取消状态
asio::cancellation_type ct = (co_await cobalt::this_coro::cancellation_state).cancelled();
asio::cancellation_type ct = co_await cobalt::this_coro::cancelled; // same as above
cobalt/this_thread.hpp
由于一切都是单线程的,此库为每个线程提供了一个执行器和默认内存资源。
namespace boost::cobalt::this_thread
{
pmr::memory_resource* get_default_resource() noexcept; (1)
pmr::memory_resource* set_default_resource(pmr::memory_resource* r) noexcept; (2)
pmr::polymorphic_allocator<void> get_allocator(); (3)
typename asio::io_context::executor_type & get_executor(); (4)
void set_executor(asio::io_context::executor_type exec) noexcept; (5)
}
1 | 获取默认资源 - 除非设置,否则将为 pmr::get_default_resource |
2 | 设置默认资源 - 返回之前设置的资源 |
3 | 获取一个包装 (1) 的分配器 |
4 | 获取线程的执行器 - 如果未设置则抛出 |
5 | 设置当前线程的执行器。 |
协程将使用这些作为默认值,但会保留一个副本以防万一。
唯一的例外是 cobalt 操作的初始化,它将使用 this_thread::executor 进行重新抛出。 |
cobalt/channel.hpp
通道可用于在单个线程上的不同协程之间交换数据。
概述
template<typename T>
struct channel
{
// create a channel with a buffer limit, executor & resource.
explicit
channel(std::size_t limit = 0u,
executor executor = this_thread::get_executor(),
pmr::memory_resource * resource = this_thread::get_default_resource());
// not movable.
channel(channel && rhs) noexcept = delete;
channel & operator=(channel && lhs) noexcept = delete;
using executor_type = executor;
const executor_type & get_executor();
// Closes the channel
~channel();
bool is_open() const;
// close the operation, will cancel all pending ops, too
void close();
// an awaitable that yields T
using read_op = unspecified;
// an awaitable that yields void
using write_op = unspecified;
// read a value to a channel
read_op read();
// write a value to the channel
write_op write(const T && value);
write_op write(const T & value);
write_op write( T && value);
write_op write( T & value);
// write a value to the channel if T is void
};
描述
通道是两个协程通信和同步的工具。
const std::size_t buffer_size = 2;
channel<int> ch{exec, buffer_size};
// in coroutine (1)
co_await ch.write(42);
// in coroutine (2)
auto val = co_await ch.read();
1 | 向通道发送一个值 - 将阻塞直到可以发送 |
2 | 从通道读取一个值 - 将阻塞直到有一个值可等待。 |
两个操作都可能阻塞,具体取决于通道缓冲区大小。
如果缓冲区大小为零,则 read
和 write
需要同时发生,即充当集合点。
如果缓冲区未满,则写操作不会暂停协程;同样,如果缓冲区不为空,则读操作不会暂停。
如果两个操作同时完成(空缓冲区总是如此),则第二个操作将被发布到执行器以供稍后完成。
通道类型可以是 void ,在这种情况下 write 不需要参数。 |
通道操作可以在不丢失数据的情况下被取消。这使得它们可以与 race 一起使用。
generator<variant2::variant<int, double>> merge(
channel<int> & c1,
channel<double> & c2)
{
while (c1 && c2)
co_yield co_await race(c1, c2);
}
示例
cobalt::promise<void> producer(cobalt::channel<int> & chan)
{
for (int i = 0; i < 4; i++)
co_await chan.write(i);
chan.close();
}
cobalt::main co_main(int argc, char * argv[])
{
cobalt::channel<int> c;
auto p = producer(c);
while (c.is_open())
std::cout << co_await c.read() << std::endl;
co_await p;
co_return 0;
}
此外,还提供了 channel_reader
,以使读取通道更方便且可与 BOOST_COBALT_FOR 一起使用。
cobalt::main co_main(int argc, char * argv[])
{
cobalt::channel<int> c;
auto p = producer(c);
BOOST_COBALT_FOR(int value, cobalt::channel_reader(c))
std::cout << value << std::endl;
co_await p;
co_return 0;
}
cobalt/with.hpp
with
设施提供了一种执行协程异步拆卸的方法。也就是说,它就像一个异步析构函数调用。
struct my_resource
{
cobalt::promise<void> await_exit(std::exception_ptr e);
};
cobalt::promise<void> work(my_resource & res);
cobalt::promise<void> outer()
{
co_await cobalt::with(my_resource(), &work);
}
拆卸可以通过提供 await_exit
成员函数或返回 awaitable 的 tag_invoke
函数来完成,或者通过将拆卸作为 with
的第三个参数提供。
using ws_stream = beast::websocket::stream<asio::ip::tcp::socket>>;
cobalt::promise<ws_stream> connect(urls::url); (1)
cobalt::promise<void> disconnect(ws_stream &ws); (2)
auto teardown(const boost::cobalt::with_exit_tag & wet , ws_stream & ws, std::exception_ptr e)
{
return disconnect(ws);
}
cobalt::promise<void> run_session(ws_stream & ws);
cobalt::main co_main(int argc, char * argv[])
{
co_await cobalt::with(co_await connect(argv[1]), &run_session, &teardown);
co_return 0;
}
1 | 实现 websocket 连接和 websocket 初始化 |
2 | 实现有序关机。 |
如果作用域在没有异常的情况下退出,则 std::exception_ptr 为空。注意:exit 函数通过引用获取 exception_ptr 并修改它是合法的。 |
cobalt/race.hpp
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_wait()
{
co_await cobalt::race(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::race(aws); (2)
}
race
的第一个参数可以是 uniform random bit generator。
extern promise<void> pv1, pv2;
std::vector<promise<void>> pvv;
std::default_random_engine rdm{1};
// if everything returns void race returns the index
std::size_t r1 = co_await race(pv1, pv2);
std::size_t r2 = co_await race(rdm, pv1, pv2);
std::size_t r3 = co_await race(pvv);
std::size_t r4 = co_await race(rdm, pvv);
// variant if not everything is void. void become monostate
extern promise<int> pi1, pi2;
variant2::variant<monostate, int, int> r5 = co_await race(pv1, pi1, pi2);
variant2::variant<monostate, int, int> r6 = co_await race(rdm, pv1, pi1, pi2);
// a range returns a pair of the index and the result if non-void
std::vector<promise<int>> piv;
std::pair<std::size_t, int> r7 = co_await race(piv);
std::pair<std::size_t, int> r8 = co_await race(rdm, piv);
中断等待
当参数作为右值引用传递时,race 将尝试在 awaitable 上使用 .interrupt_await
来通知 awaitable 立即完成,并且结果将被忽略。如果支持,Awaitable 必须在 interrupt_await
返回之前恢复等待协程。如果 race
未检测到该函数,它将发送取消。
这意味着你可以像这样重复使用 race
cobalt::promise<void> do_wait()
{
auto t1 = task1();
auto t2 = task2();
co_await cobalt::race(t1, t2); (1)
co_await cobalt::race(t1, t2); (2)
}
1 | 等待第一个任务完成 |
2 | 等待另一个任务完成 |
race
将调用 awaitable
的函数,就像在 co_await
表达式中使用一样,或者根本不评估它们。
left_race
left_race
函数类似于 race
,但遵循严格的从左到右扫描。这可能导致饥饿问题,这就是为什么它不是推荐的默认值,但如果采取适当的措施,它对优先级排序很有用。
概述
// Concept for the random number generator.
template<typename G>
concept uniform_random_bit_generator =
requires ( G & g)
{
{typename std::decay_t<G>::result_type() } -> std::unsigned_integral; // is an unsigned integer type
// T Returns the smallest value that G's operator() may return. The value is strictly less than G::max(). The function must be constexpr.
{std::decay_t<G>::min()} -> std::same_as<typename std::decay_t<G>::result_type>;
// T Returns the largest value that G's operator() may return. The value is strictly greater than G::min(). The function must be constexpr.
{std::decay_t<G>::max()} -> std::same_as<typename std::decay_t<G>::result_type>;
{g()} -> std::same_as<typename std::decay_t<G>::result_type>;
} && (std::decay_t<G>::max() > std::decay_t<G>::min());
// Variadic race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
uniform_random_bit_generator URBG, awaitable ... Promise>
awaitable race(URBG && g, Promise && ... p);
// Ranged race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
uniform_random_bit_generator URBG, range<awaitable> PromiseRange>
awaitable race(URBG && g, PromiseRange && p);
// Variadic race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable race(Promise && ... p);
// Ranged race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable race(PromiseRange && p);
// Variadic left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable left_race(Promise && ... p);
// Ranged left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable left_race(PromiseRange && p);
选择空范围将导致抛出异常。 |
cobalt/gather.hpp
gather
函数可用于同时 co_await
多个 awaitable,并传递取消。
该函数将收集所有完成并以 system::result
的形式返回它们,即以值形式捕获概念。一个 awaitable 抛出异常不会取消其他 awaitable。
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_gather()
{
co_await cobalt::gather(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::gather(aws); (2)
}
gather
将调用 awaitable
的函数,就像在 co_await
表达式中使用一样。
extern promise<void> pv1, pv2;
std::tuple<system::result<int>, system::result<int>> r1 = co_await gather(pv1, pv2);
std::vector<promise<void>> pvv;
pmr::vector<system::result<void>> r2 = co_await gather(pvv);
extern promise<int> pi1, pi2;
std::tuple<system::result<monostate>,
system::result<monostate>,
system::result<int>,
system::result<int>> r3 = co_await gather(pv1, pv2, pi1, pi2);
std::vector<promise<int>> piv;
pmr::vector<system::result<int>> r4 = co_await gather(piv);
概述
// Variadic gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable gather(Promise && ... p);
// Ranged gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable gather(PromiseRange && p);
cobalt/join.hpp
join
函数可用于同时 co_await
多个 awaitable,并正确连接取消。
该函数将收集所有完成并将其作为值返回,除非抛出异常。如果抛出异常,所有未完成的操作都将被取消(如果可能,则中断),并重新抛出第一个异常。
void 将作为 variant2::monostate 在元组中返回,除非所有 awaitable 都产生 void。 |
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_join()
{
co_await cobalt::join(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::join(aws); (2)
}
join
将调用 awaitable
的函数,就像在 co_await
表达式中使用一样。
extern promise<void> pv1, pv2;
/* void */ co_await join(pv1, pv2);
std::vector<promise<void>> pvv;
/* void */ co_await join(pvv);
extern promise<int> pi1, pi2;
std::tuple<monostate, monostate, int, int> r1 = co_await join(pv1, pv2, pi1, pi2);
std::vector<promise<int>> piv;
pmr::vector<int> r2 = co_await join(piv);
概述
// Variadic join
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable join(Promise && ... p);
// Ranged join
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable join(PromiseRange && p);
选择空范围将导致异常。 |
cobalt/wait_group.hpp
wait_group
函数可用于管理多个 promise
类型的协程。它与 cobalt/with.hpp 开箱即用,因为它具有匹配的 await_exit
成员。
本质上,wait_group
是一个动态的 promise 列表,它有一个 race
函数 (wait_one
),一个 gather
函数 (wait_all
),并在作用域退出时进行清理。
struct wait_group
{
// create a wait_group
explicit
wait_group(asio::cancellation_type normal_cancel = asio::cancellation_type::none,
asio::cancellation_type exception_cancel = asio::cancellation_type::all);
// insert a task into the group
void push_back(promise<void> p);
// the number of tasks in the group
std::size_t size() const;
// remove completed tasks without waiting (i.e. zombie tasks)
std::size_t reap();
// cancel all tasks
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// wait for one task to complete.
wait_one_op wait_one();
// wait for all tasks to complete
wait_op wait();
// wait for all tasks to complete
wait_op operator co_await ();
// when used with with
, this will receive the exception
// and wait for the completion
// if ep
is set, this will use the exception_cancel
level,
// otherwise the normal_cancel
to cancel all promises.
wait_op await_exit(std::exception_ptr ep);
};
cobalt/spawn.hpp
auto spawn(Context & context, task<T> && t, CompletionToken&& token);
auto spawn(Executor executor, task<T> && t, CompletionToken&& token);
Spawn 将调度其启动并发布完成。这使得使用 task 在另一个执行器上运行任务并在当前执行器上使用 use_op 消费结果是安全的。也就是说,spawn
可用于跨线程。
示例
cobalt::task<int> work();
int main(int argc, char *argv[])
{
asio::io_context ctx{BOOST_ASIO_CONCURRENCY_HINT_1};
auto f = spawn(ctx, work(), asio::use_future);
ctx.run();
return f.get();
}
调用者需要确保执行器不会同时在多个线程上运行,例如,通过使用单线程 asio::io_context 。 |
cobalt/run.hpp
run
函数类似于 spawn,但同步运行。它将在内部设置执行上下文和内存资源。
这在将一段 cobalt 代码集成到同步应用程序中时非常有用。
概述
// Run the task and return it's value or rethrow any exception.
T run(task<T> t);
示例
cobalt::task<int> work();
int main(int argc, char *argv[])
{
return run(work());
}
cobalt/thread.hpp
线程类型是创建类似于 main
但不使用 signal_set
的环境的另一种方式。
cobalt::thread my_thread()
{
auto exec = co_await cobalt::this_coro::executor; (1)
asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
co_await tim.async_wait(cobalt::use_op); (3)
co_return 0;
}
1 | 获取线程运行的执行器 |
2 | 与 asio 对象一起使用 |
3 | co_await 一个 cobalt 操作 |
要使用线程,您可以像使用 std::thread
一样使用它
int main(int argc, char * argv[])
{
auto thr = my_thread();
thr.join();
return 0;
}
线程也是一个 awaitable
(包括取消)。
cobalt::main co_main(int argc, char * argv[])
{
auto thr = my_thread();
co_await thr;
co_return 0;
}
销毁一个 detached 线程将导致硬停止(io_context::stop )并加入线程。 |
除了等待 cobalt/thread.hpp 和 cobalt/spawn.hpp 之外,该库中没有任何东西是线程安全的。如果您需要在线程之间传输数据,您将需要一个线程安全的实用工具,例如 asio::concurrent_channel 。您不能在线程之间共享任何 cobalt 原语,唯一的例外是能够将 task 派生 到另一个线程的执行器上。 |
执行器
它还将创建一个 asio::io_context
来运行,您可以通过 this_coro::executor
获取它。它将被分配给 cobalt::this_thread::get_executor()
。
内存资源
它还创建了一个内存资源,该资源将用作内部内存分配的默认值。它将被分配给 thread_local
到 cobalt::this_thread::get_default_resource()
。
概述
struct thread
{
// Send a cancellation signal
void cancel(asio::cancellation_type type = asio::cancellation_type::all);
// Allow the thread to be awaited. NOOP if the thread is invalid.
auto operator co_await() &-> detail::thread_awaitable; (1)
auto operator co_await() && -> detail::thread_awaitable; (2)
// Stops the io_context & joins the executor
~thread();
/// Move constructible
thread(thread &&) noexcept = default;
using executor_type = executor;
using id = std::thread::id;
id get_id() const noexcept;
// Add the functions similar to `std::thread`
void join();
bool joinable() const;
void detach();
executor_type get_executor() const;
};
1 | 支持 中断等待 |
2 | 始终转发取消 |
cobalt/result.hpp
Awaitable 可以修改为返回 system::result
或 std::tuple
,而不是使用异常。
// value only
T res = co_await foo();
// as result
system::result<T, std::exception_ptr> res = co_await cobalt::as_result(foo());
// as tuple
std::tuple<std::exception_ptr, T> res = co_await cobalt::as_tuple(foo());
Awaitable 还可以通过提供使用 cobalt::as_result_tag
和 cobalt::as_tuple_tag
的 await_resume
重载来提供处理结果和元组的自定义方式。
your_result_type await_resume(cobalt::as_result_tag);
your_tuple_type await_resume(cobalt::as_tuple_tag);
// example of an op with result system::error_code, std::size_t
system::result<std::size_t> await_resume(cobalt::as_result_tag);
std::tuple<system::error_code, std::size_t> await_resume(cobalt::as_tuple_tag);
Awaitable 仍然允许抛出异常,例如对于 OOM 等关键异常。 |
cobalt/async_for.hpp
对于像生成器这样的类型,提供了 BOOST_COBALT_FOR
宏,以模拟 for co_await
循环。
cobalt::generator<int> gen();
cobalt::main co_main(int argc, char * argv[])
{
BOOST_COBALT_FOR(auto i, gen())
printf("Generated value %d\n", i);
co_return 0;
}
cobalt/error.hpp
为了更方便地管理错误,cobalt 提供了一个 error_category
与 boost::system::error_code
一起使用。
enum class error
{
moved_from,
detached,
completed_unexpected,
wait_not_ready,
already_awaited,
allocation_failed
};
system::error_category & cobalt_category();
system::error_code make_error_code(error e);
cobalt/composition.hpp
包含 cobalt/composition.hpp
提供了将任何函数(最后一个参数为 cobalt::completion_handler
)转换为组合操作的定义。这允许简单地创建组合操作。
co_await
语句的结果会自动转换为元组(类似于使用的 as_tuple
)以避免异常。
结果需要 co_return
。传递给协程的 Handler 不得修改。
struct echo_op final : op<system::error_code>
{
echo_op(cobalt::io::stream & str) : stream(str) {}
void initiate(completion_handler<system::error_code>) final;
cobalt::io::stream & str;
};
void echo_op::initiate(completion_handler<system::error_code, std::size_t>)
{
char buf[4096];
auto [ec, n] = co_await str.read_some(buf);
auto buf = asio::buffer(buf, n);
while (!ec && !buf.empty() && !!co_await this_coro::cancelled)
{
std::tie(ec, n) = co_await str.write_some(buf);
buf += n;
}
if (!!co_await this_coro::cancelled)
co_return {asio::error::operation_aborted, m};
else
co_return {system::error_code{}, m};
}
cobalt/config.hpp
配置添加器允许配置 boost.cobalt 的一些实现细节。
executor_type
执行器类型默认为 boost::asio::any_io_executor
。
您可以通过定义 BOOST_COBALT_CUSTOM_EXECUTOR
并自行添加 boost::cobalt::executor
类型来将其设置为 boost::asio::any_io_executor
。
或者,可以定义 BOOST_COBALT_USE_IO_CONTEXT
来将执行器设置为 boost::asio::io_context::executor_type
。
pmr
Boost.cobalt 可以与不同的 pmr 实现一起使用,默认为 std::pmr
。
可以使用以下宏进行配置
-
BOOST_COBALT_USE_STD_PMR
-
BOOST_COBALT_USE_BOOST_CONTAINER_PMR
-
BOOST_COBALT_USE_CUSTOM_PMR
如果您定义 BOOST_COBALT_USE_CUSTOM_PMR
,您将需要提供一个 boost::cobalt::pmr
命名空间,它是 std::pmr
的直接替代品。
或者,可以使用以下方式禁用 pmr
:
-
BOOST_COBALT_NO_PMR
.
use_op
使用一个小型缓冲区优化资源,其大小可以通过定义 BOOST_COBALT_SBO_BUFFER_SIZE
来设置,默认为 4096 字节。
cobalt/io/buffer.hpp
缓冲区头文件提供了用于 IO 函数的通用缓冲区序列表示。它实现为一个单个缓冲区后跟一个缓冲区跨度。这允许从前面丢弃字节,也允许使用单个 buffer
,一个 registered_buffer
。
const_buffer_sequence
实现 ConstBufferSequence
,mutable_buffer_sequence
实现 MutableBufferSequence
。
namespace io
{
// Aliases for the asio functions.
using asio::buffer;
using asio::buffer_copy;
using asio::buffer_size;
}
mutable_buffer_sequence
namespace io
{
using asio::buffer;
using asio::mutable_buffer;
struct mutable_buffer_sequence
{
std::size_t buffer_count() const;
mutable_buffer_sequence(asio::mutable_registered_buffer buffer = {});
mutable_buffer_sequence(asio::mutable_buffer head);
template<typename T>
requires (std::constructible_from<std::span<const asio::mutable_buffer>, T&&>)
mutable_buffer_sequence(T && value);
mutable_buffer_sequence(std::span<const asio::mutable_buffer> spn);
// drop n bytes from the buffer sequence
mutable_buffer_sequence & operator+=(std::size_t n);
// a random access iterator over the buffers
struct const_iterator;
const_iterator begin() const;
const_iterator end() const;
// is this a registered buffer
bool is_registered() const;
};
// Invokes Func either with the mutable_buffer_sequence, an `asio::mutable_registered_buffer` or a `asio::mutable_buffer`.
template<typename Func>
auto visit(const mutable_buffer_sequence & seq, Func && func);
const_buffer_sequence
using asio::const_buffer;
struct const_buffer_sequence
{
std::size_t buffer_count();
const_buffer_sequence(asio::const_registered_buffer buffer);
const_buffer_sequence(asio::mutable_registered_buffer buffer);
const_buffer_sequence(asio::const_buffer head) { this->head_ = head; }
const_buffer_sequence(asio::mutable_buffer head) { this->head_ = head; }
template<typename T>
requires (std::constructible_from<std::span<const asio::const_buffer>, T&&>)
const_buffer_sequence(T && value);
const_buffer_sequence(std::span<const asio::const_buffer> spn);
// drop bytes from the front, i.e. advance the buffer
const_buffer_sequence & operator+=(std::size_t n);
// a random access iterator over the buffers
struct const_iterator;
// Access the sequence as a range.
const_iterator begin() const;
const_iterator end() const;
// is this a registered buffer
bool is_registered() const;
};
// Invokes Func either with the const_buffer_sequence, an `asio::const_registered_buffer` or a `asio::const_buffer.
template<typename Func>
auto visit(const const_buffer_sequence & seq, Func && func);
}
cobalt/io/ops.hpp
此包装器中的大多数功能都是通过操作实现的。
它们是类型擦除的,但不是通过使用 virtual
。这使得去虚拟化更容易。必须提供 implementation
函数,而 try_implementation_t
是可选的,并将在 ready
函数中使用。两者都将以 void *this
作为第一个参数调用。
struct [[nodiscard]] write_op final : op<system::error_code, std::size_t>
{
const_buffer_sequence buffer;
using implementation_t = void(void*, const_buffer_sequence, completion_handler<system::error_code, std::size_t>);
using try_implementation_t = void(void*, const_buffer_sequence, handler<system::error_code, std::size_t>);
write_op(const_buffer_sequence buffer,
void * this_,
implementation_t *implementation,
try_implementation_t * try_implementation = nullptr);
void initiate(completion_handler<system::error_code, std::size_t> handler) final;
void ready(handler<system::error_code, std::size_t> handler) final;
};
struct [[nodiscard]] read_op final : op<system::error_code, std::size_t>
{
mutable_buffer_sequence buffer;
using implementation_t = void(void*, mutable_buffer_sequence, completion_handler<system::error_code, std::size_t>);
using try_implementation_t = void(void*, mutable_buffer_sequence, handler<system::error_code, std::size_t>);
read_op(mutable_buffer_sequence buffer,
void * this_,
implementation_t *implementation,
try_implementation_t * try_implementation = nullptr);
void initiate(completion_handler<system::error_code, std::size_t> handler) final;
void ready(handler<system::error_code, std::size_t> handler) final;
};
struct [[nodiscard]] write_at_op final : op<system::error_code, std::size_t>
{
std::uint64_t offset;
const_buffer_sequence buffer;
using implementation_t = void(void*, std::uint64_t, const_buffer_sequence, completion_handler<system::error_code, std::size_t>);
using try_implementation_t = void(void*, std::uint64_t, const_buffer_sequence, handler<system::error_code, std::size_t>);
write_at_op(std::uint64_t offset,
const_buffer_sequence buffer,
void * this_,
implementation_t *implementation,
try_implementation_t * try_implementation = nullptr);
void initiate(completion_handler<system::error_code, std::size_t> handler) final;
void ready(handler<system::error_code, std::size_t> handler) final;
};
struct [[nodiscard]] read_at_op final : op<system::error_code, std::size_t>
{
std::uint64_t offset;
mutable_buffer_sequence buffer;
using implementation_t = void(void*, std::uint64_t, mutable_buffer_sequence, completion_handler<system::error_code, std::size_t>);
using try_implementation_t = void(void*, std::uint64_t, mutable_buffer_sequence, handler<system::error_code, std::size_t>);
read_at_op(std::uint64_t offset,
mutable_buffer_sequence buffer,
void * this_,
implementation_t *implementation,
try_implementation_t * try_implementation = nullptr);
void initiate(completion_handler<system::error_code, std::size_t> handler) final;
void ready(handler<system::error_code, std::size_t> handler) final;
};
struct [[nodiscard]] wait_op final : op<system::error_code>
{
using implementation_t = void(void*, completion_handler<system::error_code>);
using try_implementation_t = void(void*, handler<system::error_code>);
wait_op(void * this_,
implementation_t *implementation,
try_implementation_t * try_implementation = nullptr);
void initiate(completion_handler<system::error_code> handler) final;
void ready(handler<system::error_code> handler) final;
};
当操作未被等待时,操作可以重用,并且公共成员(例如缓冲区)可以修改。例如:
write_op wo = do_w;
auto sz = co_await wo;
while (sz > 0)
{
wo.buffer += sz;
sz = co_await wo;
}
cobalt/io/steady_timer.hpp
steady_timer 是 asio::steady_timer
的一个简单包装器。
如果计时器已经过期,co_await t.wait() 不会暂停。 |
struct steady_timer
{
/// The clock type.
typedef std::chrono::steady_clock clock_type;
/// The duration type of the clock.
typedef typename clock_type::duration duration;
/// The time point type of the clock.
typedef typename clock_type::time_point time_point;
steady_timer(const cobalt::executor & executor = this_thread::get_executor());
steady_timer(const time_point& expiry_time, const cobalt::executor & executor = this_thread::get_executor());
steady_timer(const duration& expiry_time, const cobalt::executor & executor = this_thread::get_executor());
// cancel the timer. This is safe to call from another thread
void cancel();
// The current expiration time.
time_point expiry() const;
// Rest the expiry time, either with an absolute time or a duration.
void reset(const time_point& expiry_time);
void reset(const duration& expiry_time);
// Check if the timer is already expired.
bool expired() const;
// Get the wait operation.
[[nodiscard]] wait_op wait();
};
cobalt/io/system_timer.hpp
system_timer 是 asio::system_timer
的一个简单包装器。
如果计时器已经过期,co_await t.wait() 不会暂停。 |
struct system_timer
{
/// The clock type.
typedef std::chrono::system_clock clock_type;
/// The duration type of the clock.
typedef typename clock_type::duration duration;
/// The time point type of the clock.
typedef typename clock_type::time_point time_point;
system_timer(const cobalt::executor & executor = this_thread::get_executor());
system_timer(const time_point& expiry_time, const cobalt::executor & executor = this_thread::get_executor());
system_timer(const duration& expiry_time, const cobalt::executor & executor = this_thread::get_executor());
// cancel the timer. This is safe to call from another thread
void cancel();
// The current expiration time.
time_point expiry() const;
// Rest the expiry time, either with an absolute time or a duration.
void reset(const time_point& expiry_time);
void reset(const duration& expiry_time);
// Check if the timer is already expired.
bool expired() const;
// Get the wait operation.
[[nodiscard]] wait_op wait();
};
cobalt/io/sleep.hpp
sleep 操作是计时器的便利包装器。每个 sleep 操作都会创建一个计时器,因此重用计时器通常更高效。
// Waits using a steady_timer.
template<typename Duration>
inline auto sleep(const std::chrono::time_point<std::chrono::steady_clock, Duration> & tp);
// Waits using a system_timer.
template<typename Duration>
inline auto sleep(const std::chrono::time_point<std::chrono::system_clock, Duration> & tp);
// Waits using a steady_timer.
template<typename Rep, typename Period>
inline auto sleep(const std::chrono::duration<Rep, Period> & dur);
cobalt/io/signal_set.hpp
signal_set 是 signal_set 的包装器。
概述
struct signal_set
{
signal_set(const cobalt::executor & executor = this_thread::get_executor());
signal_set(std::initializer_list<int> sigs, const cobalt::executor & executor = this_thread::get_executor());
// Cancel all operations associated with the signal set.
[[nodiscard]] system::result<void> cancel();
// Remove all signals from a signal_set.
[[nodiscard]] system::result<void> clear();
// Add a signal to a signal_set.
[[nodiscard]] system::result<void> add(int signal_number);
// Remove a signal from a signal_set.
[[nodiscard]] system::result<void> remove(int signal_number);
};
示例
io::signal_set s = {SIGINT, SIGTERM};
int sig = co_await s.wait();
if (s == SIGINT)
interrupt();
else if (s == SIGTERM)
terminate();
cobalt/io/stream.hpp
流是一个 IO 对象,允许读写,例如 tcp 套接字。
struct BOOST_SYMBOL_VISIBLE write_stream
{
virtual ~write_stream() = default;
[[nodiscard]] virtual write_op write_some(const_buffer_sequence buffer) = 0;
};
struct BOOST_SYMBOL_VISIBLE read_stream
{
virtual ~read_stream() = default;
[[nodiscard]] virtual read_op read_some(mutable_buffer_sequence buffer) = 0;
};
struct stream : read_stream, write_stream
{
};
cobalt/io/random_access_device.hpp
random_access_device 是一种 IO 对象,允许在随机位置进行读写。例如,文件。
struct BOOST_SYMBOL_VISIBLE random_access_write_device
{
virtual ~random_access_write_device() = default;
[[nodiscard]] virtual write_at_op write_some_at(std::uint64_t offset, const_buffer_sequence buffer) = 0;
};
struct BOOST_SYMBOL_VISIBLE random_access_read_device
{
virtual ~random_access_read_device() = default;
[[nodiscard]] virtual read_at_op read_some_at(std::uint64_t offset, mutable_buffer_sequence buffer) = 0;
};
struct random_access_device : random_access_read_device, random_access_write_device
{
};
cobalt/io/read.hpp
read
和 read_at
函数读取直到缓冲区满。
struct read_all final : op<system::error_code, std::size_t>
{
read_op step;
read_all(read_op op);
};
read_all read(stream & str, mutable_buffer_sequence buffer);
struct read_all_at final : op<system::error_code, std::size_t>
{
read_at_op step;
read_all_at(read_at_op op);
};
read_all_at read_at(stream & str, std::uint64_t offset, mutable_buffer_sequence buffer);
cobalt/io/read.hpp
write
和 write_at
将整个缓冲区写入流。
struct write_all final : op<system::error_code, std::size_t>
{
read_op step;
write_all(read_op op);
};
write_all write(stream & str, const_buffer_sequence buffer);
struct write_all_at final : op<system::error_code, std::size_t>
{
write_at_op step;
write_all_at(write op);
};
write_all_at write_at(stream & str, std::uint64_t offset, const_buffer_sequence buffer);
cobalt/io/file.hpp
文件对象提供文件 IO,这可能是异步的,具体取决于 asio 的配置方式。如果 asio 不支持异步文件 IO,cobalt 将回退到同步操作。
struct file
{
enum flags
{
read_only = O_RDONLY,
write_only = O_WRONLY,
read_write = O_RDWR,
append = O_APPEND,
create = O_CREAT,
exclusive = O_EXCL,
truncate = O_TRUNC,
sync_all_on_write = O_SYNC
};
// Implement bitmask operations as shown in C++ Std [lib.bitmask.types].
friend flags operator&(flags x, flags y);
friend flags operator|(flags x, flags y);
friend flags operator^(flags x, flags y);
friend flags operator~(flags x);
friend flags& operator&=(flags& x, flags y);
friend flags& operator|=(flags& x, flags y);
friend flags& operator^=(flags& x, flags y);
/// Basis for seeking in a file.
enum seek_basis
{
seek_set = SEEK_SET,
seek_cur = SEEK_CUR,
seek_end = SEEK_END
};
using native_handle_type = __unspecified__;
system::result<void> assign(const native_handle_type & native_file);
system::result<void> cancel();
executor get_executor();
bool is_open() const;
system::result<void> close();
native_handle_type native_handle();
system::result<void> open(const char * path, flags open_flags);
system::result<void> open(const std::string & path, flags open_flags);
system::result<native_handle_type> release();
system::result<void> resize(std::uint64_t n);
system::result<std::uint64_t> size() const;
system::result<void> sync_all();
system::result<void> sync_data();
explicit file(executor exec);
file(executor exec, native_handle_type fd);
};
cobalt/io/stream_file.hpp
stream_file
提供对文件的流访问。如果 asio 不支持异步文件 IO,cobalt 将回退到同步操作。
struct stream_file : file, stream
{
stream_file(const executor & executor = this_thread::get_executor());
stream_file(const char * path, file::flags open_flags,
const executor & executor = this_thread::get_executor());
stream_file(const std::string & path, file::flags open_flags,
const executor & executor = this_thread::get_executor());
stream_file(const native_handle_type & native_file,
const executor & executor = this_thread::get_executor());
stream_file(stream_file && sf) noexcept;
write_op write_some(const_buffer_sequence buffer);
read_op read_some(mutable_buffer_sequence buffer);
// advance the position in the file
system::result<std::uint64_t> seek(
std::int64_t offset,
seek_basis whence);
};
cobalt/io/random_access_file.hpp
stream_file
提供对文件的随机访问。如果 asio 不支持异步文件 IO,cobalt 将回退到同步操作。
struct random_access_file : file, random_access_device
{
using native_handle_type = file::native_handle_type;
random_access_file(const executor & executor = this_thread::get_executor());
random_access_file(const char * path, file::flags open_flags,
const executor & executor = this_thread::get_executor());
random_access_file(const std::string & path, file::flags open_flags,
const executor & executor = this_thread::get_executor());
random_access_file(const native_handle_type & native_file,
const executor & executor = this_thread::get_executor());
random_access_file(random_access_file && sf) noexcept;
write_at_op write_some_at(std::uint64_t offset, const_buffer_sequence buffer);
read_at_op read_some_at(std::uint64_t offset, mutable_buffer_sequence buffer);
};
cobalt/io/serial_port.hpp
serial_port 是 asio::serial_port
的一个简单包装器。
struct serial_port
{
// The underlying handle
using native_handle_type = typename asio::basic_serial_port<executor>::native_handle_type;
native_handle_type native_handle();
serial_port(const cobalt::executor & executor = this_thread::get_executor());
serial_port(serial_port && lhs) = default;
serial_port(std::string_view device, const cobalt::executor & executor = this_thread::get_executor());
serial_port(native_handle_type native_handle, const cobalt::executor & executor = this_thread::get_executor());
system::result<void> close();
system::result<void> cancel();
bool is_open() const;
// Send a break sequence to the serial port.
system::result<void> send_break();
[[nodiscard]] system::result<void> set_baud_rate(unsigned rate);
[[nodiscard]] system::result<unsigned> get_baud_rate();
[[nodiscard]] system::result<void> set_character_size(unsigned rate);
[[nodiscard]] system::result<unsigned> get_character_size();
// An enumerator {none, software, hardware}
using flow_control = asio::serial_port_base::flow_control::type;
[[nodiscard]] system::result<void> set_flow_control(flow_control rate);
[[nodiscard]] system::result<flow_control> get_flow_control();
// An enumerator {none, odd, even}
using parity = asio::serial_port_base::parity::type;
[[nodiscard]] system::result<void> set_parity(parity rate);
[[nodiscard]] system::result<parity> get_parity();
[[nodiscard]] system::result<void> assign(native_handle_type native_handle);
[[nodiscard]] system::result<void> open(std::string_view device);
// read & write some data. Yields the number of bytes transferred.
[[nodiscard]] write_op write_some(const_buffer_sequence buffer);
[[nodiscard]] read_op read_some (mutable_buffer_sequence buffer);
};
cobalt/io/endpoint.hpp
端点头文件提供了通用 asio 兼容的端点类型和协议。
protocol_type
struct protocol_type
{
using family_t = __unspecified__;
using type_t = __unspecified__;
using protocol_t = __unspecified__;
constexpr family_t family() const noexcept;
constexpr type_t type() const noexcept;
constexpr protocol_t protocol() const noexcept;
// explicitly construct protocol type.
constexpr explicit
protocol_type(family_t family = static_cast<family_t>(0),
type_t type = static_cast<type_t>(0),
protocol_t protocol = static_cast<protocol_t>(0)) noexcept;
// allows construction from other procols, e.g. asio
template<typename OtherProtocol>
requires requires (const OtherProtocol & op)
{
{static_cast<family_t>(op.family())};
{static_cast<type_t>(op.type())};
{static_cast<protocol_t>(op.protocol())};
}
constexpr protocol_type(const OtherProtocol & op) noexcept
// make the orderable
friend
constexpr auto operator<=>(const protocol_type & , const protocol_type &) noexcept = default;
};
protocol_type tcp_v4{AF_INET, SOCK_STREAM, IPPROTO_TCP};
// from asio
protocol_type tcp_v6 = boost::asio::ip::tcp::v6();
static_protocol
静态协议为编译时使用提供协议定义。
template<protocol_type::family_t Family = static_cast<protocol_type::family_t>(0),
protocol_type::type_t Type = static_cast<protocol_type::type_t>(0),
protocol_type::protocol_t Protocol = static_cast<protocol_type::protocol_t>(0)>
struct static_protocol
{
using family_t = protocol_type::family_t ;
using type_t = protocol_type::type_t ;
using protocol_t = protocol_type::protocol_t;
constexpr family_t family() const noexcept {return Family;};
constexpr type_t type() const noexcept {return Type;};
constexpr protocol_t protocol() const noexcept {return Protocol;};
using endpoint = io::endpoint;
};
constexpr static_protocol<...> ip;
constexpr static_protocol<...> ip_v4;
constexpr static_protocol<...> ip_v6;
constexpr static_protocol<...> tcp;
constexpr static_protocol<...> tcp_v4;
constexpr static_protocol<...> tcp_v6;
constexpr static_protocol<...> udp;
constexpr static_protocol<...> udp_v4;
constexpr static_protocol<...> udp_v6;
constexpr static_protocol<...> icmp;
constexpr static_protocol<...> local_stream;
constexpr static_protocol<...> local_datagram;
constexpr static_protocol<...> local_seqpacket;
constexpr static_protocol<...> local_protocol;
ip_address
struct ip_address
{
bool is_ipv6() const;
bool is_ipv4() const;
std::uint16_t port() const;
std::array<std::uint8_t, 16u> addr() const;
boost::static_string<45> addr_str() const;
};
struct ip_address_v4
{
std::uint16_t port() const;
std::uint32_t addr() const;
boost::static_string<15> addr_str() const;
};
struct ip_address_v6
{
std::uint16_t port() const;
std::array<std::uint8_t, 16u> addr() const;
boost::static_string<45> addr_str() const;
};
make_endpoint / get_endpoint
make_endpoint_tag
与 make_endpoint
函数的 tag_invoke
一起使用。用户可以定义其他 tag_invoke
函数来支持自定义端点。
template<protocol_type::family_t Family>
struct make_endpoint_tag {};
template<protocol_type::family_t Family>
struct get_endpoint_tag {};
提供以下功能
std::size_t tag_invoke(make_endpoint_tag<AF_UNIX>,
asio::detail::socket_addr_type* base,
std::string_view sv);
const local_endpoint* tag_invoke(get_endpoint_tag<AF_UNIX>,
protocol_type actual,
const endpoint::addr_type * addr);
std::size_t tag_invoke(make_endpoint_tag<AF_INET>,
asio::detail::socket_addr_type* base,
std::uint32_t address,
std::uint16_t port);
std::size_t tag_invoke(make_endpoint_tag<AF_INET>,
asio::detail::socket_addr_type* base,
std::string_view address,
std::uint16_t port);
const ip_address_v4* tag_invoke(get_endpoint_tag<AF_INET>,
protocol_type actual,
const endpoint::addr_type * addr);
std::size_t tag_invoke(make_endpoint_tag<AF_INET6>,
asio::detail::socket_addr_type* base,
std::span<std::uint8_t, 16> address,
std::uint16_t port);
std::size_t tag_invoke(make_endpoint_tag<AF_INET6>,
asio::detail::socket_addr_type* base,
std::string_view address,
std::uint16_t port);
const ip_address_v6* tag_invoke(get_endpoint_tag<AF_INET6>,
protocol_type actual,
const endpoint::addr_type * addr);
std::size_t tag_invoke(make_endpoint_tag<AF_UNSPEC>,
asio::detail::socket_addr_type* base,
std::string_view address,
std::uint16_t port);
const ip_address* tag_invoke(get_endpoint_tag<AF_UNSPEC>,
protocol_type actual,
const endpoint::addr_type * addr);
endpoint
端点函数持有并
struct endpoint
{
using storage_type = asio::detail::sockaddr_storage_type;
using addr_type = asio::detail::socket_addr_type;
void resize(std::size_t size);
void * data() {return &storage_; }
const void * data() const {return &storage_; }
std::size_t size() const {return size_;}
std::size_t capacity() const {return sizeof(storage_);}
void set_type (protocol_type::type_t type) { type_ = type;}
void set_protocol(protocol_type::protocol_t protocol) { protocol_ = protocol;}
protocol_type protocol() const;
endpoint() = default;
endpoint(const endpoint & ep);
// Construct a endpoint using make_endpoint_tag
template<protocol_type::family_t Family,
protocol_type::type_t Type,
protocol_type::protocol_t Protocol,
typename ... Args>
requires requires (make_endpoint_tag<Family> proto,
addr_type* addr, Args && ... args)
{
{tag_invoke(proto, addr, std::forward<Args>(args)...)} -> std::convertible_to<std::size_t>;
}
endpoint(static_protocol<Family, Type, Protocol> proto, Args && ... args);
// allows constructing an endpoint from an asio type
template<typename OtherEndpoint>
requires requires (OtherEndpoint oe)
{
{oe.protocol()} -> std::convertible_to<protocol_type>;
{oe.data()} -> std::convertible_to<void*>;
{oe.size()} -> std::convertible_to<std::size_t>;
}
endpoint(OtherEndpoint && oe);
};
class bad_endpoint_access : public std::exception
{
public:
bad_endpoint_access() noexcept = default;
char const * what() const noexcept;
};
// Uses `get_endpoint_tag<Protocol> to return a `pointer` to the result.
// Throws `bad_endpoint_access` if it's the wrong type
auto get(const endpoint & ep);
// Uses `get_endpoint_tag<Protocol> to return a `pointer` to the result.
template<static_protocol Protocol>
friend auto get_if(const endpoint * ep);
cobalt::io::endpoint ep{cobalt::io::tcp_v4, "127.0.0.1", 8080};
cobalt/io/socket.hpp
socket 类是所有 socket 的基类。
struct socket
{
[[nodiscard]] system::result<void> open(protocol_type prot = protocol_type {});
[[nodiscard]] system::result<void> close();
[[nodiscard]] system::result<void> cancel();
[[nodiscard]] bool is_open() const;
// asio acceptor compatibility
template<typename T>
struct rebind_executor {using other = socket;};
using shutdown_type = asio::socket_base::shutdown_type;
using wait_type = asio::socket_base::wait_type;
using message_flags = asio::socket_base::message_flags;
constexpr static int message_peek = asio::socket_base::message_peek;
constexpr static int message_out_of_band = asio::socket_base::message_out_of_band;
constexpr static int message_do_not_route = asio::socket_base::message_do_not_route;
constexpr static int message_end_of_record = asio::socket_base::message_end_of_record;
using native_handle_type = asio::basic_socket<protocol_type, executor>::native_handle_type;
native_handle_type native_handle();
// Drop the connection
[[nodiscard]] system::result<void> shutdown(shutdown_type = shutdown_type::shutdown_both);
// endpoint of a connected endpiotn
[[nodiscard]] system::result<endpoint> local_endpoint() const;
[[nodiscard]] system::result<endpoint> remote_endpoint() const;
system::result<void> assign(protocol_type protocol, native_handle_type native_handle);
system::result<native_handle_type> release();
/// socket options
[[nodiscard]] system::result<std::size_t> bytes_readable();
[[nodiscard]] system::result<void> set_debug(bool debug);
[[nodiscard]] system::result<bool> get_debug() const;
[[nodiscard]] system::result<void> set_do_not_route(bool do_not_route);
[[nodiscard]] system::result<bool> get_do_not_route() const;
[[nodiscard]] system::result<void> set_enable_connection_aborted(bool enable_connection_aborted);
[[nodiscard]] system::result<bool> get_enable_connection_aborted() const;
[[nodiscard]] system::result<void> set_keep_alive(bool keep_alive);
[[nodiscard]] system::result<bool> get_keep_alive() const;
[[nodiscard]] system::result<void> set_linger(bool linger, int timeout);
[[nodiscard]] system::result<std::pair<bool, int>> get_linger() const;
[[nodiscard]] system::result<void> set_receive_buffer_size(std::size_t receive_buffer_size);
[[nodiscard]] system::result<std::size_t> get_receive_buffer_size() const;
[[nodiscard]] system::result<void> set_send_buffer_size(std::size_t send_buffer_size);
[[nodiscard]] system::result<std::size_t> get_send_buffer_size() const;
[[nodiscard]] system::result<void> set_receive_low_watermark(std::size_t receive_low_watermark);
[[nodiscard]] system::result<std::size_t> get_receive_low_watermark() const;
[[nodiscard]] system::result<void> set_send_low_watermark(std::size_t send_low_watermark);
[[nodiscard]] system::result<std::size_t> get_send_low_watermark() const;
[[nodiscard]] system::result<void> set_reuse_address(bool reuse_address);
[[nodiscard]] system::result<bool> get_reuse_address() const;
[[nodiscard]] system::result<void> set_no_delay(bool reuse_address);
[[nodiscard]] system::result<bool> get_no_delay() const;
wait_op wait(wait_type wt = wait_type::wait_read);
// Connect to a specific endpoint
connect_op connect(endpoint ep);
// connect to one of the given endpoints. Returns the one connected to.
ranged_connect_op connect(endpoint_sequence ep);
protected:
// Adopt the under-specified endpoint. E.g. to tcp from an endpoint specified as ip_address
virtual void adopt_endpoint_(endpoint & ) {}
};
// Connect to sockets using the given protocol
system::result<void> connect_pair(protocol_type protocol, socket & socket1, socket & socket2);
cobalt/io/socket.hpp
用于流套接字(例如 TCP)的套接字类。
struct [[nodiscard]] stream_socket final : socket, stream
{
stream_socket(const cobalt::executor & executor = this_thread::get_executor());
stream_socket(stream_socket && lhs);
stream_socket(native_handle_type h, protocol_type protocol = protocol_type(),
const cobalt::executor & executor = this_thread::get_executor());
stream_socket(endpoint ep,
const cobalt::executor & executor = this_thread::get_executor());
write_op write_some(const_buffer_sequence buffer);
read_op read_some(mutable_buffer_sequence buffer);};
// Connect to sockets using the given protocol
inline system::result<std::pair<stream_socket, stream_socket>> make_pair(decltype(local_stream) protocol);
cobalt/io/socket.hpp
此套接字类实现数据报套接字,例如 UDP。
struct [[nodiscard]] datagram_socket final : socket
{
datagram_socket(const cobalt::executor & executor = this_thread::get_executor());
datagram_socket(datagram_socket && lhs);
datagram_socket(native_handle_type h, protocol_type protocol = protocol_type(),
const cobalt::executor & executor = this_thread::get_executor());
datagram_socket(endpoint ep,
const cobalt::executor & executor = this_thread::get_executor());
write_op send(const_buffer_sequence buffer);
read_op receive(mutable_buffer_sequence buffer);
};
// Create a pair of connected sockets.
inline system::result<std::pair<datagram_socket, datagram_socket>> make_pair(decltype(local_datagram) protocol);
cobalt/io/socket.hpp
用于 seq packet socket(例如 SCTP)的套接字类。
struct seq_packet_socket : socket
{
seq_packet_socket(const cobalt::executor & executor = this_thread::get_executor());
seq_packet_socket(seq_packet_socket && lhs);
seq_packet_socket(native_handle_type h, protocol_type protocol = protocol_type(),
const executor & executor = this_thread::get_executor());
seq_packet_socket(endpoint ep, const executor & executor = this_thread::get_executor());
receive_op receive(message_flags in_flags, message_flags& out_flags, mutable_buffer_sequence buffer);
send_op send(message_flags in_flags, const_buffer_sequence buffer);
receive_op receive(message_flags in_flags, mutable_buffer_sequence buffer);
};
// Connect to sockets using the given protocol
system::result<void> make_pair(decltype(local_seqpacket) protocol);
cobalt/io/resolver.hpp
解析器允许查找给定地址和服务的端点。
struct resolver
{
resolver(const executor & exec = this_thread::get_executor());
resolver(resolver && ) = delete;
void cancel();
// Produces a endpoint_sequence
[[nodiscard]] auto resolve(std::string_view host, std::string_view service);
};
// Short hand for performing a single lookup.
auto lookup(std::string_view host, std::string_view service,
const executor & exec = this_thread::get_executor());
cobalt/io/acceptor.hpp
接受器可用于接受连接。
struct acceptor
{
using wait_type = asio::socket_base::wait_type;
constexpr static std::size_t max_listen_connections = asio::socket_base::max_listen_connections;
acceptor(const cobalt::executor & executor = this_thread::get_executor());
acceptor(endpoint ep, const cobalt::executor & executor = this_thread::get_executor());
system::result<void> bind(endpoint ep);
system::result<void> listen(int backlog = max_listen_connections); // int backlog = net::max_backlog()
endpoint local_endpoint();
// accept any connection and assign it to `sock`
accept_op accept(socket & sock);
// Accept a connection of a stream_socket
template<protocol_type::family_t F = tcp.family(), protocol_type::protocol_t P = tcp.protocol()>
accept_op accept(static_protocol<F, tcp.type(), P> stream_proto = tcp);
// Accept a connection of a seq_packet
template<protocol_type::family_t F, protocol_type::protocol_t P>
accept_op accept(static_protocol<F, local_seqpacket.type(), P> stream_proto = tcp);
// For a connection to be ready
wait_op wait(wait_type wt = wait_type::wait_read);
};
cobalt/io/ssl.hpp
SSL 流是一种可以升级到 SSL 的流套接字。
namespace ssl
{
enum class verify
{
none = asio::ssl::verify_none,
peer = asio::ssl::verify_peer,
fail_if_no_peer_cert = asio::ssl::verify_fail_if_no_peer_cert,
client_once = asio::ssl::verify_client_once
};
using context = asio::ssl::context;
using verify_mode = asio::ssl::verify_mode;
struct stream final : socket, cobalt::io::stream, asio::ssl::stream_base
{
stream(context & ctx, const cobalt::executor & executor = this_thread::get_executor());
stream(context & ctx, native_handle_type h, protocol_type protocol = protocol_type(),
const cobalt::executor & executor = this_thread::get_executor());
stream(context & ctx, endpoint ep,
const cobalt::executor & executor = this_thread::get_executor());
write_op write_some(const_buffer_sequence buffer) override;
read_op read_some(mutable_buffer_sequence buffer) override;
// Indicates whether or not an ssl upgrade has been performed
[[nodiscard]] bool secure() const {return upgraded_;}
template<typename VerifyCallback>
system::result<void> set_verify_callback(VerifyCallback vc);
system::result<void> set_verify_depth(int depth);
system::result<void> set_verify_mode(verify depth);
[[nodiscard]] auto handshake(handshake_type type);
[[nodiscard]] auto handshake(handshake_type type, const_buffer_sequence buffer);
[[nodiscard]] auto shutdown();
};
}
构建脚本为该类创建了一个单独的库 (boost_cobalt_io_ssl),以便 boost_cobalt_io 可以在没有 OpenSSL 的情况下使用。 |
cobalt/experimental/context.hpp
这是未定义行为,因为它违反了标准中的前置条件。 |
此头文件提供 experimental
支持,用于将基于 boost.fiber
的有栈协程用作 C20 协程。也就是说,它们可以通过放入 coroutine_handle
来使用 awaitables
。同样,实现使用 C20 协程 promise,并像 C++20 协程一样运行。
//
void delay(experimental::context<promise<void>> h, std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
h.await(tim.async_wait(cobalt::use_op)); // instead of co_await.
}
cobalt::main co_main(int argc, char *argv[])
{
cobalt::promise<void> dl = cobalt::experimental::make_context(&delay, 50);
co_await dl;
co_return 0;
}
参考
// The internal coroutine context.
/// Args are the function arguments after the handle.
template<typename Return, typename ... Args>
struct context
{
// Get a handle to the promise
promise_type & promise();
const promise_type & promise() const;
// Convert it to any context if the underlying promise is the same
template<typename Return_, typename ... Args_>
constexpr operator context<Return_, Args_...>() const;
// Await something. Uses await_transform automatically.
template<typename Awaitable>
auto await(Awaitable && aw);
// Yield a value, if supported by the promise.
template<typename Yield>
auto yield(Yield && value);
};
// Create a fiber with a custom stack allocator (see boost.fiber for details) and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc && salloc, Args && ... args);
// Create a fiber with the default allocator and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func>
auto make_context(Func && func, Args && ... args);
// Create a fiber with a custom stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc && salloc, Args && ... args);
// Create a fiber with the default stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func>
auto make_context(Func && func, Args && ... args);
深入
自定义执行器
cobalt 默认使用 asio::any_io_executor
的原因之一是它是一种类型擦除的执行器,即您可以提供自己的事件循环而无需重新编译 cobalt
。
然而,在 Executor TS 的开发过程中,执行器概念变得有点反直觉,说得轻巧一些。
Ruben Perez 写了一篇很棒的 博客文章,我将从中无耻地借鉴。
定义
执行器是一种指向实际事件循环的类型,它(廉价地)可复制,支持属性(见下文),可进行相等比较,并具有 execute
函数。
execute
struct example_executor
{
template<typename Fn>
void execute(Fn && fn) const;
};
上述函数根据其属性执行 fn
。
属性
一个属性可以被查询、首选或必需,例如:
struct example_executor
{
// get a property by querying it.
asio::execution::relationship_t &query(asio::execution::relationship_t) const
{
return asio::execution::relationship.fork;
}
// require an executor with a new property
never_blocking_executor require(const execution::blocking_t::never_t);
// prefer an executor with a new property. the executor may or may not support it.
never_blocking_executor prefer(const execution::blocking_t::never_t);
// not supported
example_executor prefer(const execution::blocking_t::always_t);
};
asio::any_io_executor
的属性
为了将执行器包装在 asio::any_io_executor
中,需要两个属性
-
execution::context_t
-
execution::blocking_t::never_t
这意味着我们需要使它们可请求(这对上下文没有意义)或从 query
返回预期值。
execution::context_t
查询应返回 asio::execution_context&
,如下所示
struct example_executor
{
asio::execution_context &query(asio::execution::context_t) const;
};
执行上下文用于管理管理 IO 对象的服务(如 asio 的计时器和套接字)的生命周期。也就是说,通过提供此上下文,asio 的所有 IO 都将与它一起工作。
执行上下文在执行器销毁后必须保持活动状态。 |
以下内容可能更受青睐
-
execution::blocking_t::possibly_t
-
execution::outstanding_work_t::tracked_t
-
execution::outstanding_work_t::untracked_t
-
execution::relationship_t::fork_t
-
execution::relationship_t::continuation_
这意味着您可能希望在执行器中支持它们以进行优化。
blocking
属性
如前所述,此属性控制传递给 execute()
的函数是立即运行(作为 execute()
的一部分),还是必须排队等待稍后执行。可能的值有:
-
asio::execution::blocking.never
:永远不会将函数作为execute()
的一部分运行。这就是asio::post()
所做的。 -
asio::execution::blocking.possibly
:函数可能会或可能不会作为execute()
的一部分运行。这是默认值(调用io_context::get_executor
时获得的值)。 -
asio::execution::blocking.always
:函数总是作为execute()
的一部分运行。io_context::executor
不支持此功能。
relationship
属性
relationship
可以取两个值
-
asio::execution::relationship.continuation
:表示传递给execute()
的函数是调用execute()
的函数的延续。 -
asio::execution::relationship.fork
:与上述相反。这是默认值(调用io_context::get_executor()
时获得的值)。
将此属性设置为 continuation
可以优化函数的调度方式。它只在函数排队(而不是立即运行)时才有效。对于 io_context
,设置后,函数将调度在更快的线程局部队列中运行,而不是上下文全局队列。
outstanding_work_t
属性
outstanding_work
可以取两个值
-
asio::execution::outstanding_work.tracked
:表示在执行器存在期间,仍然有工作要做。 -
asio::execution::outstanding_work.untracked
:与上述相反。这是默认值(调用io_context::get_executor()
时获得的值)。
将此属性设置为tracked
意味着只要executor
存活,事件循环就不会返回。
最小执行器
有了这个,我们来看看一个最小执行器的接口。
struct minimal_executor
{
minimal_executor() noexcept;
asio::execution_context &query(asio::execution::context_t) const;
static constexpr asio::execution::blocking_t
query(asio::execution::blocking_t) noexcept
{
return asio::execution::blocking.never;
}
template<class F>
void execute(F && f) const;
bool operator==(minimal_executor const &other) const noexcept;
bool operator!=(minimal_executor const &other) const noexcept;
};
有关使用 Python asyncio 事件循环的实现,请参阅example/python.cpp。 |
添加工作守卫。
现在,让我们为outstanding_work
属性添加一个require
函数,它使用多种类型。
struct untracked_executor : minimal_executor
{
untracked_executor() noexcept;
constexpr tracked_executor require(asio::execution::outstanding_work:: tracked_t) const;
constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const {return *this; }
};
struct untracked_executor : minimal_executor
{
untracked_executor() noexcept;
constexpr tracked_executor require(asio::execution::outstanding_work:: tracked_t) const {return *this;}
constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const;
};
请注意,不必从require
函数返回不同的类型,也可以这样做
struct trackable_executor : minimal_executor
{
trackable_executor() noexcept;
constexpr trackable_executor require(asio::execution::outstanding_work:: tracked_t) const;
constexpr trackable_executor require(asio::execution::outstanding_work::untracked_t) const;
};
如果我们要使用prefer
,它将如下所示
struct trackable_executor : minimal_executor
{
trackable_executor() noexcept;
constexpr trackable_executor prefer(asio::execution::outstanding_work:: tracked_t) const;
constexpr trackable_executor prefer(asio::execution::outstanding_work::untracked_t) const;
};
总结
如您所见,属性系统并非微不足道,但功能强大。实现自定义执行器是一个它自己的问题类别,这就是本文档不这样做。相反,有一个关于如何将 Python 事件循环包装到执行器中的示例。
以下是一些阅读建议。
无栈
C++20 协程是无栈的,这意味着它们没有自己的栈。
C++ 中的栈描述了调用栈,即所有堆叠的函数帧。函数帧是函数操作所需的内存,即一块内存,用于存储其变量和诸如返回地址之类的信息。
函数帧的大小在编译时已知,但在包含其定义的编译单元之外则未知。 |
int bar() {return 0;} // the deepest point of the stack
int foo() {return bar();}
int main()
{
return foo();
}
上述示例中的调用栈是
main()
foo()
bar()

协程可以实现为有栈的,这意味着它会分配一个固定大小的内存块,并像线程一样堆叠函数帧。C++20 协程是无栈的,即它们只分配自己的帧,并在恢复时使用调用者的栈。使用我们之前的示例
fictional_eager_coro_type<int> example()
{
co_yield 0;
co_yield 1;
}
void nested_resume(fictional_eager_coro_type<int>& f)
{
f.resume();
}
int main()
{
auto f = example();
nested_resume(f);
f.reenter();
return 0;
}
这将产生一个类似于这样的调用栈
main()
f$example()
nested_resume()
f$example()
f$example()

如果协程跨线程移动,也适用同样的情况。
惰性 & 急切
如果协程仅在恢复后才开始执行其代码,则它是惰性的,而急切的协程会立即执行,直到它的第一个挂起点(即co_await
、co_yield
或co_return
表达式)。
lazy_coro co_example()
{
printf("Entered coro\n");
co_yield 0;
printf("Coro done\n");
}
int main()
{
printf("enter main\n");
auto lazy = co_example();
printf("constructed coro\n");
lazy.resume();
printf("resumed once\n");
lazy.resume();
printf("resumed twice\n");
return 0;
}
这将产生如下输出
enter main
constructed coro
Entered coro
resumed once
Coro Done
resumed twice

而一个急切的协程会是这样的
eager_coro co_example()
{
printf("Entered coro\n");
co_yield 0;
printf("Coro done\n");
}
int main()
{
printf("enter main\n");
auto lazy = co_example();
printf("constructed coro\n");
lazy.resume();
printf("resumed once\n");
return 0;
}
这将产生如下输出
enter main
Entered coro
constructed coro
resume once
Coro Done

基准测试
在第 11 代 Intel® Core™ i7-1185G7 @ 3.00GHz 上运行
发布到执行器
基准测试正在运行以下代码,使用 cobalt 的任务,asio::awaitable
和 `asio 的有栈协程(boost.context)。
cobalt::task<void> atest()
{
for (std::size_t i = 0u; i < n; i++)
co_await asio::post(cobalt::use_op);
}
gcc 12 | clang 16 | |
---|---|---|
钴 |
2472 |
2098 |
awaitable |
2432 |
2253 |
有栈 |
3655 |
3725 |
并行运行空操作协程
此基准测试使用大小为零的asio::experimental::channel
来并行读写。它使用 cobalt 的gather和asio::awaitable
中的awaitable_operator
。
cobalt::task<void> atest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
for (std::size_t i = 0u; i < n; i++)
co_await cobalt::gather(
chan.async_send(system::error_code{}, cobalt::use_task),
chan.async_receive(cobalt::use_task));
}
asio::awaitable<void> awtest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
using boost::asio::experimental::awaitable_operators::operator&&;
for (std::size_t i = 0u; i < n; i++)
co_await (
chan.async_send(system::error_code{}, asio::use_awaitable)
&&
chan.async_receive(asio::use_awaitable));
}
gcc 12 | clang 16 | |
---|---|---|
钴 |
1563 |
1468 |
awaitable |
2800 |
2805 |
立即
此基准测试通过使用大小为 1 的通道来利用即时完成,以便每个操作都是即时的。
cobalt::task<void> atest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 1u};
for (std::size_t i = 0u; i < n; i++)
{
co_await chan.async_send(system::error_code{}, cobalt::use_op);
co_await chan.async_receive(cobalt::use_op);
}
}
gcc 12 | clang 16 | |
---|---|---|
钴 |
1810 |
1864 |
awaitable |
3109 |
4110 |
有栈 |
3922 |
4705 |
通道
在此基准测试中,比较了 asio::experimental::channel 和 cobalt::channel。
这与并行测试类似,但使用cobalt::channel
代替。
gcc | clang | |
---|---|---|
钴 |
500 |
350 |
awaitable |
790 |
770 |
有栈 |
867 |
907 |
操作分配
此基准测试比较了异步操作关联分配器的不同可能解决方案
gcc | clang | |
---|---|---|
std::allocator |
1136 |
1139 |
cobalt::monotonic |
1149 |
1270 |
pmr::monotonic |
1164 |
1173 |
cobalt::sbo |
1021 |
1060 后者方法由 cobalt 内部使用。 |
要求
库
Boost.cobalt 需要 C++20 编译器,并直接依赖以下 Boost 库
-
boost.asio
-
boost.system
-
boost.circular_buffer
-
boost.intrusive
-
boost.smart_ptr
-
boost.container (用于 clang < 16)
编译器
此库自 Clang 14、Gcc 10 和 MSVC 19.30 (Visual Studio 2022) 起受支持。
Gcc 12.1 和 12.2 版本似乎对没有栈变量的协程存在一个错误,如[这里](https://godbolt.org/z/6adGcqP1z)所示,应避免在协程中使用。 |
Clang 只在 16 版中添加了std::pmr
支持,因此较旧的 Clang 版本使用boost::container::pmr
作为即时替换。
部分(如果不是全部)MSVC 版本有一个损坏的协程实现,此库需要进行变通。这可能会导致非确定性行为和开销。 |
协程的继续可以在从final_suspend
返回的 awaitable 中完成,如下所示
// in promise
auto final_suspend() noexcept
{
struct final_awaitable
{
std::coroutine_handle<void> continuation{std::noop_coroutine()}; (1)
bool await_ready() const noexcept;
std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> h) noexcept
{
auto cc = continuation;
h.destroy(); (2)
return cc;
}
void await_resume() noexcept {}
};
return final_awaitable{my_continuation};
};
1 | 延续 |
2 | 在继续之前自毁协程 |
在 MSVC 上,final_suspend 无法正确挂起协程,因此h.destroy()
将导致协程帧上的元素双重销毁。因此,MSVC 需要推迟销毁,使其离线进行。这将导致开销,并使实际内存释放变得非确定性。
致谢
如果没有 CppAlliance 及其创始人 Vinnie Falco,这个库是不可能实现的。Vinnie 信任我,让我从事这个项目,尽管他对这样的库应该如何设计有着非常不同的看法。
还要感谢 Ruben Perez 和 Richard Hodges 倾听我的设计问题并给我建议和用例。此外,如果没有 Chris Kohlhoff 优秀的 boost.asio,这个库也是不可能实现的。