概述
以下是 cobalt 中相关功能的列表
返回单个结果的急切协程 - 可以将其视为默认值 |
|
可以产生多个值的急切协程。 |
|
promise 的惰性版本,可以生成到其他执行器上。 |
|
类似于 promise 的协程,没有句柄 |
一个函数,以伪随机方式等待一组已准备好的协程中的一个,以避免饥饿。 |
|
一个函数,等待一组协程并将其全部作为值返回,或者如果任何可等待对象抛出异常,则抛出异常。 |
|
一个函数,等待一组协程并将其全部作为 |
|
一个确定的 |
一个线程本地实用程序,用于在协程之间发送值。 |
|
一个异步 RAII 助手,允许在发生异常时进行异步拆卸 |
C++ 协程简介 |
如果您以前从未使用过协程,请阅读 |
|
功能和概念的简要高级概述 |
如果您熟悉 asio 和协程,并且想要了解此库提供的功能,请阅读。 |
|
用法的低级视图 |
如果您想快速开始编码,请阅读 |
|
API 参考 |
在编码时查找详细信息 |
|
一些实现细节 |
如果您还没有感到困惑,请阅读 |
动机
许多编程语言(如 node.js 和 python)都提供了易于使用的单线程并发框架。虽然比同步代码更复杂,但单线程异步避免了多线程的许多陷阱和开销。
也就是说,一个协程可以工作,而其他协程等待事件(例如,来自服务器的响应)。这允许在单线程上编写同时执行多项操作的应用程序。
此库旨在为 C++ 提供此功能:简单的单线程异步,类似于 node.js 和 python 中的 asyncio,并且可以与现有库(如 boost.beast
、boost.mysql
或 boost.redis
)一起使用。它基于 boost.asio
。
它从其他语言中提取了一系列概念,并基于 C++20 协程提供它们。
与 asio::awaitable
和 asio::experimental::coro
不同,cobalt
协程是开放的。也就是说,asio::awaitable
只能等待和被其他 asio::awaitable
等待,并且不提供协程特定的同步机制。
另一方面,cobalt
提供了协程特定的 channel
和不同的等待类型(race
、gather
等),这些类型经过优化,可以与协程和可等待对象一起使用。
协程入门
异步编程
异步编程通常是指一种允许在后台运行任务,同时执行其他工作的编程风格。
想象一下,如果有一个 get-request 函数执行完整的 http 请求,包括连接和 ssl 握手等。
std::string http_get(std:string_view url);
int main(int argc, char * argv[])
{
auto res = http_get("https://boost.ac.cn");
printf("%s", res.c_str());
return 0;
}
以上代码是传统的同步编程。如果我们想并行执行两个请求,我们需要创建另一个线程来运行另一个具有同步代码的线程。
std::string http_get(std:string_view url);
int main(int argc, char * argv[])
{
std::string other_res;
std::thread thr{[&]{ other_res = http_get("https://cppalliance.org"); }};
auto res = http_get("https://boost.ac.cn");
thr.join();
printf("%s", res.c_str());
printf("%s", other_res.c_str());
return 0;
}
这是可行的,但我们的程序将大部分时间用于等待输入。操作系统提供了允许异步执行 IO 的 API,而诸如 boost.asio 之类的库提供了管理异步操作的可移植方法。Asio 本身并没有规定处理完成的方法。此库 (boost.cobalt) 提供了一种通过协程/可等待对象管理所有这些的方法。
cobalt::promise<std::string> http_cobalt_get(std:string_view url);
cobalt::main co_main(int argc, char * argv[])
{
auto [res, other_res] =
cobalt::join(
http_cobalt_get(("https://boost.ac.cn"),
http_cobalt_get(("https://cppalliance.org")
);
printf("%s", res.c_str());
printf("%s", other_res.c_str());
return 0;
}
在上面的代码中,用于执行请求的异步函数利用了操作系统 API,因此实际的 IO 不会阻塞。这意味着,当我们等待两个函数完成时,操作是交错的且非阻塞的。同时,cobalt 提供了使我们摆脱回调地狱的协程原语。
协程
协程是可恢复的函数。可恢复意味着一个函数可以暂停,即将控制权多次传递回调用者。
常规函数使用 return
函数将控制权返回给调用者,同时还会返回值。
另一方面,协程可能会将控制权返回给调用者并多次恢复。
协程有三个类似于 co_return 的控制关键字(其中只有 co_return
必须受支持)。
-
co_return
-
co_yield
-
co_await
co_return
这类似于 return
,但会将函数标记为协程。
co_await
co_await
表达式将挂起一个 可等待对象,即停止执行直到 awaitable
恢复它。
例如
cobalt::promise<void> delay(std::chrono::milliseconds);
cobalt::task<void> example()
{
co_await delay(std::chrono::milliseconds(50));
}
co_await
表达式可以根据它正在等待的内容产生一个值。
cobalt::promise<std::string> read_some();
cobalt::task<void> example()
{
std::string res = co_await read_some();
}
在 cobalt 中,大多数协程原语也是 可等待对象。 |
co_yield
co_yield
表达式类似于 co_await
,但它将控制权返回给调用者并携带一个值。
例如
cobalt::generator<int> iota(int max)
{
int i = 0;
while (i < max)
co_yield i++;
co_return i;
}
co_yield
表达式还可以产生一个值,这允许 yield 协程的用户将值推送到其中。
cobalt::generator<int> iota()
{
int i = 0;
bool more = false;
do
{
more = co_yield i++;
}
while(more);
co_return -1;
}
可等待对象
可等待对象是在 co_await
表达式中可以使用的类型。
struct awaitable_prototype
{
bool await_ready();
template<typename T>
see_below await_suspend(std::coroutine_handle<T>);
return_type await_resume();
};
如果存在可用的 operator co_await 调用,类型将隐式转换为可等待对象。本文档将使用 awaitable 来包含这些类型,并使用 "actual_awaitable" 来引用符合上述原型的类型。 |
在 co_await
表达式中,等待的协程将首先调用 await_ready
来检查协程是否需要挂起。准备就绪后,它会直接进入 await_resume
以获取值,因为不需要挂起。否则,它将挂起自身并使用 std::coroutine_handle
调用 await_suspend
到自己的 promise。
std::coroutine_handle<void> 可用于类型擦除。 |
return_type 是 co_await 表达式
的结果类型,例如 int
int i = co_await awaitable_with_int_result();
await_suspend
的返回类型可以是三种
-
void
-
bool
-
std::coroutine_handle<U>
如果它是 void,则等待的协程保持挂起。如果它是 bool
,则将检查该值,如果为 false,则等待的协程将立即恢复。
如果返回 std::coroutine_handle
,则将恢复此协程。后者允许 await_suspend
返回传入的句柄,实际上与返回 false
相同。
如果等待的协程立即重新恢复,即在调用 await_resume 之后,则在本库中称为“立即完成”。这不应与非挂起的可等待对象混淆,即从 await_ready
返回 true
的对象。
事件循环
由于 cobalt
中的协程可以 co_await
事件,因此它们需要在事件循环上运行。也就是说,另一段代码负责跟踪未完成的事件并恢复正在等待它们的恢复协程。这种模式非常常见,并且在 node.js 或 python 的 asyncio
中以类似的方式使用。
cobalt
使用 asio::io_context
作为其默认事件循环。也就是说,类 thread、main 和 run 函数在内部使用它。
你可以使用任何可以产生 asio::any_io_executor
的事件循环来配合这个库。最简单的方法是使用 spawn。
事件循环通过执行器(遵循 asio 的术语)访问,并且可以使用 set_executor 手动设置。
导览
进入 cobalt 环境
为了使用 awaitables,我们需要能够 co_await
它们,即处于协程中。
我们有四种方法可以实现这一点:
- cobalt/main.hpp
-
使用协程替换
int main
cobalt::main co_main(int argc, char* argv[])
{
// co_await things here
co_return 0;
}
- cobalt/thread.hpp
-
为异步环境创建一个线程
cobalt::thread my_thread()
{
// co_await things here
co_return;
}
int main(int argc, char ** argv[])
{
auto t = my_thread();
t.join();
return 0;
}
- cobalt/task.hpp
-
创建一个任务并运行或派生它
cobalt::task<void> my_thread()
{
// co_await things here
co_return;
}
int main(int argc, char ** argv[])
{
cobalt::run(my_task()); // sync
asio::io_context ctx;
cobalt::spawn(ctx, my_task(), asio::detached);
ctx.run();
return 0;
}
Promise
Promises 是推荐的默认协程类型。它们是即时的,因此易于用于临时的并发。
cobalt::promise<int> my_promise()
{
co_await do_the_thing();
co_return 0;
}
cobalt::main co_main(int argc, char * argv[])
{
// start the promise here
auto p = my_promise();
// do something else here
co_await do_the_other_thing();
// wait for the promise to complete
auto res = co_await p;
co_return res;
}
Task
Tasks 是惰性的,这意味着它们在被等待或派生之前不会执行任何操作。
cobalt::task<int> my_task()
{
co_await do_the_thing();
co_return 0;
}
cobalt::main co_main(int argc, char * argv[])
{
// create the task here
auto t = my_task();
// do something else here first
co_await do_the_other_thing();
// start and wait for the task to complete
auto res = co_await t;
co_return res;
}
生成器
generator 是 cobalt 中唯一可以 co_yield
值的类型。
Generator 默认是即时的。与 std::generator 不同,cobalt::generator
可以 co_await
,因此是异步的。
cobalt::generator<int> my_generator()
{
for (int i = 0; i < 10; i++)
co_yield i;
co_return 10;
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator();
while (g)
printf("Generator %d\n", co_await g);
co_return 0;
}
值可以被推入生成器,这些值将从 co_yield
返回。
cobalt::generator<double, int> my_eager_push_generator(int value)
{
while (value != 0)
value = co_yield value * 0.1;
co_return std::numeric_limits<double>::quiet_NaN();
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator(5);
assert(0.5 == co_await g(4)); // result of 5
assert(0.4 == co_await g(3)); // result of 4
assert(0.3 == co_await g(2)); // result of 3
assert(0.2 == co_await g(1)); // result of 2
assert(0.1 == co_await g(0)); // result of 1
// we let the coroutine go out of scope while suspended
// no need for another co_await of `g`
co_return 0;
}
也可以使用 this_coro::initial
将协程设为惰性。
cobalt::generator<double, int> my_eager_push_generator()
{
auto value = co_await this_coro::initial;
while (value != 0)
value = co_yield value * 0.1;
co_return std::numeric_limits<double>::quiet_NaN();
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator(); // lazy, so the generator waits for the first pushed value
assert(0.5 == co_await g(5)); // result of 5
assert(0.4 == co_await g(4)); // result of 4
assert(0.3 == co_await g(3)); // result of 3
assert(0.2 == co_await g(2)); // result of 2
assert(0.1 == co_await g(1)); // result of 1
// we let the coroutine go out of scope while suspended
// no need for another co_await of `g`
co_return 0;
}
join
如果多个 awaitables 并行工作,可以使用 join 同时等待它们。
cobalt::promise<int> some_work();
cobalt::promise<double> more_work();
cobalt::main co_main(int argc, char * argv[])
{
std::tuple<int, double> res = cobalt::join(some_work(), more_work());
co_return 0;
}
race
如果多个 awaitables 并行工作,但我们希望在其中任何一个完成时收到通知,则应使用 race。
cobalt::generator<int> some_data_source();
cobalt::generator<double> another_data_source();
cobalt::main co_main(int argc, char * argv[])
{
auto g1 = some_data_source();
auto g2 = another_data_source();
int res1 = co_await g1;
double res2 = co_await g2;
printf("Result: %f", res1 * res2);
while (g1 && g2)
{
switch(variant2::variant<int, double> nx = co_await cobalt::race(g1, g2))
{
case 0:
res1 = variant2::get<0>(nx);
break;
case 1:
res2 = variant2::get<1>(nx);
break;
}
printf("New result: %f", res1 * res2);
}
co_return 0;
}
在这种情况下,race 不会导致任何数据丢失。 |
教程
delay
让我们从最简单的示例开始:一个简单的延迟。
cobalt::main co_main(int argc, char * argv[]) (1)
{
asio::steady_timer tim{co_await asio::this_coro::executor, (2)
std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
co_await tim.async_wait(cobalt::use_op); (4)
co_return 0; (5)
}
1 | 当使用 co_main 函数时,它会定义一个隐式的 main 函数,并且是设置环境以运行异步代码的最简单方法。 |
2 | 从当前协程 promise 获取执行器。 |
3 | 使用参数设置超时时间。 |
4 | 通过使用 cobalt::use_op 执行等待。 |
5 | 返回一个从隐式 main 返回的值。 |
在这个例子中,我们使用了 cobalt/main.hpp 头文件,如果定义了上面的 co_main
,它会提供一个 main 协程。这有几个优点:
-
环境设置正确(
executor
&memory
) -
asio 被告知上下文是单线程的
-
一个带有
SIGINT
&SIGTERM
的asio::signal_set
会自动连接到取消操作(即Ctrl+C
会导致取消)
然后,此协程在其 promise 中有一个执行器(promise 是 C++ 中协程状态的名称。不要与 cobalt/promise.hpp 混淆),我们可以通过 this_coro 命名空间中的虚拟 awaitable 获取该执行器。
echo 服务器
我们将到处使用 use_op
(asio completion)token,所以我们使用一个 默认完成 token,这样我们就可以跳过最后一个参数。
namespace cobalt = boost::cobalt;
using boost::asio::ip::tcp;
using boost::asio::detached;
using tcp_acceptor = cobalt::use_op_t::as_default_on_t<tcp::acceptor>;
using tcp_socket = cobalt::use_op_t::as_default_on_t<tcp::socket>;
namespace this_coro = boost::cobalt::this_coro;
cobalt::promise<void> echo(tcp_socket socket)
{
try (1)
{
char data[4096];
while (socket.is_open()) (2)
{
std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data)); (3)
co_await async_write(socket, boost::asio::buffer(data, n)); (4)
}
}
catch (std::exception& e)
{
std::printf("echo: exception: %s\n", e.what());
}
}
1 | 当使用 use_op 完成 token 时,I/O 错误会被转换为 C++ 异常。此外,如果协程被取消(例如,因为用户按下了 Ctrl-C),也会引发异常。在这些条件下,我们会打印错误并退出循环。 |
2 | 我们运行循环直到被取消(异常)或用户关闭连接。 |
3 | 尽可能多地读取。 |
4 | 写入所有读取的字节。 |
注意,promise 是即时的。调用 echo
会立即执行代码,直到 async_read_some
,然后将控制权返回给调用者。
接下来,我们还需要一个 acceptor 函数。在这里,我们使用 generator 来管理 acceptor 状态。这是一个可以多次 co_awaited 的协程,直到到达 co_return
表达式。
cobalt::generator<tcp_socket> listen()
{
tcp_acceptor acceptor({co_await cobalt::this_coro::executor}, {tcp::v4(), 55555});
for (;;) (1)
{
tcp_socket sock = co_await acceptor.async_accept(); (2)
co_yield std::move(sock); (3)
}
co_return tcp_socket{acceptor.get_executor()}; (4)
}
1 | 取消操作也会导致此处从 co_await 抛出异常 |
2 | 异步接受连接 |
3 | 将其 yield 给等待协程 |
4 | co_return 一个值以符合 C++ 规范。 |
有了这两个函数,我们现在可以编写服务器了
cobalt::promise<void> run_server(cobalt::wait_group & workers)
{
auto l = listen(); (1)
while (true)
{
if (workers.size() == 10u)
co_await workers.wait_one(); (2)
else
workers.push_back(echo(co_await l)); (3)
}
}
1 | 构造监听器生成器协程。当对象被销毁时,协程将被取消,执行所有必需的清理操作。 |
2 | 当我们有超过 10 个 worker 时,我们等待一个 worker 完成 |
3 | 接受新连接并启动它。 |
wait_group 用于管理正在运行的 echo 函数。这个类将取消并等待正在运行的 echo
协程。
我们不需要对 listener
执行相同的操作,因为它会在 l
被销毁时自行停止。生成器的析构函数会取消它。
由于 promise
是即时的,因此只需调用它就足以启动。然后,我们将这些 promise 放入一个 wait_group 中,这使我们可以在作用域退出时关闭所有 worker。
cobalt::main co_main(int argc, char ** argv)
{
co_await cobalt::with(cobalt::wait_group(), &run_server); (1)
co_return 0u;
}
1 | 在异步作用域中运行 run_server 。 |
上面显示的 with 函数将使用诸如 wait_group 之类的资源运行函数。在作用域退出时,with
将调用并 co_await
一个异步的拆卸函数。这会导致所有连接在 co_main
存在之前被正确关闭。
价格行情
为了演示 channels
和其他工具,我们需要一定的复杂性。为此,我们的项目是一个价格行情工具,它连接到 https://blockchain.info。然后,用户可以连接到 localhost 以查询给定的货币对,例如这样
wscat -c localhost:8080/btc/usd
首先,我们执行与 echo-server 相同的声明。
using executor_type = cobalt::use_op_t::executor_with_default<cobalt::executor>;
using socket_type = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;
下一步是编写一个函数来连接 ssl-stream,以上游连接
cobalt::promise<asio::ssl::stream<socket_type>> connect(
std::string host, boost::asio::ssl::context & ctx)
{
asio::ip::tcp::resolver res{cobalt::this_thread::get_executor()};
auto ep = co_await res.async_resolve(host, "https", cobalt::use_op); (1)
asio::ssl::stream<socket_type> sock{cobalt::this_thread::get_executor(), ctx};
co_await sock.next_layer().async_connect(*ep.begin()); (2)
co_await sock.async_handshake(asio::ssl::stream_base::client); (3)
co_return sock; (4)
}
1 | 查找主机 |
2 | 连接到端点 |
3 | 执行 ssl 握手 |
4 | 将 socket 返回给调用者 |
接下来,我们需要一个函数在现有的 ssl-stream 上执行 websocket 升级。
cobalt::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
ws.set_option(beast::websocket::stream_base::decorator(
[](beast::websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " cobalt-ticker");
req.set(http::field::origin,
"https://exchange.blockchain.com"); (1)
}));
co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
1 | blockchain.info 要求设置此标头。 |
2 | 执行 websocket 握手。 |
一旦 websocket 连接,我们希望持续接收 json 消息,为此,生成器是一个不错的选择。
cobalt::generator<json::object> json_reader(websocket_type & ws)
try
{
beast::flat_buffer buf;
while (ws.is_open()) (1)
{
auto sz = co_await ws.async_read(buf); (2)
json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
auto obj = json::parse(data);
co_yield obj.as_object(); (3)
buf.consume(sz);
}
co_return {};
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
1 | 只要 socket 打开就保持运行 |
2 | 从 websocket 读取一个帧 |
3 | 解析并将其作为对象 co_yield 。 |
然后,这需要连接到订阅者,为此,我们将利用通道来传递原始 json。为了简化生命周期管理,订阅者将保留一个 shared_ptr
,而生产者保留一个 weak_ptr
。
using subscription = std::pair<std::string, std::weak_ptr<cobalt::channel<json::object>>>;
using subscription_channel = std::weak_ptr<cobalt::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;
运行区块链连接器的主函数操作两个输入:来自 websocket 的数据和一个用于处理新订阅的通道。
cobalt::promise<void> run_blockchain_info(cobalt::channel<subscription> & subc)
try
{
asio::ssl::context ctx{asio::ssl::context_base::tls_client};
websocket_type ws{co_await connect("blockchain.info", ctx)};
co_await connect_to_blockchain_info(ws); (1)
subscription_map subs;
std::list<std::string> unconfirmed;
auto rd = json_reader(ws); (2)
while (ws.is_open()) (3)
{
switch (auto msg = co_await cobalt::race(rd, subc.read()); msg.index()) (4)
{
case 0: (5)
if (auto ms = get<0>(msg);
ms.at("event") == "rejected") // invalid sub, cancel however subbed
co_await handle_rejections(unconfirmed, subs, ms);
else
co_await handle_update(unconfirmed, subs, ms, ws);
break;
case 1: // (6)
co_await handle_new_subscription(
unconfirmed, subs,
std::move(get<1>(msg)), ws);
break;
}
}
for (auto & [k ,c] : subs)
{
if (auto ptr = c.lock())
ptr->close();
}
}
catch(std::exception & e)
{
std::cerr << "Exception: " << e.what() << std::endl;
throw;
}
1 | 初始化连接 |
2 | 实例化 json_reader |
3 | 只要 websocket 打开就运行 |
4 | 选择,即等待新的 json 消息或订阅 |
5 | 当它是 json 时,处理更新或拒绝 |
6 | 处理新的订阅消息 |
对于 cobalt
功能而言,handle_*
函数的内容并不重要,因此在本教程中跳过它。
handle_new_subscription
函数向 blockchain.info
发送一条消息,它将发回确认或拒绝。handle_rejection
和 handle_update
将获取 json 值并将它们转发到订阅通道。
在消费者端,我们的服务器只会将数据转发给客户端。如果客户端输入数据,我们将立即关闭 websocket。我们使用 as_tuple
来忽略潜在的错误。
cobalt::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
system::error_code ec;
co_await st.async_read(buf, asio::as_tuple(cobalt::use_op));
co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(cobalt::use_op));
st.next_layer().close(ec);
}
接下来,我们运行用户发送的会话
cobalt::promise<void> run_session(beast::websocket::stream<socket_type> st,
cobalt::channel<subscription> & subc)
try
{
http::request<http::empty_body> req;
beast::flat_buffer buf;
co_await http::async_read(st.next_layer(), buf, req); (1)
// check the target
auto r = urls::parse_uri_reference(req.target());
if (r.has_error() || (r->segments().size() != 2u)) (2)
{
http::response<http::string_body> res{http::status::bad_request, 11};
res.body() = r.has_error() ? r.error().message() :
"url needs two segments, e.g. /btc/usd";
co_await http::async_write(st.next_layer(), res);
st.next_layer().close();
co_return ;
}
co_await st.async_accept(req); (3)
auto sym = std::string(r->segments().front()) + "-" +
std::string(r->segments().back());
boost::algorithm::to_upper(sym);
// close when data gets sent
auto p = read_and_close(st, std::move(buf)); (4)
auto ptr = std::make_shared<cobalt::channel<json::object>>(1u); (5)
co_await subc.write(subscription{sym, ptr}); (6)
while (ptr->is_open() && st.is_open()) (7)
{
auto bb = json::serialize(co_await ptr->read());
co_await st.async_write(asio::buffer(bb));
}
co_await st.async_close(beast::websocket::close_code::going_away,
asio::as_tuple(cobalt::use_op)); (8)
st.next_layer().close();
co_await p; (9)
}
catch(std::exception & e)
{
std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
1 | 读取 http 请求,因为我们需要路径 |
2 | 检查路径,例如 /btc/usd 。 |
3 | 接受 websocket |
4 | 如果消费者发送了任何内容,则开始读取并关闭 |
5 | 创建通道以接收更新 |
6 | 向 run_blockchain_info 发送订阅请求 |
7 | 当通道和 websocket 打开时,我们正在转发数据。 |
8 | 关闭 socket 并忽略错误 |
9 | 由于 websocket 此时肯定已关闭,请等待 read_and_close 关闭。 |
编写完 run_session
和 run_blockchain_info
后,我们可以继续进行 main
cobalt::main co_main(int argc, char * argv[])
{
acceptor_type acc{co_await cobalt::this_coro::executor,
asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
std::cout << "Listening on localhost:8080" << std::endl;
constexpr int limit = 10; // allow 10 ongoing sessions
cobalt::channel<subscription> sub_manager; (1)
co_await join( (2)
run_blockchain_info(sub_manager),
cobalt::with( (3)
cobalt::wait_group(
asio::cancellation_type::all,
asio::cancellation_type::all),
[&](cobalt::wait_group & sessions) -> cobalt::promise<void>
{
while (!co_await cobalt::this_coro::cancelled) (4)
{
if (sessions.size() >= limit) (5)
co_await sessions.wait_one();
auto conn = co_await acc.async_accept(); (6)
sessions.push_back( (7)
run_session(
beast::websocket::stream<socket_type>{std::move(conn)},
sub_manager));
}
})
);
co_return 0;
}
1 | 创建通道以管理订阅 |
2 | 使用 join 并行运行两个任务。 |
3 | 使用钴作用域来提供 wait_group 。 |
4 | 运行直到被取消。 |
5 | 当我们达到 limit 时,我们等待一个任务完成。 |
6 | 等待新连接。 |
7 | 将会话插入到 wait_group 中。 |
Main 使用 join
,因为一个任务失败应该取消另一个任务。
delay op
到目前为止,我们使用了 use_op
来使用基于 asio 完成 token 机制的隐式操作。
但是,我们可以实现自己的操作,这些操作也可以利用 await_ready
优化。与立即完成不同,当 await_ready
返回 true 时,协程永远不会挂起。
为了利用此协程功能,cobalt
提供了一种创建可跳过操作的简单方法
struct wait_op final : cobalt::op<system::error_code> (1)
{
asio::steady_timer & tim;
wait_op(asio::steady_timer & tim) : tim(tim) {}
void ready(cobalt::handler<system::error_code> h ) override (2)
{
if (tim.expiry() < std::chrono::steady_clock::now())
h(system::error_code{});
}
void initiate(cobalt::completion_handler<system::error_code> complete) override (3)
{
tim.async_wait(std::move(complete));
}
};
cobalt::main co_main(int argc, char * argv[])
{
asio::steady_timer tim{co_await asio::this_coro::executor,
std::chrono::milliseconds(std::stoi(argv[1]))};
co_await wait_op(tim); (4)
co_return 0; //
}
1 | 声明 op。我们继承 op 使其可等待。 |
2 | 预挂起检查在这里实现 |
3 | 如果需要,执行等待 |
4 | 像任何其他 awaitable 一样使用 op。 |
这样,我们可以最大限度地减少协程挂起的次数。
虽然以上用法是配合 asio 使用的,但你也可以将这些处理程序用于任何其他基于回调的代码。
带推送值的生成器
带有推送值的协程并不常见,但可以显著简化某些问题。
由于我们在之前的示例中已经有了一个 json_reader,下面是如何编写一个可以接收推送值的 json_writer。
使用生成器的优势在于其内部状态管理。
cobalt::generator<system::error_code, json::object>
json_writer(websocket_type & ws)
try
{
char buffer[4096];
json::serializer ser;
while (ws.is_open()) (1)
{
auto val = co_yield system::error_code{}; (2)
while (!ser.done())
{
auto sv = ser.read(buffer);
co_await ws.cobalt_write({sv.data(), sv.size()}); (3)
}
}
co_return {};
}
catch (system::system_error& e)
{
co_return e.code();
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
1 | 只要 socket 打开就保持运行 |
2 | co_yield 当前错误并检索一个新值。 |
3 | 向 websocket 写入一个帧。 |
现在我们可以像这样使用生成器
auto g = json_writer(my_ws);
extern std::vector<json::value> to_write;
for (auto && tw : std::move(to_write))
{
if (auto ec = co_await g(std::move(tw)))
return ec; // yield error
}
高级示例
更多示例仅以代码形式在存储库中提供。所有示例都列在下面。
执行单个 HTTP GET 请求的 HTTP 客户端。 |
|
使用 |
|
使用 nanobind 将 cobalt 与 Python 集成。它使用 Python 的 asyncio 作为执行器,并允许 C++ 协程等待 Python 函数,反之亦然。 |
|
将 |
|
创建一个基于 |
|
将工作线程与 |
|
使用 |
|
延迟 部分使用的示例 |
|
延迟操作 部分使用的示例 |
|
回显服务器 部分使用的示例 |
|
价格行情 部分使用的示例 |
|
通道 参考部分使用的示例 |
设计
概念
这个库有两个基本概念
-
协程
可等待对象 是一个可以在协程内部与 co_await
一起使用的表达式,例如:
co_await delay(50ms);
但是,协程的 promise 可以定义一个 await_transform
,即,实际可以与 co_await
表达式一起使用的内容取决于协程。
因此,我们应该重新定义 可等待对象 的含义:**可等待对象** 是一个可以从不定义 await_transform
的协程 promise 中使用 co_await
的类型。
伪关键字
是可以在协程中使用的类型,由于其 promise 的 await_transform
,它为协程添加了特殊功能。
this_coro 命名空间中的所有动词都是这样的伪关键字。
auto exec = co_await this_coro::executor;
此库公开了一组用于 promise 的 enable_* 基类,以简化自定义协程的创建。这包括 enable_awaitables,它提供了一个 await_transform ,可以转发 可等待对象。 |
本文档中的协程指的是异步协程,即,不考虑像 std::generator 这样的同步协程。
执行器
由于所有内容都是异步的,因此该库需要使用事件循环。由于所有内容都是单线程的,因此可以假设每个线程只有一个执行器,这足以满足 97% 的用例。因此,有一个 thread_local
执行器被协程对象用作默认执行器(尽管以复制方式存储在协程 promise 中)。
同样,该库使用一种 executor
类型,默认为 asio::any_io_executor
。
如果你编写自己的协程,它应该持有执行器的副本,并具有一个通过常量引用返回执行器的 get_executor 函数。 |
使用 Strand
虽然可以使用 strand,但它们与 thread_local
执行器不兼容。这是因为它们可能会切换线程,因此它们不能是 thread_local
。
在 通道 的情况下,这是一个构造函数参数,但对于其他协程类型,需要使用 asio::executor_arg
。这是通过在协程的参数列表中直接使用 asio::executor_arg_t
(在某个位置)后跟要使用的执行器来完成的,例如:
cobalt::promise<void> example_with_executor(int some_arg, asio::executor_arg_t, cobalt::executor);
这样,协程的 promise 可以从第三个参数中获取执行器,而不是默认使用 thread_local 执行器。
当然,如果有时使用 thread_local 执行器,则可以默认参数,以使其不那么不方便。
cobalt::promise<void> example_with_executor(int some_arg,
asio::executor_arg_t = asio::executor_arg,
cobalt::executor = cobalt::this_thread::get_executor());
如果在 strand 上省略此操作,则会抛出 asio::bad_allocator
类型的异常,或者更糟糕的是,会使用错误的执行器。
多态内存资源
类似地,该库使用 thread_local 的 pmr::memory_resource
来分配协程帧以及用作异步操作的分配器。
原因是,用户可能希望自定义分配,例如,避免锁、限制内存使用或监视使用情况。pmr
允许我们在不引入不必要的模板参数的情况下实现此目的,即,没有 promise<T, Allocator>
的复杂性。但是,使用 pmr
确实会引入一些最小的开销,因此用户可以选择通过定义 BOOST_COBALT_NO_PMR
来禁用它。
op 使用针对 asio 的分配器用法优化的内部资源,而 gather、race 和 join 使用单调资源来最小化分配。两者仍然可以使用定义的 BOOST_COBALT_NO_PMR
,在这种情况下,它们将使用 new/delete
作为上游分配。
如果你编写自己的协程,它应该具有一个返回 pmr::polymorphic_allocator<void> 的 get_allocator 函数。 |
取消
cobalt 使用基于 asio::cancellation_signal
的隐式取消。这主要隐式使用(例如,与 race 一起使用),因此在示例中很少有显式使用。
如果你编写自定义协程,它必须从 get_cancellation_slot 函数返回一个 cancellation_slot ,以便能够取消其他操作。 |
如果你编写自定义的可等待对象,则可以在 await_suspend 中使用该函数来接收取消信号。 |
Promise
主要的协程类型是 promise
,它是急切的。默认为此类型的原因是,编译器可以优化掉不挂起的 promise,如下所示:
cobalt::promise<void> noop()
{
co_return;
}
理论上,等待上述操作是空操作,但实际上,到 2023 年为止,编译器还没有达到这种程度。
Race
最重要的同步机制是 race
函数。
它以伪随机顺序等待多个 可等待对象,并将返回第一个完成的结果,然后忽略其余的结果。
也就是说,它以伪随机顺序启动 co_await
,并在发现一个可等待对象准备就绪或立即完成时停止。
cobalt::generator<int> gen1();
cobalt::generator<double> gen2();
cobalt::promise<void> p()
{
auto g1 = gen1();
auto g2 = gen2();
while (!co_await cobalt::this_coro::cancelled)
{
switch(auto v = co_await race(g1, g2); v.index())
{
case 0:
printf("Got int %d\n", get<0>(v));
break;
case 1:
printf("Got double %f\n", get<1>(v));
break;
}
}
}
race
是触发取消的首选方式,例如:
cobalt::promise<void> timeout();
cobalt::promise<void> work();
race(timeout(), work());
interrupt_await
如果它天真地取消,则会丢失数据。因此,引入了 interrupt_await
的概念,该概念告诉可等待对象(支持它的)立即恢复等待器并返回或抛出忽略的值。
struct awaitable
{
bool await_ready() const;
template<typename Promise>
std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
T await_resume();
void interrupt_await() &;
};
如果 interrupt_await
没有导致立即恢复(h
),则 race
将发送取消信号。
race
使用正确的引用限定符应用这些
auto g = gen1();
race(g, gen2());
如果可用,以上代码将为 g1
调用 interrupt_await() &
函数,为 g2
调用 interrupt_await() &&
函数。
一般来说,cobalt 中的协程支持左值中断,即 interrupt_await() & 。通道 操作是不限定的,即在两种情况下都有效。 |
关联器
cobalt
使用 asio 的 关联器
概念,但对其进行了简化。也就是说,它有三个关联器,它们是等待 promise 的成员函数。
-
const executor_type & get_executor()
(始终为executor
,必须按常量引用返回) -
allocator_type get_allocator()
(始终为pmr::polymorphic_allocator<void>
) -
cancellation_slot_type get_cancellation_slot()
(必须具有与asio::cancellation_slot
相同的 IF)
cobalt
使用概念来检查这些概念是否存在于其 await_suspend
函数中。
这样,自定义协程可以支持取消、执行器和分配器。
在自定义的可等待对象中,你可以像这样获取它们
struct my_awaitable
{
bool await_ready();
template<typename T>
void await_suspend(std::corutine_handle<P> h)
{
if constexpr (requires (Promise p) {p.get_executor();})
handle_executor(h.promise().get_executor();
if constexpr (requires (Promise p) {p.get_cancellation_slot();})
if ((cl = h.promise().get_cancellation_slot()).is_connected())
cl.emplace<my_cancellation>();
}
void await_resume();
};
取消在 co_await
表达式中连接(如果协程和可等待对象支持),包括像 race 这样的同步机制。
线程
主要的技术原因是,切换协程最有效的方法是从 await_suspend
返回新协程的句柄,如下所示:
struct my_awaitable
{
bool await_ready();
std::coroutine_handle<T> await_suspend(std::coroutine_handle<U>);
void await_resume();
};
在这种情况下,等待协程将在调用 await_suspend 之前挂起,并且恢复返回的协程。如果我们需要通过执行器,这当然不起作用。
这不仅适用于等待的协程,也适用于通道。此库中的通道使用可等待对象的侵入式列表,并且可能会从写操作的 await_suspend
返回读取(因此挂起)协程的句柄。
参考
cobalt/main.hpp
开始使用钴应用程序的最简单方法是使用具有以下签名的 co_main
函数
cobalt::main co_main(int argc, char *argv[]);
声明 co_main
将添加一个 main
函数,该函数执行在事件循环上运行协程的所有必要步骤。这使我们可以编写非常简单的异步程序。
cobalt::main co_main(int argc, char *argv[])
{
auto exec = co_await cobalt::this_coro::executor; (1)
asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
co_await tim.async_wait(cobalt::use_op); (3)
co_return 0;
}
1 | 获取 main 运行的执行器 |
2 | 将其与 asio 对象一起使用 |
3 | co_await 一个钴操作 |
主 promise 将创建一个 asio::signal_set
并将其用于取消。SIGINT
变为全部取消,而 SIGTERM
变为终端取消。
取消不会转发到分离的协程。用户需要注意在取消时结束它们,否则程序不允许正常终止。 |
执行器
它还会创建一个要运行的 asio::io_context
,你可以通过 this_coro::executor
获取它。它将被分配给 cobalt::this_thread::get_executor()
。
内存资源
它还会创建一个内存资源,该资源将用作内部内存分配的默认值。它将被分配给 thread_local
到 cobalt::this_thread::get_default_resource()
。
Promise
每个协程都有一个内部状态,称为 promise
(不要与 cobalt::promise
混淆)。根据协程的属性,可以 co_await
不同的内容,就像我们在上面的示例中使用的一样。
它们通过继承来实现,并在不同的 promise 类型之间共享
主 promise 具有以下属性。
规范
-
声明
co_main
将隐式声明一个main
函数 -
仅当定义了
co_main
时,才存在main
。 -
SIGINT
和SIGTERM
将导致取消内部任务。
cobalt/promise.hpp
promise 是一个急切的协程,可以 co_await
和 co_return
值。也就是说,它不能使用 co_yield
。
cobalt::promise<void> delay(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
}
cobalt::main co_main(int argc, char *argv[])
{
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
promise 默认是附加的。这意味着,当 promise
句柄超出范围时,会发送取消信号。
可以通过调用 detach
或使用前缀 +
运算符来分离 Promise。这是一种在运行时替代使用 detached 的方法。分离的 Promise 在销毁时不会发送取消信号。
cobalt::promise<void> my_task();
cobalt::main co_main(int argc, char *argv[])
{
+my_task(); (1)
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
1 | 通过使用 + ,任务会被分离。如果没有它,编译器会生成一个 nodiscard 警告。 |
执行器
执行器从 thread_local
的 get_executor 函数获取,除非在任何位置使用了 asio::executor_arg
,后跟执行器参数。
cobalt::promise<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
内存资源
内存资源从 thread_local
的 get_default_resource 函数获取,除非在任何位置使用了 std::allocator_arg
,后跟 polymorphic_allocator
参数。
cobalt::promise<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
概述
template<typename Return>
struct [[nodiscard]] promise
{
promise(promise &&lhs) noexcept;
promise& operator=(promise && lhs) noexcept;
// enable `co_await`. (1)
auto operator co_await ();
// Ignore the return value, i.e. detach it. (2)
void operator +() &&;
// Cancel the promise.
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// Check if the result is ready
bool ready() const;
// Check if the promise can be awaited.
explicit operator bool () const; (3)
// Detach or attach
bool attached() const;
void detach();
void attach();
// Create an already completed promimse
static promise
// Get the return value. If !ready() this function has undefined behaviour.
Return get();
};
1 | 支持 中断等待 |
2 | 这允许通过简单的 +my_task() 表达式创建并行运行的 Promise。 |
3 | 这允许像 while (p) co_await p; 这样的代码。 |
Promise
协程 Promise(promise::promise_type
)具有以下属性。
-
延迟等待 == cobalt/generator.hpp
生成器是一个“渴望”协程,可以 co_await
和 co_yield
值给调用者。
cobalt::generator<int> example()
{
printf("In coro 1\n");
co_yield 2;
printf("In coro 3\n");
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main 1\n");
printf("In main %d\n", co_await f);
printf("In main %d\n", co_await f);
return 0;
}
这将生成以下输出
In main 0 In coro 1 In main 1 In main 2 In coro 3 In main 4
当 Push
(第二个模板参数)设置为非 void 时,可以将值推送到生成器。
cobalt::generator<int, int> example()
{
printf("In coro 1\n");
int i = co_yield 2;
printf("In coro %d\n", i);
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main %d\n", co_await f(3)); (1)
co_return 0;
}
1 | 推送的值通过 operator() 传递给 co_yield 的结果。 |
这将生成以下输出
In main 0 In coro 1 In main 2 In coro 3
Lazy
可以通过等待 initial 来将生成器变为惰性。这个 co_await
表达式会产生 Push
值。这意味着生成器将等待直到第一次被等待,然后处理新推送的值,并在下一个 co_yield 处恢复。
cobalt::generator<int, int> example()
{
int v = co_await cobalt::this_coro::initial;
printf("In coro %d\n", v);
co_yield 2;
printf("In coro %d\n", v);
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main 1\n"); // < this is now before the co_await initial
printf("In main %d\n", co_await f(1));
printf("In main %d\n", co_await f(3));
return 0;
}
这将生成以下输出
In main 0 In main 1 In coro 1 In main 2 In coro 3 In main 4
执行器
执行器从 thread_local
的 get_executor 函数获取,除非在任何位置使用了 asio::executor_arg
,后跟执行器参数。
cobalt::generator<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
内存资源
内存资源从 thread_local
的 get_default_resource 函数获取,除非在任何位置使用了 std::allocator_arg
,后跟 polymorphic_allocator
参数。
cobalt::generator<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
概述
template<typename Yield, typename Push = void>
struct [[nodiscard]] generator
{
// Movable
generator(generator &&lhs) noexcept = default;
generator& operator=(generator &&) noexcept;
// True until it co_returns & is co_awaited after (1)
explicit operator bool() const;
// Cancel the generator. (3)
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// Check if a value is available
bool ready() const;
// Get the returned value. If !ready() this function has undefined behaviour.
Yield get();
// Cancel & detach the generator.
~generator();
// an awaitable that results in value of Yield
.
using generator_awaitable = unspecified;
// Present when Push
!= void
generator_awaitable operator()( Push && push);
generator_awaitable operator()(const Push & push);
// Present when Push
== void
, i.e. can co_await
the generator directly.
generator_awaitable operator co_await (); (2)
};
1 | 这允许像 while (gen) co_await gen: 这样的代码。 |
2 | 支持 中断等待 |
3 | 取消的生成器可能是可恢复的。 |
cobalt/task.hpp
任务是一个“惰性”协程,可以 co_await
和 co_return
值。也就是说,它不能使用 co_yield
。
cobalt::task<void> delay(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
}
cobalt::main co_main(int argc, char *argv[])
{
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
与 promise 不同,任务可以在创建它的执行器之外的其他执行器上等待或生成。
执行器
由于 task
是惰性的,因此它在构造时不需要执行器。它会尝试从调用者或等待者那里获取,如果存在的话。否则,它将默认为 thread_local 执行器。
内存资源
内存资源不从 thread_local
的 get_default_resource 函数获取,而是从 pmr::get_default_resource()
获取,除非在任何位置使用了 std::allocator_arg
,后跟 polymorphic_allocator
参数。
cobalt::task<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
概述
template<typename Return>
struct [[nodiscard]] task
{
task(task &&lhs) noexcept = default;
task& operator=(task &&) noexcept = default;
// enable `co_await`
auto operator co_await ();
};
可以通过调用 run(my_task()) 从同步函数同步使用任务。 |
cobalt/detached.hpp
detached 是一个“渴望”协程,可以 co_await
但不能 co_return
值。也就是说,它不能被恢复,通常也不被等待。
cobalt::detached delayed_print(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
printf("Hello world\n");
}
cobalt::main co_main(int argc, char *argv[])
{
delayed_print();
co_return 0;
}
Detached 用于在后台轻松运行协程。
cobalt::detached my_task();
cobalt::main co_main(int argc, char *argv[])
{
my_task(); (1)
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
1 | 生成 detached 协程。 |
detached 可以像这样为自己分配一个新的取消源
cobalt::detached my_task(asio::cancellation_slot sl)
{
co_await this_coro::reset_cancellation_source(sl);
// do somework
}
cobalt::main co_main(int argc, char *argv[])
{
asio::cancellation_signal sig;
my_task(sig.slot()); (1)
co_await delay(std::chrono::milliseconds(50));
sig.emit(asio::cancellation_type::all);
co_return 0;
}
执行器
执行器从 thread_local
的 get_executor 函数获取,除非在任何位置使用了 asio::executor_arg
,后跟执行器参数。
cobalt::detached my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
内存资源
内存资源从 thread_local
的 get_default_resource 函数获取,除非在任何位置使用了 std::allocator_arg
,后跟 polymorphic_allocator
参数。
cobalt::detached my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
use_op
use_op
令牌是直接创建 op 的方式,即使用 cobalt::use_op
作为完成令牌将创建所需的可等待对象。
auto tim = cobalt::use_op.as_default_on(asio::steady_timer{co_await cobalt::this_coro::executor});
co_await tim.async_wait();
根据完成签名,co_await
表达式可能会抛出异常。
签名 | 返回类型 | 异常 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
任何异常 |
|
|
任何异常 |
use_op 永远不会立即完成,即 await_ready 将始终返回 false,但始终会挂起协程。 |
手动编码操作
操作是 [cobalt_operation] 功能的更高级实现。
该库使得可以轻松创建具有早期完成条件的异步操作,即避免协程挂起的条件。
例如,我们可以创建一个 wait_op
,如果计时器已经过期,则什么也不做。
struct wait_op : cobalt::op<system::error_code> (1)
{
asio::steady_timer & tim;
wait_op(asio::steady_timer & tim) : tim(tim) {}
bool ready(cobalt::handler<system::error_code> ) (2)
{
if (tim.expiry() < std::chrono::steady_clock::now())
h(system::error_code{});
}
void initiate(cobalt::completion_handler<system::error_code> complete) (3)
{
tim.async_wait(std::move(complete));
}
};
1 | 继承具有匹配签名的 op ,await_transform 会将其拾取。 |
2 | 检查操作是否就绪 - 从 await_ready 调用。 |
3 | 如果操作尚未就绪,则启动操作。 |
cobalt/concepts.hpp
可等待
可等待对象是一个可以与 co_await
一起使用的表达式。
template<typename Awaitable, typename Promise = void>
concept awaitable_type = requires (Awaitable aw, std::coroutine_handle<Promise> h)
{
{aw.await_ready()} -> std::convertible_to<bool>;
{aw.await_suspend(h)};
{aw.await_resume()};
};
template<typename Awaitable, typename Promise = void>
concept awaitable =
awaitable_type<Awaitable, Promise>
|| requires (Awaitable && aw) { {std::forward<Awaitable>(aw).operator co_await()} -> awaitable_type<Promise>;}
|| requires (Awaitable && aw) { {operator co_await(std::forward<Awaitable>(aw))} -> awaitable_type<Promise>;};
此库中的 可等待对象 要求协程 Promise 通过常量引用返回它们的执行器(如果它们提供执行器)。否则,它将使用 this_thread::get_executor() 。 |
启用可等待对象
继承 enable_awaitables
将使协程能够通过 await_transform
等待任何在没有任何 await_transform
的情况下可被 co_await
的对象。
cobalt/this_coro.hpp
this_coro
命名空间提供了访问协程 Promise 内部状态的实用程序。
伪可等待对象
// Awaitable type that returns the executor of the current coroutine.
struct executor_t {}
constexpr executor_t executor;
// Awaitable type that returns the cancellation state of the current coroutine.
struct cancellation_state_t {};
constexpr cancellation_state_t cancellation_state;
// Reset the cancellation state with custom or default filters.
constexpr unspecified reset_cancellation_state();
template<typename Filter>
constexpr unspecified reset_cancellation_state(
Filter && filter);
template<typename InFilter, typename OutFilter>
constexpr unspecified reset_cancellation_state(
InFilter && in_filter,
OutFilter && out_filter);
// get & set the throw_if_cancelled setting.
unspecified throw_if_cancelled();
unspecified throw_if_cancelled(bool value);
// Set the cancellation source in a detached.
unspecified reset_cancellation_source();
unspecified reset_cancellation_source(asio::cancellation_slot slot);
// get the allocator the promise
struct allocator_t {};
constexpr allocator_t allocator;
// get the current cancellation state-type
struct cancelled_t {};
constexpr cancelled_t cancelled;
// set the over-eager mode of a generator
struct initial_t {};
constexpr initial_t initial;
Await Allocator
支持 enable_await_allocator
的协程的分配器可以通过以下方式获得
co_await cobalt::this_coro::allocator;
为了在您自己的协程中启用此功能,可以使用 CRTP 模式继承 enable_await_allocator
struct my_promise : cobalt::enable_await_allocator<my_promise>
{
using allocator_type = __your_allocator_type__;
allocator_type get_allocator();
};
如果可用,则分配器将由 use_op 使用。 |
Await 执行器
支持 enable_await_executor
的协程的执行器可以通过以下方式获得
co_await cobalt::this_coro::executor;
为了在您自己的协程中启用此功能,可以使用 CRTP 模式继承 enable_await_executor
struct my_promise : cobalt::enable_await_executor<my_promise>
{
using executor_type = __your_executor_type__;
executor_type get_executor();
};
如果可用,则执行器将由 use_op 使用。 |
Await deferred
您的协程 Promise 可以继承 enable_await_deferred
,以便在 co_await
表达式中使用单个签名 asio::deferred
。
由于 asio::deferred
现在是默认的完成令牌,因此这允许以下代码,而无需指定任何完成令牌或其他特殊化。
asio::steady_timer t{co_await cobalt::this_coro::executor};
co_await t.async_wait();
内存资源基类
Promise 的 promise_memory_resource_base
基类将在 Promise 中提供一个从默认资源或在 std::allocator_arg
参数之后传递的资源中获取的 get_allocator
。同样,它将添加 operator new
重载,以便协程在其帧分配中使用相同的内存资源。
如果已取消则抛出异常
promise_throw_if_cancelled_base
提供了基本选项,允许操作在另一个实际的 可等待对象 被等待时启用协程抛出异常。
co_await cobalt::this_coro::throw_if_cancelled;
取消状态
promise_cancellation_base
提供了基本选项,允许操作启用协程具有可通过 reset_cancellation_state
重置的 cancellation_state
co_await cobalt::this_coro::reset_cancellation_state();
为了方便起见,还有一个检查当前取消状态的快捷方式
asio::cancellation_type ct = (co_await cobalt::this_coro::cancellation_state).cancelled();
asio::cancellation_type ct = co_await cobalt::this_coro::cancelled; // same as above
cobalt/this_thread.hpp
由于一切都是单线程的,因此此库为每个线程提供了一个执行器和默认内存资源。
namespace boost::cobalt::this_thread
{
pmr::memory_resource* get_default_resource() noexcept; (1)
pmr::memory_resource* set_default_resource(pmr::memory_resource* r) noexcept; (2)
pmr::polymorphic_allocator<void> get_allocator(); (3)
typename asio::io_context::executor_type & get_executor(); (4)
void set_executor(asio::io_context::executor_type exec) noexcept; (5)
}
1 | 获取默认资源 - 除非设置,否则将为 pmr::get_default_resource。 |
2 | 设置默认资源 - 返回之前设置的资源。 |
3 | 获取包装 (1) 的分配器。 |
4 | 获取线程的执行器 - 如果未设置则抛出异常。 |
5 | 设置当前线程的执行器。 |
协程将使用这些作为默认值,但保留一份副本以防万一。
唯一的例外是 cobalt-operation 的初始化,它将使用 this_thread::executor 从中重新抛出。 |
cobalt/channel.hpp
通道可用于在单个线程上的不同协程之间交换数据。
概述
template<typename T>
struct channel
{
// create a channel with a buffer limit, executor & resource.
explicit
channel(std::size_t limit = 0u,
executor executor = this_thread::get_executor(),
pmr::memory_resource * resource = this_thread::get_default_resource());
// not movable.
channel(channel && rhs) noexcept = delete;
channel & operator=(channel && lhs) noexcept = delete;
using executor_type = executor;
const executor_type & get_executor();
// Closes the channel
~channel();
bool is_open() const;
// close the operation, will cancel all pending ops, too
void close();
// an awaitable that yields T
using read_op = unspecified;
// an awaitable that yields void
using write_op = unspecified;
// read a value to a channel
read_op read();
// write a value to the channel
write_op write(const T && value);
write_op write(const T & value);
write_op write( T && value);
write_op write( T & value);
// write a value to the channel if T is void
};
描述
通道是两个协程进行通信和同步的工具。
const std::size_t buffer_size = 2;
channel<int> ch{exec, buffer_size};
// in coroutine (1)
co_await ch.write(42);
// in coroutine (2)
auto val = co_await ch.read();
1 | 将值发送到通道 - 将阻塞直到可以发送为止。 |
2 | 从通道读取值 - 将阻塞直到值可等待为止。 |
这两个操作都可能阻塞,具体取决于通道缓冲区大小。
如果缓冲区大小为零,则需要同时进行 read
和 write
,即充当 rendezvous。
如果缓冲区未满,则写入操作不会挂起协程;同样,如果缓冲区不为空,则读取操作不会挂起。
如果两个操作同时完成(就像空缓冲区的情况一样),则第二个操作将被发布到执行器以供稍后完成。
通道类型可以是 void ,在这种情况下,write 不带参数。 |
通道操作可以取消而不会丢失数据。这使得它们可以与 race 一起使用。
generator<variant2::variant<int, double>> merge(
channel<int> & c1,
channel<double> & c2)
{
while (c1 && c2)
co_yield co_await race(c1, c2);
}
示例
cobalt::promise<void> producer(cobalt::channel<int> & chan)
{
for (int i = 0; i < 4; i++)
co_await chan.write(i);
chan.close();
}
cobalt::main co_main(int argc, char * argv[])
{
cobalt::channel<int> c;
auto p = producer(c);
while (c.is_open())
std::cout << co_await c.read() << std::endl;
co_await p;
co_return 0;
}
此外,还提供了一个 channel_reader
,使读取通道更方便,并且可以与 BOOST_COBALT_FOR 一起使用。
cobalt::main co_main(int argc, char * argv[])
{
cobalt::channel<int> c;
auto p = producer(c);
BOOST_COBALT_FOR(int value, cobalt::channel_reader(c))
std::cout << value << std::endl;
co_await p;
co_return 0;
}
cobalt/with.hpp
with
机制提供了一种执行协程异步销毁的方法。也就是说,它类似于异步析构函数调用。
struct my_resource
{
cobalt::promise<void> await_exit(std::exception_ptr e);
};
cobalt::promise<void> work(my_resource & res);
cobalt::promise<void> outer()
{
co_await cobalt::with(my_resource(), &work);
}
可以通过提供 await_exit
成员函数或返回 可等待对象 的 tag_invoke
函数,或者通过将销毁作为 with
的第三个参数来完成销毁。
using ws_stream = beast::websocket::stream<asio::ip::tcp::socket>>;
cobalt::promise<ws_stream> connect(urls::url); (1)
cobalt::promise<void> disconnect(ws_stream &ws); (2)
auto teardown(const boost::cobalt::with_exit_tag & wet , ws_stream & ws, std::exception_ptr e)
{
return disconnect(ws);
}
cobalt::promise<void> run_session(ws_stream & ws);
cobalt::main co_main(int argc, char * argv[])
{
co_await cobalt::with(co_await connect(argv[1]), &run_session, &teardown);
co_return 0;
}
1 | 实现 websocket 连接和 websocket 初始化。 |
2 | 实现有序关闭。 |
如果作用域在没有异常的情况下退出,则 std::exception_ptr 为 null。注意:exit 函数通过引用获取 exception_ptr 并对其进行修改是合法的。 |
cobalt/race.hpp
race
函数可用于从一组 可等待对象 中 co_await
一个。
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_wait()
{
co_await cobalt::race(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::race(aws); (2)
}
race
的第一个参数可以是 均匀随机位生成器。
extern promise<void> pv1, pv2;
std::vector<promise<void>> pvv;
std::default_random_engine rdm{1};
// if everything returns void race returns the index
std::size_t r1 = co_await race(pv1, pv2);
std::size_t r2 = co_await race(rdm, pv1, pv2);
std::size_t r3 = co_await race(pvv);
std::size_t r4 = co_await race(rdm, pvv);
// variant if not everything is void. void become monostate
extern promise<int> pi1, pi2;
variant2::variant<monostate, int, int> r5 = co_await race(pv1, pi1, pi2);
variant2::variant<monostate, int, int> r6 = co_await race(rdm, pv1, pi1, pi2);
// a range returns a pair of the index and the result if non-void
std::vector<promise<int>> piv;
std::pair<std::size_t, int> r7 = co_await race(piv);
std::pair<std::size_t, int> r8 = co_await race(rdm, piv);
中断等待
当参数作为右值引用传递时,race 将尝试在 可等待对象 上使用 .interrupt_await
来通知可等待对象立即完成并且结果将被忽略。如果支持,可等待对象 必须在 interrupt_await
返回之前恢复等待协程。如果 race
没有检测到该函数,则会发送取消。
这意味着你可以像这样重用 race
cobalt::promise<void> do_wait()
{
auto t1 = task1();
auto t2 = task2();
co_await cobalt::race(t1, t2); (1)
co_await cobalt::race(t1, t2); (2)
}
1 | 等待第一个任务完成。 |
2 | 等待另一个任务完成。 |
race
将调用 可等待对象 的函数,就像在 co_await
表达式中使用一样,或者根本不评估它们。
left_race
left_race
函数类似于 race
,但遵循严格的从左到右的扫描。这可能会导致饥饿问题,这就是为什么这不是推荐的默认值,但如果在适当的情况下小心处理,则可以用于优先级排序。
概述
// Concept for the random number generator.
template<typename G>
concept uniform_random_bit_generator =
requires ( G & g)
{
{typename std::decay_t<G>::result_type() } -> std::unsigned_integral; // is an unsigned integer type
// T Returns the smallest value that G's operator() may return. The value is strictly less than G::max(). The function must be constexpr.
{std::decay_t<G>::min()} -> std::same_as<typename std::decay_t<G>::result_type>;
// T Returns the largest value that G's operator() may return. The value is strictly greater than G::min(). The function must be constexpr.
{std::decay_t<G>::max()} -> std::same_as<typename std::decay_t<G>::result_type>;
{g()} -> std::same_as<typename std::decay_t<G>::result_type>;
} && (std::decay_t<G>::max() > std::decay_t<G>::min());
// Variadic race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
uniform_random_bit_generator URBG, awaitable ... Promise>
awaitable race(URBG && g, Promise && ... p);
// Ranged race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
uniform_random_bit_generator URBG, range<awaitable> PromiseRange>
awaitable race(URBG && g, PromiseRange && p);
// Variadic race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable race(Promise && ... p);
// Ranged race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable race(PromiseRange && p);
// Variadic left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable left_race(Promise && ... p);
// Ranged left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable left_race(PromiseRange && p);
选择空范围将导致抛出异常。 |
cobalt/gather.hpp
gather
函数可用于同时 co_await
多个 可等待对象,并传递取消信号。
该函数将收集所有完成,并将其作为 system::result
返回,即捕获作为值概念。一个可等待对象抛出异常不会取消其他对象。
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_gather()
{
co_await cobalt::gather(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::gather(aws); (2)
}
gather
将调用 awaitable
的函数,就像在 co_await
表达式中使用一样。
extern promise<void> pv1, pv2;
std::tuple<system::result<int>, system::result<int>> r1 = co_await gather(pv1, pv2);
std::vector<promise<void>> pvv;
pmr::vector<system::result<void>> r2 = co_await gather(pvv);
extern promise<int> pi1, pi2;
std::tuple<system::result<monostate>,
system::result<monostate>,
system::result<int>,
system::result<int>> r3 = co_await gather(pv1, pv2, pi1, pi2);
std::vector<promise<int>> piv;
pmr::vector<system::result<int>> r4 = co_await gather(piv);
概述
// Variadic gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable gather(Promise && ... p);
// Ranged gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable gather(PromiseRange && p);
cobalt/join.hpp
join
函数可以用于一次 co_await
多个 awaitable,并具有正确连接的取消操作。
该函数将收集所有完成并将其作为值返回,除非抛出异常。如果抛出异常,所有未完成的操作将被取消(如果可能,则中断),并重新抛出第一个异常。
void 将在元组中以 variant2::monostate 的形式返回,除非所有 awaitable 都产生 void。 |
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_join()
{
co_await cobalt::join(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::join(aws); (2)
}
join
将调用 awaitable
的函数,就像在 co_await
表达式中使用一样。
extern promise<void> pv1, pv2;
/* void */ co_await join(pv1, pv2);
std::vector<promise<void>> pvv;
/* void */ co_await join(pvv);
extern promise<int> pi1, pi2;
std::tuple<monostate, monostate, int, int> r1 = co_await join(pv1, pv2, pi1, pi2);
std::vector<promise<int>> piv;
pmr::vector<int> r2 = co_await join(piv);
概述
// Variadic join
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable join(Promise && ... p);
// Ranged join
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable join(PromiseRange && p);
在空范围上选择将导致异常。 |
cobalt/wait_group.hpp
wait_group
函数可用于管理类型为 promise<void>
的多个协程。通过使用匹配的 await_exit
成员,它可以与 cobalt/with.hpp 一起开箱即用。
本质上,wait_group
是一个动态的 promise 列表,它具有 race
函数 (wait_one
)、gather
函数 (wait_all
),并且会在作用域退出时清理。
struct wait_group
{
// create a wait_group
explicit
wait_group(asio::cancellation_type normal_cancel = asio::cancellation_type::none,
asio::cancellation_type exception_cancel = asio::cancellation_type::all);
// insert a task into the group
void push_back(promise<void> p);
// the number of tasks in the group
std::size_t size() const;
// remove completed tasks without waiting (i.e. zombie tasks)
std::size_t reap();
// cancel all tasks
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// wait for one task to complete.
wait_one_op wait_one();
// wait for all tasks to complete
wait_op wait();
// wait for all tasks to complete
wait_op operator co_await ();
// when used with with
, this will receive the exception
// and wait for the completion
// if ep
is set, this will use the exception_cancel
level,
// otherwise the normal_cancel
to cancel all promises.
wait_op await_exit(std::exception_ptr ep);
};
cobalt/spawn.hpp
spawn
函数允许在 asio executor
/execution_context
上运行 task,并使用 completion token 来消耗结果。
auto spawn(Context & context, task<T> && t, CompletionToken&& token);
auto spawn(Executor executor, task<T> && t, CompletionToken&& token);
Spawn 将分发其启动并发布完成。这使得可以使用 task 在另一个执行器上运行 task,并在当前执行器上使用 use_op 来消耗结果。也就是说,spawn
可以用于跨线程。
示例
cobalt::task<int> work();
int main(int argc, char *argv[])
{
asio::io_context ctx{BOOST_ASIO_CONCURRENCY_HINT_1};
auto f = spawn(ctx, work(), asio::use_future);
ctx.run();
return f.get();
}
调用者需要确保执行器不是在多个线程上并发运行的,例如,通过使用单线程的 asio::io_context 或 strand 。 |
cobalt/run.hpp
run
函数类似于 spawn,但它是同步运行的。它会在内部设置执行上下文和内存资源。
当将一段 cobalt 代码集成到同步应用程序中时,这会很有用。
概述
// Run the task and return it's value or rethrow any exception.
T run(task<T> t);
示例
cobalt::task<int> work();
int main(int argc, char *argv[])
{
return run(work());
}
cobalt/thread.hpp
线程类型是创建类似于 main
的环境的另一种方式,但不使用 signal_set
。
cobalt::thread my_thread()
{
auto exec = co_await cobalt::this_coro::executor; (1)
asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
co_await tim.async_wait(cobalt::use_op); (3)
co_return 0;
}
1 | 获取执行器 thread 的运行位置 |
2 | 将其与 asio 对象一起使用 |
3 | co_await 一个钴操作 |
要使用线程,您可以像使用 std::thread
一样使用它
int main(int argc, char * argv[])
{
auto thr = my_thread();
thr.join();
return 0;
}
线程也是一个 awaitable
(包括取消)。
cobalt::main co_main(int argc, char * argv[])
{
auto thr = my_thread();
co_await thr;
co_return 0;
}
销毁一个分离的线程将导致硬停止 (io_context::stop ) 并加入该线程。 |
此库中没有任何内容(除了等待 cobalt/thread.hpp 和 cobalt/spawn.hpp 之外)是线程安全的。如果需要在线程之间传输数据,则需要一个线程安全的实用程序,例如 asio::concurrent_channel 。您不能在线程之间共享任何钴原语,唯一的例外是可以将 task spawn 到另一个线程的执行器上。 |
执行器
它还会创建一个要运行的 asio::io_context
,你可以通过 this_coro::executor
获取它。它将被分配给 cobalt::this_thread::get_executor()
。
内存资源
它还会创建一个内存资源,该资源将用作内部内存分配的默认值。它将被分配给 thread_local
到 cobalt::this_thread::get_default_resource()
。
概述
struct thread
{
// Send a cancellation signal
void cancel(asio::cancellation_type type = asio::cancellation_type::all);
// Allow the thread to be awaited. NOOP if the thread is invalid.
auto operator co_await() &-> detail::thread_awaitable; (1)
auto operator co_await() && -> detail::thread_awaitable; (2)
// Stops the io_context & joins the executor
~thread();
/// Move constructible
thread(thread &&) noexcept = default;
using executor_type = executor;
using id = std::thread::id;
id get_id() const noexcept;
// Add the functions similar to `std::thread`
void join();
bool joinable() const;
void detach();
executor_type get_executor() const;
};
1 | 支持 中断等待 |
2 | 始终转发取消 |
cobalt/result.hpp
可以修改 Awaitable 以返回 system::result
或 std::tuple
,而不是使用异常。
// value only
T res = co_await foo();
// as result
system::result<T, std::exception_ptr> res = co_await cobalt::as_result(foo());
// as tuple
std::tuple<std::exception_ptr, T> res = co_await cobalt::as_tuple(foo());
Awaitable 还可以通过使用 cobalt::as_result_tag
和 cobalt::as_tuple_tag
提供 await_resume
重载,从而提供处理结果和元组的自定义方式。
your_result_type await_resume(cobalt::as_result_tag);
your_tuple_type await_resume(cobalt::as_tuple_tag);
// example of an op with result system::error_code, std::size_t
system::result<std::size_t> await_resume(cobalt::as_result_tag);
std::tuple<system::error_code, std::size_t> await_resume(cobalt::as_tuple_tag);
Awaitable 仍然允许抛出异常,例如针对 OOM 等严重异常。 |
cobalt/async_for.hpp
对于生成器等类型,提供了 BOOST_COBALT_FOR
宏,以模拟 for co_await
循环。
cobalt::generator<int> gen();
cobalt::main co_main(int argc, char * argv[])
{
BOOST_COBALT_FOR(auto i, gen())
printf("Generated value %d\n", i);
co_return 0;
}
cobalt/error.hpp
为了使错误更容易管理,cobalt 提供了一个 error_category
,用于 boost::system::error_code
。
enum class error
{
moved_from,
detached,
completed_unexpected,
wait_not_ready,
already_awaited,
allocation_failed
};
system::error_category & cobalt_category();
system::error_code make_error_code(error e);
cobalt/config.hpp
配置添加器允许配置 boost.cobalt 的一些实现细节。
executor_type
执行器类型默认为 boost::asio::any_io_executor
。
您可以通过定义 BOOST_COBALT_CUSTOM_EXECUTOR
并自行添加 boost::cobalt::executor
类型,将其设置为 boost::asio::any_io_executor
。
或者,可以定义 BOOST_COBALT_USE_IO_CONTEXT
以将执行器设置为 boost::asio::io_context::executor_type
。
pmr
Boost.cobalt 可以与不同的 pmr 实现一起使用,默认为 std::pmr
。
可以使用以下宏来配置它
-
BOOST_COBALT_USE_STD_PMR
-
BOOST_COBALT_USE_BOOST_CONTAINER_PMR
-
BOOST_COBALT_USE_CUSTOM_PMR
如果您定义 BOOST_COBALT_USE_CUSTOM_PMR
,则需要提供一个 boost::cobalt::pmr
命名空间,它是 std::pmr
的替代品。
或者,可以使用以下方式禁用 pmr
的使用
-
BOOST_COBALT_NO_PMR
.
use_op
使用一个小型缓冲区优化的资源,其大小可以通过定义 BOOST_COBALT_SBO_BUFFER_SIZE
来设置,默认为 4096 字节。
cobalt/leaf.hpp
template<awaitable TryAwaitable, typename ... H >
auto try_catch(TryAwaitable && try_coro, H && ... h );
template<awaitable TryAwaitable, typename ... H >
auto try_handle_all(TryAwaitable && try_coro, H && ... h );
template<awaitable TryAwaitable, typename ... H >
auto try_handle_some(TryAwaitable && try_coro, H && ... h );
有关详细信息,请参阅 leaf 文档。
cobalt/experimental/context.hpp
这是(很可能)未定义的行为,因为它违反了标准中的前提条件。可以在此处找到解决此问题的论文 (https://isocpp.org/files/papers/P3203R0.html)。 |
此标头为使用基于 boost.fiber
的有栈协程提供了 experimental
支持,就像它们是 C20 协程一样。也就是说,它们可以通过放入 coroutine_handle
来使用 `awaitables`。同样,该实现使用 C20 协程 promise,并像运行 C++20 协程一样运行。
//
void delay(experimental::context<promise<void>> h, std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
h.await(tim.async_wait(cobalt::use_op)); // instead of co_await.
}
cobalt::main co_main(int argc, char *argv[])
{
cobalt::promise<void> dl = cobalt::experimental::make_context(&delay, 50);
co_await dl;
co_return 0;
}
参考
// The internal coroutine context.
/// Args are the function arguments after the handle.
template<typename Return, typename ... Args>
struct context
{
// Get a handle to the promise
promise_type & promise();
const promise_type & promise() const;
// Convert it to any context if the underlying promise is the same
template<typename Return_, typename ... Args_>
constexpr operator context<Return_, Args_...>() const;
// Await something. Uses await_transform automatically.
template<typename Awaitable>
auto await(Awaitable && aw);
// Yield a value, if supported by the promise.
template<typename Yield>
auto yield(Yield && value);
};
// Create a fiber with a custom stack allocator (see boost.fiber for details) and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc && salloc, Args && ... args);
// Create a fiber with the default allocator and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func>
auto make_context(Func && func, Args && ... args);
// Create a fiber with a custom stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc && salloc, Args && ... args);
// Create a fiber with the default stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func>
auto make_context(Func && func, Args && ... args);
深入
自定义执行器
cobalt 默认使用 asio::any_io_executor
的原因之一是它是一种类型擦除的执行器,即您可以提供自己的事件循环,而无需重新编译 cobalt
。
但是,在 Executor TS 的开发过程中,执行器概念变得有点不直观,委婉地说。
Ruben Perez 写了一篇出色的 博客文章,我将毫不客气地从中借鉴。
定义
执行器是指向实际事件循环的类型,并且(廉价地)可复制,它支持属性(见下文),具有可比较的相等性,并且具有 execute
函数。
执行
struct example_executor
{
template<typename Fn>
void execute(Fn && fn) const;
};
以上函数根据其属性执行 fn
。
属性
可以查询、首选或要求属性,例如
struct example_executor
{
// get a property by querying it.
asio::execution::relationship_t &query(asio::execution::relationship_t) const
{
return asio::execution::relationship.fork;
}
// require an executor with a new property
never_blocking_executor require(const execution::blocking_t::never_t);
// prefer an executor with a new property. the executor may or may not support it.
never_blocking_executor prefer(const execution::blocking_t::never_t);
// not supported
example_executor prefer(const execution::blocking_t::always_t);
};
asio::any_io_executor
的属性
为了将执行器包装在 asio::any_io_executor
中,需要两个属性
-
execution::context_t
-
execution::blocking_t::never_t
这意味着我们需要使它们可要求(对于上下文而言没有意义)或从 query
返回期望的值。
execution::context_t
查询应返回 asio::execution_context&
,如下所示
struct example_executor
{
asio::execution_context &query(asio::execution::context_t) const;
};
执行上下文用于管理管理 io 对象生命周期的服务的生命周期,例如 asio 的计时器和套接字。也就是说,通过提供此上下文,asio 的所有 io 都可以使用它。
执行上下文必须在执行器被销毁后仍然存在。 |
以下可能是首选的
-
execution::blocking_t::possibly_t
-
execution::outstanding_work_t::tracked_t
-
execution::outstanding_work_t::untracked_t
-
execution::relationship_t::fork_t
-
execution::relationship_t::continuation_
这意味着您可能希望在执行器中支持它们以进行优化。
blocking
属性
正如我们之前看到的,此属性控制传递给 execute()
的函数是可以在 execute()
中立即运行,还是必须排队以供以后执行。可能的值是
-
asio::execution::blocking.never
:永远不要在execute()
中运行该函数。这是asio::post()
的作用。 -
asio::execution::blocking.possibly
:该函数可能在execute()
中运行,也可能不运行。这是默认值(调用io_context::get_executor
时获得的值)。 -
asio::execution::blocking.always
:该函数始终在execute()
中运行。io_context::executor
不支持此功能。
relationship
属性
relationship
可以取两个值
-
asio::execution::relationship.continuation
:表示传递给execute()
的函数是调用execute()
的函数的延续。 -
asio::execution::relationship.fork
:与上述相反。这是默认值(调用io_context::get_executor()
时获得的值)。
将此属性设置为 continuation
可以在调度函数的方式中启用一些优化。仅当函数排队时(而不是立即运行时)才有效。对于 io_context
,设置后,该函数将安排在更快的线程本地队列中运行,而不是上下文全局队列中运行。
outstanding_work_t
属性
outstanding_work
可以取两个值
-
asio::execution::outstanding_work.tracked
:表示当执行器处于活动状态时,仍有工作要做。 -
asio::execution::outstanding_work.untracked
:与上述相反。这是默认值(调用io_context::get_executor()
时获得的值)。
将此属性设置为 tracked
意味着只要 executor
处于活动状态,事件循环就不会返回。
最小执行器
有了这些,让我们看一下最小执行器的接口。
struct minimal_executor
{
minimal_executor() noexcept;
asio::execution_context &query(asio::execution::context_t) const;
static constexpr asio::execution::blocking_t
query(asio::execution::blocking_t) noexcept
{
return asio::execution::blocking.never;
}
template<class F>
void execute(F && f) const;
bool operator==(minimal_executor const &other) const noexcept;
bool operator!=(minimal_executor const &other) const noexcept;
};
有关使用 python 的 asyncio 事件循环的实现,请参见 example/python.cpp。 |
添加工作保护。
现在,让我们为 outstanding_work
属性添加一个 require
函数,该函数使用多种类型。
struct untracked_executor : minimal_executor
{
untracked_executor() noexcept;
constexpr tracked_executor require(asio::execution::outstanding_work:: tracked_t) const;
constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const {return *this; }
};
struct untracked_executor : minimal_executor
{
untracked_executor() noexcept;
constexpr tracked_executor require(asio::execution::outstanding_work:: tracked_t) const {return *this;}
constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const;
};
请注意,从 require
函数返回不同的类型不是必需的,也可以像这样完成
struct trackable_executor : minimal_executor
{
trackable_executor() noexcept;
constexpr trackable_executor require(asio::execution::outstanding_work:: tracked_t) const;
constexpr trackable_executor require(asio::execution::outstanding_work::untracked_t) const;
};
如果我们想使用 prefer
,它看起来如下所示
struct trackable_executor : minimal_executor
{
trackable_executor() noexcept;
constexpr trackable_executor prefer(asio::execution::outstanding_work:: tracked_t) const;
constexpr trackable_executor prefer(asio::execution::outstanding_work::untracked_t) const;
};
总结
如您所见,属性系统并非微不足道,但功能非常强大。实现自定义执行器本身就是一个问题类别,这就是为什么本文档不这样做。相反,有一个关于如何在执行器中包装 python 事件循环的示例。
以下是一些阅读建议。
无栈
C++20 协程是无栈的,这意味着它们没有自己的堆栈。
C++ 中的堆栈描述了调用堆栈,即所有堆叠的函数帧。函数帧是函数运行所需的内存,即存储其变量和返回地址等信息的内存片。
函数帧的大小在编译时已知,但在包含其定义的编译单元之外未知。 |
int bar() {return 0;} // the deepest point of the stack
int foo() {return bar();}
int main()
{
return foo();
}
在上面的示例中,调用堆栈是
main()
foo()
bar()
协程可以实现为有栈的,这意味着它会分配固定大小的内存块,并像线程一样堆叠函数帧。C++20 协程是无栈的,即它们只分配自己的帧,并在恢复时使用调用者的堆栈。使用我们之前的例子
fictional_eager_coro_type<int> example()
{
co_yield 0;
co_yield 1;
}
void nested_resume(fictional_eager_coro_type<int>& f)
{
f.resume();
}
int main()
{
auto f = example();
nested_resume(f);
f.reenter();
return 0;
}
这将产生类似于这样的调用堆栈
main()
f$example()
nested_resume()
f$example()
f$example()
如果协程在线程之间移动,情况也是如此。
惰性 & 急切
协程是惰性的,如果它们只在恢复后才开始执行其代码,而 eager 协程将立即执行,直到其第一个挂起点(即 co_await
、co_yield
或 co_return
表达式)。
lazy_coro co_example()
{
printf("Entered coro\n");
co_yield 0;
printf("Coro done\n");
}
int main()
{
printf("enter main\n");
auto lazy = co_example();
printf("constructed coro\n");
lazy.resume();
printf("resumed once\n");
lazy.resume();
printf("resumed twice\n");
return 0;
}
这将产生如下输出
enter main
constructed coro
Entered coro
resumed once
Coro Done
resumed twice
而 eager 协程看起来会是这样
eager_coro co_example()
{
printf("Entered coro\n");
co_yield 0;
printf("Coro done\n");
}
int main()
{
printf("enter main\n");
auto lazy = co_example();
printf("constructed coro\n");
lazy.resume();
printf("resumed once\n");
return 0;
}
这将产生如下输出
enter main
Entered coro
constructed coro
resume once
Coro Done
基准测试
在第 11 代 Intel® Core™ i7-1185G7 @ 3.00GHz 上运行
发布到执行器
该基准测试运行以下代码,使用了 cobalt 的任务、asio::awaitable
和基于 `asio` 的有栈协程 (boost.context)。
cobalt::task<void> atest()
{
for (std::size_t i = 0u; i < n; i++)
co_await asio::post(cobalt::use_op);
}
gcc 12 | clang 16 | |
---|---|---|
cobalt |
2472 |
2098 |
可等待对象 |
2432 |
2253 |
有栈 |
3655 |
3725 |
并行运行 noop 协程
此基准测试使用大小为零的 asio::experimental::channel
,以便并行地读取和写入。它使用带有 cobalt 的 gather 和 asio::awaitable
中的 awaitable_operator
。
cobalt::task<void> atest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
for (std::size_t i = 0u; i < n; i++)
co_await cobalt::gather(
chan.async_send(system::error_code{}, cobalt::use_task),
chan.async_receive(cobalt::use_task));
}
asio::awaitable<void> awtest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
using boost::asio::experimental::awaitable_operators::operator&&;
for (std::size_t i = 0u; i < n; i++)
co_await (
chan.async_send(system::error_code{}, asio::use_awaitable)
&&
chan.async_receive(asio::use_awaitable));
}
gcc 12 | clang 16 | |
---|---|---|
cobalt |
1563 |
1468 |
可等待对象 |
2800 |
2805 |
立即
此基准测试通过使用大小为 1 的通道来利用立即完成,以便每次操作都是立即的。
cobalt::task<void> atest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 1u};
for (std::size_t i = 0u; i < n; i++)
{
co_await chan.async_send(system::error_code{}, cobalt::use_op);
co_await chan.async_receive(cobalt::use_op);
}
}
gcc 12 | clang 16 | |
---|---|---|
cobalt |
1810 |
1864 |
可等待对象 |
3109 |
4110 |
有栈 |
3922 |
4705 |
通道
在此基准测试中,将比较 asio::experimental::channel 和 cobalt::channel。
这与并行测试类似,但使用了 cobalt::channel
代替。
gcc | clang | |
---|---|---|
cobalt |
500 |
350 |
可等待对象 |
790 |
770 |
有栈 |
867 |
907 |
操作分配
此基准测试比较了异步操作的相关分配器的不同可能解决方案
gcc | clang | |
---|---|---|
std::allocator |
1136 |
1139 |
cobalt::monotonic |
1149 |
1270 |
pmr::monotonic |
1164 |
1173 |
cobalt::sbo |
1021 |
1060 后一种方法在 cobalt 内部使用。 |
要求
库
Boost.cobalt 需要 C++20 编译器,并直接依赖以下 boost 库
-
boost.asio
-
boost.system
-
boost.circular_buffer
-
boost.intrusive
-
boost.smart_ptr
-
boost.container(对于 clang < 16)
编译器
该库自 Clang 14、Gcc 10 和 MSVC 19.30 (Visual Studio 2022) 起受支持。
Gcc 版本 12.1 和 12.2 似乎存在协程的错误,如[此处](https://godbolt.org/z/6adGcqP1z)所示,应避免用于协程。 |
Clang 仅在 16 中添加了 std::pmr
支持,因此较旧的 clang 版本使用 boost::contianer::pmr
作为替代品。
某些(如果不是全部)MSVC 版本存在协程实现的错误,此库需要解决此问题。这可能会导致不确定的行为和开销。 |
协程的延续可以在从 final_suspend
返回的 awaitable 中完成,如下所示
// in promise
auto final_suspend() noexcept
{
struct final_awaitable
{
std::coroutine_handle<void> continuation{std::noop_coroutine()}; (1)
bool await_ready() const noexcept;
std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> h) noexcept
{
auto cc = continuation;
h.destroy(); (2)
return cc;
}
void await_resume() noexcept {}
};
return final_awaitable{my_continuation};
};
1 | 延续 |
2 | 在延续之前自销毁协程 |
final_suspend
在 MSVC 上无法正确挂起协程,因此 h.destroy()
将导致协程帧上的元素被双重销毁。因此,msvc 需要发布销毁,以便在外部执行。这将导致开销,并使内存的实际释放不确定。
致谢
如果没有 CppAlliance 及其创始人 Vinnie Falco,这个库就不可能实现。Vinnie 非常信任我,让我在这个项目上工作,尽管他对这个库的设计方式有非常不同的看法。
还要感谢 Ruben Perez 和 Richard Hodges,他们听取了我的设计问题并给了我建议和用例。此外,如果没有 Chris Kohlhoff 出色的 boost.asio,这个库也不可能实现。