概述
Here’s a list of relevant features in cobalt
An eager coroutine returning a single result- consider it the default |
|
An eager coroutine that can yield multiple values. |
|
A lazy version of promise that can be spawned onto other executors. |
|
A coroutine similar to promise, without a handle |
A function that waits for one coroutine out of a set that is ready in a pseudo-random way, to avoid starvation. |
|
A function that waits for a set of coroutines and returns all of them as value or throws an exception if any awaitable does so. |
|
A function that waits for a set of coroutines and returns all of them as |
|
A deterministic |
A thread-local utility to send values between coroutines. |
|
An async RAII helper, that allows async teardown when exceptions occur |
A short introduction to C++ coroutines |
Read if you’ve never used coroutines before |
|
An abbreviated high level view of the features and concepts |
Read if you’re familiar with asio & coroutines and want a rough idea what this library offers. |
|
Low level view of usages |
Read if you want to get coding quickly |
|
API 参考 |
Look up details while coding |
|
Some implementation details |
Read if you’re not confused enough |
动机
Many languages programming languages like node.js and python provide easy to use single-threaded concurrency frameworks. While more complex than synchronous code, single threaded asynchronicity avoids many of the pitfalls & overhead of multi-threading.
That is, one coroutine can work, while others wait for events (e.g. a response from a server). This allows to write applications that do multiple things simultaneously on a single thread.
This library is meant to provide this to C++: simple single threaded asynchronicity akin to node.js and asyncio in python that works with existing libraries like boost.beast, boost.mysql or boost.redis. It based on boost.asio.
It takes a collection of concepts from other languages and provides them based on C++20 coroutines.
-
easy asynchronous base functions, such as an async main & threads
-
an async scope
Unlike asio::awaitable and asio::experimental::coro, cobalt coroutines are open. That is, an asio::awaitable can only await and be awaited by other asio::awaitable and does not provide coroutine specific synchronization mechanisms.
cobalt on the other hand provides a coroutine specific channel and different wait types (race, gather etc.) that are optimized to work with coroutines and awaitables.
协程入门
异步编程
Asynchronous programming generally refers to a style of programming that allows tasks to be run in the background, while the other works is performed.
Imagine if you will a get-request function that performs a full http request including connecting & ssl handshakes etc.
std::string http_get(std:string_view url);
int main(int argc, char * argv[])
{
auto res = http_get("https://boost.ac.cn");
printf("%s", res.c_str());
return 0;
}
The above code would be traditional synchronous programming. If we want to perform two requests in parallel we would need to create another thread to run another thread with synchronous code.
std::string http_get(std:string_view url);
int main(int argc, char * argv[])
{
std::string other_res;
std::thread thr{[&]{ other_res = http_get("https://cppalliance.org"); }};
auto res = http_get("https://boost.ac.cn");
thr.join();
printf("%s", res.c_str());
printf("%s", other_res.c_str());
return 0;
}
This works, but our program will spend most of the time waiting for input. Operating systems provide APIs that allow IO to be performed asynchronously, and libraries such as boost.asio provide portable ways to manage asynchronous operations. Asio itself does not dictate a way to handle the completions. This library (boost.cobalt) provides a way to manage this all through coroutines/awaitables.
cobalt::promise<std::string> http_cobalt_get(std:string_view url);
cobalt::main co_main(int argc, char * argv[])
{
auto [res, other_res] =
cobalt::join(
http_cobalt_get("https://boost.ac.cn"),
http_cobalt_get("https://cppalliance.org")
);
printf("%s", res.c_str());
printf("%s", other_res.c_str());
return 0;
}
In the above code the asynchronous function to perform the request takes advantage of the operating system APIs so that the actual IO doesn’t block. This means that while we’re waiting for both functions to complete, the operations are interleaved and non-blocking. At the same time cobalt provides the coroutine primitives that keep us out of callback hell.
协程
Coroutines are resumable functions. Resumable means that a function can suspend, i.e. pass the control back to the caller multiple times.
A regular function yields control back to the caller with the return function, where it also returns the value.
A coroutine on the other hand might yield control to the caller and get resumed multiple times.
A coroutine has three control keywords akin to co_return (of which only co_return has to be supported).
-
co_return -
co_yield -
co_await
co_return
This is similar to return, but marks the function as a coroutine.
co_await
The co_await expression suspends for an Awaitable, i.e. stops execution until the awaitable resumes it.
E.g.
cobalt::promise<void> delay(std::chrono::milliseconds);
cobalt::task<void> example()
{
co_await delay(std::chrono::milliseconds(50));
}
A co_await expression can yield a value, depending on what it is awaiting.
cobalt::promise<std::string> read_some();
cobalt::task<void> example()
{
std::string res = co_await read_some();
}
In cobalt most coroutine primitives are also Awaitables. |
co_yield
The co_yield expression is similar to the co_await, but it yields control to the caller and carries a value.
例如:
cobalt::generator<int> iota(int max)
{
int i = 0;
while (i < max)
co_yield i++;
co_return i;
}
A co_yield expression can also produce a value, which allows the user of yielding coroutine to push values into it.
cobalt::generator<int> iota()
{
int i = 0;
bool more = false;
do
{
more = co_yield i++;
}
while(more);
co_return -1;
}
可等待对象
Awaitables are types that can be used in a co_await expression.
struct awaitable_prototype
{
bool await_ready();
template<typename T>
see_below await_suspend(std::coroutine_handle<T>);
return_type await_resume();
};
Type will be implicitly converted into an awaitable if there is an operator co_await call available. This documentation will use awaitable to include these types, and "actual_awaitable" to refer to type conforming to the above prototype. |
In a co_await expression the waiting coroutine will first invoke await_ready to check if the coroutine needs to suspend. When ready, it goes directly to await_resume to get the value, as there is no suspension needed. Otherwise, it will suspend itself and call await_suspend with a std::coroutine_handle to its own promise.
std::coroutine_handle<void> can be used for type erasure. |
The return_type is the result type of the co_await expression, e.g. int
int i = co_await awaitable_with_int_result();
The return type of the await_suspend can be three things
-
void -
bool -
std::coroutine_handle<U>
If it is void the awaiting coroutine remains suspended. If it is bool, the value will be checked, and if false, the awaiting coroutine will resume right away.
If a std::coroutine_handle is returned, this coroutine will be resumed. The latter allows await_suspend to return the handle passed in, being effectively the same as returning false.
If the awaiting coroutine gets re-resumed right away, i.e. after calling await_resume, it is referred to as "immediate completion" within this library. This is not to be confused with a non-suspending awaitable, i.e. one that returns true from await_ready.
事件循环
Since the coroutines in cobalt can co_await events, they need to be run on an event-loop. That is another piece of code is responsible for tracking outstanding event and resume a resuming coroutines that are awaiting them. This pattern is very common and is used in a similar way by node.js or python’s asyncio.
cobalt uses an asio::io_context as its default event loop. That is, the classes thread, main and the run function are using it internally.
You can use any event loop that can produce an asio::any_io_executor with the library. The easiest way to achieve this is by using spawn.
The event loop is accessed through an executor (following the asio terminology) and can be manually set using set_executor.
导览
进入 cobalt 环境
In order to use awaitables we need to be able to co_await them, i.e. be within a coroutine.
We got four ways to achieve this
- cobalt/main.hpp
-
replace
int mainwith a coroutine
cobalt::main co_main(int argc, char* argv[])
{
// co_await things here
co_return 0;
}
- cobalt/thread.hpp
-
create a thread for the asynchronous environments
cobalt::thread my_thread()
{
// co_await things here
co_return;
}
int main(int argc, char ** argv[])
{
auto t = my_thread();
t.join();
return 0;
}
- cobalt/task.hpp
-
create a task and run or spawn it
cobalt::task<void> my_thread()
{
// co_await things here
co_return;
}
int main(int argc, char ** argv[])
{
cobalt::run(my_task()); // sync
asio::io_context ctx;
cobalt::spawn(ctx, my_task(), asio::detached);
ctx.run();
return 0;
}
Promises
Promises are the recommended default coroutine type. They’re eager and thus easily usable for ad-hoc concurrecy.
cobalt::promise<int> my_promise()
{
co_await do_the_thing();
co_return 0;
}
cobalt::main co_main(int argc, char * argv[])
{
// start the promise here
auto p = my_promise();
// do something else here
co_await do_the_other_thing();
// wait for the promise to complete
auto res = co_await p;
co_return res;
}
Tasks
Tasks are lazy, which means they won’t do anything before awaited or spawned.
cobalt::task<int> my_task()
{
co_await do_the_thing();
co_return 0;
}
cobalt::main co_main(int argc, char * argv[])
{
// create the task here
auto t = my_task();
// do something else here first
co_await do_the_other_thing();
// start and wait for the task to complete
auto res = co_await t;
co_return res;
}
Generator
A generator is the only type in cobalt that can co_yield values.
Generator are eager by default. Unlike std::generator the cobalt::generator can co_await and thus is asynchronous.
cobalt::generator<int> my_generator()
{
for (int i = 0; i < 10; i++)
co_yield i;
co_return 10;
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator();
while (g)
printf("Generator %d\n", co_await g);
co_return 0;
}
Values can be pushed into the generator, that will be returned from the co_yield.
An eager generator will produce the first result before being awaited. That is, when we call co_await g(4) the result from the initial co_yield is ready. The generator then receives the value 4 that got passed in as a return from the co_yield, and processes it.
cobalt::generator<std::string, int> my_eager_push_generator(int value)
{
while (value != 0)
value = co_yield std::to_string(value);
co_return "";
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator(5);
assert("5" == co_await g(4)); // result of 5
assert("4" == co_await g(3)); // result of 4
assert("3" == co_await g(2)); // result of 3
assert("2" == co_await g(1)); // result of 2
assert("1" == co_await g(0)); // result of 1
// we let the coroutine go out of scope while suspended
// no need for another co_await of `g`
co_return 0;
}
A coroutine can also be made lazy using this_coro::initial.
A lazy generator does wait for the first co_await to start work, i.e. it suspends when initial gets awaited. The data processing is not a step ahead, but happens when co_await-ed.
cobalt::generator<std::string, int> my_eager_push_generator()
{
auto value = co_await this_coro::initial;
while (value != 0)
value = co_yield std::to_string(value);
co_return "";
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator(); // lazy, so the generator waits for the first pushed value
assert("5" == co_await g(5)); // result of 5
assert("4" == co_await g(4)); // result of 4
assert("3" == co_await g(3)); // result of 3
assert("2" == co_await g(2)); // result of 2
assert("1" == co_await g(1)); // result of 1
// we let the coroutine go out of scope while suspended
// no need for another co_await of `g`
co_return 0;
}
join
If multiple awaitables work in parallel they can be awaited simultaneously with join.
cobalt::promise<int> some_work();
cobalt::promise<double> more_work();
cobalt::main co_main(int argc, char * argv[])
{
std::tuple<int, double> res = cobalt::join(some_work(), more_work());
co_return 0;
}
race
If multiple awaitables work in parallel, but we want to be notified if either completes, we shall use race.
cobalt::generator<int> some_data_source();
cobalt::generator<double> another_data_source();
cobalt::main co_main(int argc, char * argv[])
{
auto g1 = some_data_source();
auto g2 = another_data_source();
int res1 = co_await g1;
double res2 = co_await g2;
printf("Result: %f", res1 * res2);
while (g1 && g2)
{
switch(variant2::variant<int, double> nx = co_await cobalt::race(g1, g2))
{
case 0:
res1 = variant2::get<0>(nx);
break;
case 1:
res2 = variant2::get<1>(nx);
break;
}
printf("New result: %f", res1 * res2);
}
co_return 0;
}
race in this context will not cause any data loss. |
教程
delay
Let’s start with the simplest example possible: a simple delay.
cobalt::main co_main(int argc, char * argv[]) (1)
{
asio::steady_timer tim{co_await asio::this_coro::executor, (2)
std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
co_await tim.async_wait(cobalt::use_op); (4)
co_return 0; (5)
}
| 1 | The co_main function defines an implicit main when used and is the easiest way to set up an environment to run asynchronous code. |
| 2 | Take the executor from the current coroutine promise. |
| 3 | Use an argument to set the timeout |
| 4 | Perform the wait by using cobalt::use_op. |
| 5 | Return a value that gets returned from the implicit main. |
In this example we use the cobalt/main.hpp header, which provides us with a main coroutine if co_main is defined as above. This has a few advantages
-
The environment get set up correctly (
executor&memory) -
asio is signaled that the context is single threaded
-
an
asio::signal_setwithSIGINT&SIGTERMis automatically connected to cancellations (i.e.Ctrl+Ccauses cancellations)
This coroutine then has an executor in its promise (the promise the C++ name for a coroutine state. Not to be confused with cobalt/promise.hpp) which we can obtain through the dummy-awaitables in the this_coro namespace.
echo server
We’ll be using the use_op (asio completion) token everywhere, so we’re using a default completion token, so that we can skip the last parameters.
namespace cobalt = boost::cobalt;
namespace this_coro = boost::cobalt::this_coro;
We’re writing the echo function as a promise coroutine. It’s an eager coroutine and recommended as the default; in case a lazy coro is needed, task is available.
cobalt::promise<void> echo(cobalt::io::stream_socket socket)
{
try (1)
{
char data[4096];
while (socket.is_open()) (2)
{
std::size_t n = co_await socket.read_some(boost::asio::buffer(data)); (3)
co_await cobalt::io::write(socket, boost::asio::buffer(data, n)); (4)
}
}
catch (std::exception& e)
{
std::printf("echo: exception: %s\n", e.what());
}
}
| 1 | When using the use_op completion token, I/O errors are translated into C++ exceptions. Additionally, if the coroutine gets cancelled (e.g. because the user hit Ctrl-C), an exception will be raised, too. Under these conditions, we print the error and exit the loop. |
| 2 | We run the loop until we get cancelled (exception) or the user closes the connection. |
| 3 | Read as much as is available. |
| 4 | Write all the read bytes. |
Note that promise is eager. Calling echo will immediately execute code until async_read_some and then return control to the caller.
Next, we also need an acceptor function. Here, we’re using a generator to manage the acceptor state. This is a coroutine that can be co_awaited multiple times, until a co_return expression is reached.
cobalt::generator<cobalt::io::stream_socket> listen()
{
cobalt::io::acceptor acceptor(cobalt::io::endpoint{cobalt::io::tcp_v4, "0.0.0.0", 55555});
for (;;) (1)
{
cobalt::io::stream_socket sock = co_await acceptor.accept(); (2)
co_yield std::move(sock); (3)
}
co_return cobalt::io::stream_socket{}; (4)
}
| 1 | Cancellation will also lead to an exception here being thrown from the co_await |
| 2 | Asynchronously accept the connection |
| 3 | Yield it to the awaiting coroutine |
| 4 | co_return a value for C++ conformance. |
With those two functions we can now write the server
cobalt::promise<void> run_server(cobalt::wait_group & workers)
{
auto l = listen(); (1)
while (true)
{
if (workers.size() == 10u)
co_await workers.wait_one(); (2)
else
workers.push_back(echo(co_await l)); (3)
}
}
| 1 | Construct the listener generator coroutine. When the object is destroyed, the coroutine will be cancelled, performing all required cleanup. |
| 2 | When we have more than 10 workers, we wait for one to finish |
| 3 | Accept a new connection & launch it. |
The wait_group is used to manage the running echo functions. This class will cancel & await the running echo coroutines.
We do not need to do the same for the listener, because it will just stop on its own, when l gets destroyed. The destructor of a generator will cancel it.
Since the promise is eager, just calling it is enough to launch. We then put those promises into a wait_group which will allow us to tear down all the workers on scope exit.
cobalt::main co_main(int argc, char ** argv)
{
co_await cobalt::with(cobalt::wait_group(), &run_server); (1)
co_return 0u;
}
| 1 | Run run_server with an async scope. |
The with function shown above, will run a function with a resource such as wait_group. On scope exit with will invoke & co_await an asynchronous teardown function. This will cause all connections to be properly shutdown before co_main exists.
price ticker
To demonstrate channels and other tools, we need a certain complexity. For that purpose our project is a price ticker, that connects to https://blockchain.info. A user can then connection to localhost to query a given currency pair, like this
wscat -c localhost:8080/btc/usd
First we do the same declarations as echo-server.
using executor_type = cobalt::use_op_t::executor_with_default<cobalt::executor>;
using socket_type = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;
The next step is to write a function to connect an ssl-stream, to connect upstream
cobalt::promise<asio::ssl::stream<socket_type>> connect(
std::string host, boost::asio::ssl::context & ctx)
{
asio::ip::tcp::resolver res{cobalt::this_thread::get_executor()};
auto ep = co_await res.async_resolve(host, "https", cobalt::use_op); (1)
asio::ssl::stream<socket_type> sock{cobalt::this_thread::get_executor(), ctx};
co_await sock.next_layer().async_connect(*ep.begin()); (2)
co_await sock.async_handshake(asio::ssl::stream_base::client); (3)
co_return sock; (4)
}
| 1 | Lookup the host |
| 2 | Connect to the endpoint |
| 3 | Do the ssl handshake |
| 4 | Return the socket to the caller |
Next, we’ll need a function to do the websocket upgrade on an existing ssl-stream.
cobalt::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
ws.set_option(beast::websocket::stream_base::decorator(
[](beast::websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " cobalt-ticker");
req.set(http::field::origin,
"https://exchange.blockchain.com"); (1)
}));
co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
| 1 | blockchain.info requires this header to be set. |
| 2 | Perform the websocket handshake. |
Once the websocket is connected, we want to continuously receive json messages, for which a generator is a good choice.
cobalt::generator<json::object> json_reader(websocket_type & ws)
try
{
beast::flat_buffer buf;
while (ws.is_open()) (1)
{
auto sz = co_await ws.async_read(buf); (2)
json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
auto obj = json::parse(data);
co_yield obj.as_object(); (3)
buf.consume(sz);
}
co_return {};
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
| 1 | Keep running as long as the socket is open |
| 2 | Read a frame from the websocket |
| 3 | Parse & co_yield it as an object. |
This then needs to be connected to subscriber, for which we’ll utilize channels to pass raw json. To make life-time management easy, the subscriber will hold a shared_ptr, and the producer a weak_ptr.
using subscription = std::pair<std::string, std::weak_ptr<cobalt::channel<json::object>>>;
using subscription_channel = std::weak_ptr<cobalt::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;
The main function running the blockchain connector, operates on two inputs: data coming from the websocket and a channel to handle new subscriptions.
cobalt::promise<void> run_blockchain_info(cobalt::channel<subscription> & subc)
try
{
asio::ssl::context ctx{asio::ssl::context_base::tls_client};
websocket_type ws{co_await connect("blockchain.info", ctx)};
co_await connect_to_blockchain_info(ws); (1)
subscription_map subs;
std::list<std::string> unconfirmed;
auto rd = json_reader(ws); (2)
while (ws.is_open()) (3)
{
switch (auto msg = co_await cobalt::race(rd, subc.read()); msg.index()) (4)
{
case 0: (5)
if (auto ms = get<0>(msg);
ms.at("event") == "rejected") // invalid sub, cancel however subbed
co_await handle_rejections(unconfirmed, subs, ms);
else
co_await handle_update(unconfirmed, subs, ms, ws);
break;
case 1: // (6)
co_await handle_new_subscription(
unconfirmed, subs,
std::move(get<1>(msg)), ws);
break;
}
}
for (auto & [k ,c] : subs)
{
if (auto ptr = c.lock())
ptr->close();
}
}
catch(std::exception & e)
{
std::cerr << "Exception: " << e.what() << std::endl;
throw;
}
| 1 | Initialize the connection |
| 2 | Instantiate the json_reader |
| 3 | Run as long as the websocket is open |
| 4 | Select, i.e. wait for either a new json message or subscription |
| 5 | When it’s a json handle an update or a rejection |
| 6 | Handle new subscription messages |
The handle_* function’s contents are not as important for the cobalt functionality, so it’s skipped in this tutorial.
The handle_new_subscription function sends a message to the blockchain.info, which will send a confirmation or rejection back. The handle_rejection and handle_update will take the json values and forward them to the subscription channel.
On the consumer side, our server will just forward data to the client. If the client inputs data, we’ll close the websocket immediately. We’re using as_tuple to ignore potential errors.
cobalt::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
system::error_code ec;
co_await st.async_read(buf, asio::as_tuple(cobalt::use_op));
co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(cobalt::use_op));
st.next_layer().close(ec);
}
Next, we’re running the session that the users sends
cobalt::promise<void> run_session(beast::websocket::stream<socket_type> st,
cobalt::channel<subscription> & subc)
try
{
http::request<http::empty_body> req;
beast::flat_buffer buf;
co_await http::async_read(st.next_layer(), buf, req); (1)
// check the target
auto r = urls::parse_uri_reference(req.target());
if (r.has_error() || (r->segments().size() != 2u)) (2)
{
http::response<http::string_body> res{http::status::bad_request, 11};
res.body() = r.has_error() ? r.error().message() :
"url needs two segments, e.g. /btc/usd";
co_await http::async_write(st.next_layer(), res);
st.next_layer().close();
co_return ;
}
co_await st.async_accept(req); (3)
auto sym = std::string(r->segments().front()) + "-" +
std::string(r->segments().back());
boost::algorithm::to_upper(sym);
// close when data gets sent
auto p = read_and_close(st, std::move(buf)); (4)
auto ptr = std::make_shared<cobalt::channel<json::object>>(1u); (5)
co_await subc.write(subscription{sym, ptr}); (6)
while (ptr->is_open() && st.is_open()) (7)
{
auto bb = json::serialize(co_await ptr->read());
co_await st.async_write(asio::buffer(bb));
}
co_await st.async_close(beast::websocket::close_code::going_away,
asio::as_tuple(cobalt::use_op)); (8)
st.next_layer().close();
co_await p; (9)
}
catch(std::exception & e)
{
std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
| 1 | Read the http request, because we want the path |
| 2 | Check the path, e.g. /btc/usd. |
| 3 | Accept the websocket |
| 4 | Start reading & close if the consumer sends something |
| 5 | Create the channel to receive updates |
| 6 | Send a subscription requests to run_blockchain_info |
| 7 | While the channel & websocket are open, we’re forwarding data. |
| 8 | Close the socket & ignore the error |
| 9 | Since the websocket is surely closed by now, wait for the read_and_close to close. |
With run_session and run_blockchain_info written, we can not move on to main
cobalt::main co_main(int argc, char * argv[])
{
acceptor_type acc{co_await cobalt::this_coro::executor,
asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
std::cout << "Listening on localhost:8080" << std::endl;
constexpr int limit = 10; // allow 10 ongoing sessions
cobalt::channel<subscription> sub_manager; (1)
co_await join( (2)
run_blockchain_info(sub_manager),
cobalt::with( (3)
cobalt::wait_group(
asio::cancellation_type::all,
asio::cancellation_type::all),
[&](cobalt::wait_group & sessions) -> cobalt::promise<void>
{
while (!co_await cobalt::this_coro::cancelled) (4)
{
if (sessions.size() >= limit) (5)
co_await sessions.wait_one();
auto conn = co_await acc.async_accept(); (6)
sessions.push_back( (7)
run_session(
beast::websocket::stream<socket_type>{std::move(conn)},
sub_manager));
}
})
);
co_return 0;
}
| 1 | Create the channel to manage subscriptions |
| 2 | Use join to run both tasks in parallel. |
| 3 | Use an cobalt scope to provide a wait_group. |
| 4 | Run until cancelled. |
| 5 | When we’ve reached the limit we wait for one task to complete. |
| 6 | Wait for a new connection. |
| 7 | Insert the session into the wait_group. |
Main is using join because one task failing should cancel the other one.
delay op
We’ve used the use_op so far, to use an implicit operation based on asio’s completion token mechanic.
We can however implement our own ops, that can also utilize the await_ready optimization. Unlike immediate completion, the coroutine will never suspend when await_ready returns true.
To leverage this coroutine feature, cobalt provides an easy way to create a skipable operation
struct wait_op final : cobalt::op<system::error_code> (1)
{
asio::steady_timer & tim;
wait_op(asio::steady_timer & tim) : tim(tim) {}
void ready(cobalt::handler<system::error_code> h ) override (2)
{
if (tim.expiry() < std::chrono::steady_clock::now())
h(system::error_code{});
}
void initiate(cobalt::completion_handler<system::error_code> complete) override (3)
{
tim.async_wait(std::move(complete));
}
};
cobalt::main co_main(int argc, char * argv[])
{
asio::steady_timer tim{co_await asio::this_coro::executor,
std::chrono::milliseconds(std::stoi(argv[1]))};
co_await wait_op(tim); (4)
co_return 0; //
}
| 1 | Declare the op. We inherit op to make it awaitable. |
| 2 | The pre-suspend check is implemented here |
| 3 | Do the wait if we need to |
| 4 | Use the op just like any other awaitable. |
This way we can minimize the amounts of coroutine suspensions.
While the above is used with asio, you can also use these handlers with any other callback based code.
带有 push 值的生成器
Coroutines with push values are not as common, but can simplify certain issues significantly.
Since we’ve already got a json_reader in the previous example, here’s how we can write a json_writer that gets values pushed in.
The advantage of using a generator is the internal state management.
cobalt::generator<system::error_code, json::object>
json_writer(websocket_type & ws)
try
{
char buffer[4096];
json::serializer ser;
while (ws.is_open()) (1)
{
auto val = co_yield system::error_code{}; (2)
while (!ser.done())
{
auto sv = ser.read(buffer);
co_await ws.cobalt_write({sv.data(), sv.size()}); (3)
}
}
co_return {};
}
catch (system::system_error& e)
{
co_return e.code();
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
| 1 | Keep running as long as the socket is open |
| 2 | co_yield the current error and retrieve a new value. |
| 3 | Write a frame to the websocket |
Now we can use the generator like this
auto g = json_writer(my_ws);
extern std::vector<json::value> to_write;
for (auto && tw : std::move(to_write))
{
if (auto ec = co_await g(std::move(tw)))
return ec; // yield error
}
高级示例
More examples are provided in the repository as code only. All examples are listed below.
An http client that performs a single http get request. |
|
Using the |
|
Using nanobind to integrate cobalt with python. It uses python’s asyncio as executor and allows C++ to co_await python functions et vice versa. |
|
Adopting |
|
Creating a |
|
Using worker threads with |
|
Using an |
|
The example used by the delay section |
|
The example used by the delay op section |
|
The example used by the echo server section |
|
The example used by the price ticker section |
|
The example used by the channel reference |
设计
概念
This library has two fundamental concepts
-
协程
An awaitable is an expression that can be used with co_await from within a coroutine, e.g.
co_await delay(50ms);
However, a coroutine promise can define an await_transform, i.e. what is actually valid to use with co_await expression depends on the coroutine.
Thus, we should redefine what an awaitable is: An awaitable is a type that can be co_await-ed from within a coroutine, which promise does not define await_transform.
A pseudo-keyword is a type that can be used in a coroutines that is adds special functionality for it due to its promise await_transform.
All the verbs in the this_coro namespace are such pseudo-keywords.
auto exec = co_await this_coro::executor;
This library exposes a set of enable_* base classes for promises, to make the creation of custom coroutines easy. This includes the enable_awaitables, which provides an await_transform that just forward awaitables. |
A coroutine in the context of this documentation refers to an asynchronous coroutine, i.e. synchronous coroutines like std::generator are not considered.
All coroutines except main are also actual awaitables.
执行器
Since everything is asynchronous the library needs to use an event-loop. Because everything is single-threaded, it can be assumed that there is exactly one executor per thread, which will suffice for 97% of use-cases. Therefore, there is a thread_local executor that gets used as default by the coroutine objects (although stored by copy in the coroutine promise).
Likewise, there is one executor type used by the library, which defaults to asio::any_io_executor.
If you write your own coroutine, it should hold a copy of the executor, and have a get_executor function returning it by const reference. |
使用 Strands
While strands can be used, they are not compatible with the thread_local executor. This is because they might switch threads, thus they can’t be thread_local.
If you wish to use strands (e.g. through a spawn) the executor for any promise, generator or channel must be assigned manually.
In the case of a channel this is a constructor argument, but for the other coroutine types, asio::executor_arg needs to be used. This is done by having asio::executor_arg_t (somewhere) in the argument list directly followed by the executor to be used in the argument list of the coroutine, e.g.
cobalt::promise<void> example_with_executor(int some_arg, asio::executor_arg_t, cobalt::executor);
This way the coroutine-promise can pick up the executor from the third argument, instead of defaulting to the thread_local one.
The arguments can of course be defaulted, to make them less inconvenient, if they are sometimes with a thread_local executor.
cobalt::promise<void> example_with_executor(int some_arg,
asio::executor_arg_t = asio::executor_arg,
cobalt::executor = cobalt::this_thread::get_executor());
If this gets omitted on a strand an exception of type asio::bad_allocator is thrown, or - worse - the wrong executor is used.
多态内存资源
Similarly, the library uses a thread_local pmr::memory_resource to allocate coroutine frames & to use as allocator on asynchronous operations.
The reason is, that users may want to customize allocations, e.g. to avoid locks, limit memory usage or monitor usage. pmr allows us to achieve this without introducing unnecessary template parameters, i.e. no promise<T, Allocator> complexity. Using pmr however does introduce some minimal overheads, so a user has the option to disable by defining BOOST_COBALT_NO_PMR.
op uses an internal resource optimized for asio’s allocator usages and gather, race and join use a monotonic resource to miminize allocations. Both still work with BOOST_COBALT_NO_PMR defined, in which case they’ll use new/delete as upstream allocations.
如果您编写自己的协程,它应该有一个返回 pmr::polymorphic_allocator<void> 的 get_allocator 函数。 |
取消
cobalt 使用基于 asio::cancellation_signal 的隐式取消。这主要被隐式使用(例如,与 race 一起),因此在示例中很少有显式使用。
如果您编写自定义协程,它必须从 get_cancellation_slot 函数返回一个 cancellation_slot,以便能够取消其他操作。 |
| 如果您编写自定义 awaitable,它可以在 await_suspend 中使用该函数来接收取消信号。 |
Promise
主要的协程类型是 promise,它是 eager 的。默认使用它的原因是,编译器可以优化掉不挂起的 promise,如下所示
cobalt::promise<void> noop()
{
co_return;
}
理论上,await 上面的操作是一个 noop,但实际上,截至 2023 年,编译器尚未实现。
Race
最重要的同步机制是 race 函数。
它以伪随机顺序 await 多个 awaitable,并返回第一个完成的结果,然后忽略其余的。
也就是说,它以伪随机的顺序启动 co_await,一旦发现一个 awaitable 已准备好或立即完成,就会停止。
cobalt::generator<int> gen1();
cobalt::generator<double> gen2();
cobalt::promise<void> p()
{
auto g1 = gen1();
auto g2 = gen2();
while (!co_await cobalt::this_coro::cancelled)
{
switch(auto v = co_await race(g1, g2); v.index())
{
case 0:
printf("Got int %d\n", get<0>(v));
break;
case 1:
printf("Got double %f\n", get<1>(v));
break;
}
}
}
race 是触发取消的首选方式,例如
cobalt::promise<void> timeout();
cobalt::promise<void> work();
race(timeout(), work());
interrupt_await
如果它朴素地取消,它会丢失数据。因此,引入了 interrupt_await 的概念,它告诉支持它的 awaitable 立即恢复 awaiter 并返回或抛出一个被忽略的值。
struct awaitable
{
bool await_ready() const;
template<typename Promise>
std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
T await_resume();
void interrupt_await() &;
};
如果 interrupt_await 未能立即恢复 (h),race 将发送一个取消信号。
race 以正确的引用限定符应用这些
auto g = gen1();
race(g, gen2());
如果可用,上面将调用 g1 的 interrupt_await() & 函数和 g2 的 interrupt_await() && 函数。
总的来说,cobalt 中的协程支持 lvalue 中断,即 interrupt_await() &。 channel 操作是未限定的,即在两种情况下都有效。 |
Associators
cobalt 使用 asio 的 associator 概念,但对其进行了简化。也就是说,它有三个 associator,它们是 await promise 的成员函数。
-
const executor_type & get_executor()(始终是executor,必须返回 const 引用) -
allocator_type get_allocator()(始终是pmr::polymorphic_allocator<void>) -
cancellation_slot_type get_cancellation_slot()(必须具有与asio::cancellation_slot相同的 IF)
cobalt 使用 concepts 来检查这些是否存在于其 await_suspend 函数中。
这样,自定义协程就可以支持取消、执行器和分配器。
在自定义 awaitable 中,您可以通过以下方式获取它们
struct my_awaitable
{
bool await_ready();
template<typename T>
void await_suspend(std::corutine_handle<P> h)
{
if constexpr (requires (Promise p) {p.get_executor();})
handle_executor(h.promise().get_executor();
if constexpr (requires (Promise p) {p.get_cancellation_slot();})
if ((cl = h.promise().get_cancellation_slot()).is_connected())
cl.emplace<my_cancellation>();
}
void await_resume();
};
取消连接在 co_await 表达式中(如果由协程 & awaitable 支持),包括同步机制,如 race。
Threading
主要的技术原因是,切换协程最有效的方法是从 await_suspend 返回新协程的句柄,如下所示
struct my_awaitable
{
bool await_ready();
std::coroutine_handle<T> await_suspend(std::coroutine_handle<U>);
void await_resume();
};
在这种情况下,awaiting coroutine 将在调用 await_suspend 之前挂起,并且返回的 coroutine 将被恢复。当然,如果我们必须通过执行器,这将不起作用。
这不仅适用于 await-ed coroutine,也适用于 channel。此库中的 channel 使用 awaitable 的侵入式列表,并可能从 write_operation 的 await_suspend 返回读取(因此挂起)协程的句柄。
I/O
cobalt::io 命名空间提供了一个 boost.asio 的包装器库。它被简化并编译以加速开发和编译时间。
参考
cobalt/main.hpp
使用 cobalt 应用程序最简单的方法是使用具有以下签名的 co_main 函数
cobalt::main co_main(int argc, char *argv[]);
声明 co_main 将添加一个 main 函数,该函数执行所有必要的步骤来在事件循环上运行协程。这使我们能够编写非常简单的异步程序。
cobalt::main co_main(int argc, char *argv[])
{
auto exec = co_await cobalt::this_coro::executor; (1)
asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
co_await tim.async_wait(cobalt::use_op); (3)
co_return 0;
}
| 1 | 获取正在运行的 main 的执行器 |
| 2 | 将其与 asio 对象一起使用 |
| 3 | await 一个 cobalt 操作 |
主 promise 将创建一个 asio::signal_set 并将其用于取消。 SIGINT 变为总取消,而 SIGTERM 变为终端取消。
| 取消不会转发给分离的协程。用户需要注意在取消时结束它们,因为否则程序不允许优雅终止。 |
执行器
它还将创建一个 asio::io_context 来运行,您可以通过 this_coro::executor 获取它。它将被分配给 cobalt::this_thread::get_executor()。
Memory Resource
它还创建一个内存资源,将用作内部内存分配的默认值。它将被分配给 thread_local 的 cobalt::this_thread::get_default_resource()。
Promise
每个协程都有一个内部状态,称为 promise(不要与 cobalt::promise 混淆)。根据协程的属性,可以 await 不同的内容,就像我们在上面的示例中所用的那样。
它们是通过继承实现的,并在不同的 promise 类型之间共享
主 promise 具有以下属性。
规范
-
声明
co_main将隐式声明一个main函数 -
main仅在定义co_main时存在。 -
SIGINT和SIGTERM将导致内部任务被取消。
cobalt/promise.hpp
promise 是一个 eager 协程,可以 await 和 co_return 值。也就是说,它不能使用 co_yield。
cobalt::promise<void> delay(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
}
cobalt::main co_main(int argc, char *argv[])
{
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
promise 默认是 attached 的。这意味着,当 promise 句柄超出范围时,将发送取消信号。
可以通过调用 detach 或使用前缀 + 运算符来分离 promise。这是使用 detached 的运行时替代方法。分离的 promise 在销毁时不会发送取消信号。
cobalt::promise<void> my_task();
cobalt::main co_main(int argc, char *argv[])
{
+my_task(); (1)
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
| 1 | 通过使用 +,任务将被分离。没有它,编译器会生成一个 nodiscard 警告。 |
执行器
除非在任何位置使用 asio::executor_arg 后跟执行器参数,否则执行器将从 thread_local get_executor 函数获取。
cobalt::promise<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
Memory Resource
除非在任何位置使用 std::allocator_arg 后跟 polymorphic_allocator 参数,否则内存资源将从 thread_local get_default_resource 函数获取。
cobalt::promise<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
Outline
template<typename Return>
struct [[nodiscard]] promise
{
promise(promise &&lhs) noexcept;
promise& operator=(promise && lhs) noexcept;
// enable `co_await`. (1)
auto operator co_await ();
// Ignore the return value, i.e. detach it. (2)
void operator +() &&;
// Cancel the promise.
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// Check if the result is ready
bool ready() const;
// Check if the promise can be awaited.
explicit operator bool () const; (3)
// Detach or attach
bool attached() const;
void detach();
void attach();
// Create an already completed promimse
static promise
// Get the return value. If !ready() this function has undefined behaviour.
Return get();
};
| 1 | 支持 中断等待 |
| 2 | 这允许使用简单的 +my_task() 表达式创建并行运行的 promise。 |
| 3 | 这允许像 while (p) co_await p; 这样的代码 |
Promise
协程 promise (promise::promise_type) 具有以下属性。
-
延迟 await == cobalt/generator.hpp
generator 是一个 eager 协程,可以 await 并向调用者 yield 值。也就是说,它不能使用 co_yield。
cobalt::generator<int> example()
{
printf("In coro 1\n");
co_yield 2;
printf("In coro 3\n");
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main 1\n");
printf("In main %d\n", co_await f);
printf("In main %d\n", co_await f);
return 0;
}
这将生成以下输出
In main 0 In coro 1 In main 1 In main 2 In coro 3 In main 4
当 Push(第二个模板参数)设置为非 void 时,可以将值推送到 generator 中
cobalt::generator<int, int> example()
{
printf("In coro 1\n");
int i = co_yield 2;
printf("In coro %d\n", i);
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main %d\n", co_await f(3)); (1)
co_return 0;
}
| 1 | 推入的值通过 operator() 传递给 co_yield 的结果。 |
这将生成以下输出
In main 0 In coro 1 In main 2 In coro 3
Lazy
通过 await 初始值,generator 可以变为 lazy。这个 co_await 表达式将产生 Push 值。这意味着 generator 将等待直到它第一次被 await,然后处理新推入的值并在下一个 co_yield 处恢复。
cobalt::generator<int, int> example()
{
int v = co_await cobalt::this_coro::initial;
printf("In coro %d\n", v);
co_yield 2;
printf("In coro %d\n", v);
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main 1\n"); // < this is now before the co_await initial
printf("In main %d\n", co_await f(1));
printf("In main %d\n", co_await f(3));
return 0;
}
这将生成以下输出
In main 0 In main 1 In coro 1 In main 2 In coro 3 In main 4
执行器
除非在任何位置使用 asio::executor_arg 后跟执行器参数,否则执行器将从 thread_local get_executor 函数获取。
cobalt::generator<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
Memory Resource
除非在任何位置使用 std::allocator_arg 后跟 polymorphic_allocator 参数,否则内存资源将从 thread_local get_default_resource 函数获取。
cobalt::generator<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
Outline
template<typename Yield, typename Push = void>
struct [[nodiscard]] generator
{
// Movable
generator(generator &&lhs) noexcept = default;
generator& operator=(generator &&) noexcept;
// True until it co_returns & is co_awaited after (1)
explicit operator bool() const;
// Cancel the generator. (3)
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// Check if a value is available
bool ready() const;
// Get the returned value. If !ready() this function has undefined behaviour.
Yield get();
// Cancel & detach the generator.
~generator();
// an awaitable that results in value of Yield.
using generator_awaitable = unspecified;
// Present when Push != void
generator_awaitable operator()( Push && push);
generator_awaitable operator()(const Push & push);
// Present when Push == void, i.e. can co_await the generator directly.
generator_awaitable operator co_await (); (2)
};
| 1 | 这允许像 while (gen) co_await gen: 这样的代码 |
| 2 | 支持 中断等待 |
| 3 | 取消的 generator 可能是可恢复的 |
| eager generator 的销毁将被推迟,lazy 将立即销毁。 |
Promise
generator promise 具有以下属性。
cobalt/task.hpp
task 是一个 lazy 协程,可以 await 和 co_return 值。也就是说,它不能使用 co_yield。
cobalt::task<void> delay(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
}
cobalt::main co_main(int argc, char *argv[])
{
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
与 promise 不同,task 可以被 await 或 spawn 到比创建时不同的执行器上。
执行器
由于 task 是 lazy 的,因此它不需要在构造时具有执行器。它会尝试从调用者或 awaiter(如果存在)获取。否则,它将默认为 thread_local 执行器。
Memory Resource
内存资源 **不** 从 thread_local get_default_resource 函数获取,而是从 pmr::get_default_resource() 获取,除非 std::allocator_arg 出现在任何位置,后跟 polymorphic_allocator 参数。
cobalt::task<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
Outline
template<typename Return>
struct [[nodiscard]] task
{
task(task &&lhs) noexcept = default;
task& operator=(task &&) noexcept = default;
// enable `co_await`
auto operator co_await ();
};
可以通过调用 run(my_task()) 从同步函数同步使用任务。 |
Promise
task promise 具有以下属性。
cobalt/detached.hpp
detached 是一个 eager 协程,可以 await 但不能 co_return 值。也就是说,它不能被恢复,通常也不会被 await。
cobalt::detached delayed_print(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
printf("Hello world\n");
}
cobalt::main co_main(int argc, char *argv[])
{
delayed_print();
co_return 0;
}
Detached 用于轻松地在后台运行协程。
cobalt::detached my_task();
cobalt::main co_main(int argc, char *argv[])
{
my_task(); (1)
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
| 1 | 分离 coroutine。 |
detached 可以像这样分配一个新的取消源
cobalt::detached my_task(asio::cancellation_slot sl)
{
co_await this_coro::reset_cancellation_source(sl);
// do somework
}
cobalt::main co_main(int argc, char *argv[])
{
asio::cancellation_signal sig;
my_task(sig.slot()); (1)
co_await delay(std::chrono::milliseconds(50));
sig.emit(asio::cancellation_type::all);
co_return 0;
}
执行器
除非在任何位置使用 asio::executor_arg 后跟执行器参数,否则执行器将从 thread_local get_executor 函数获取。
cobalt::detached my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
Memory Resource
除非在任何位置使用 std::allocator_arg 后跟 polymorphic_allocator 参数,否则内存资源将从 thread_local get_default_resource 函数获取。
cobalt::detached my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
use_op
use_op token 是创建 op 的直接方式,即使用 cobalt::use_op 作为完成 token 将创建所需的 awaitable。
auto tim = cobalt::use_op.as_default_on(asio::steady_timer{co_await cobalt::this_coro::executor});
co_await tim.async_wait();
根据完成签名,co_await 表达式可能会抛出异常。
| 签名 | 返回类型 | 异常 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
任何异常 |
|
|
任何异常 |
use_op 永远不会立即完成,即 await_ready 始终返回 false,但总是挂起协程。 |
Hand coded Operations
操作是 [cobalt_operation] 功能的更高级实现。
此库可以轻松创建具有早期完成条件的异步操作,即避免协程完全挂起的条件。
例如,我们可以创建一个 wait_op,如果计时器已过期则什么也不做。
struct wait_op : cobalt::op<system::error_code> (1)
{
asio::steady_timer & tim;
wait_op(asio::steady_timer & tim) : tim(tim) {}
bool ready(cobalt::handler<system::error_code> ) (2)
{
if (tim.expiry() < std::chrono::steady_clock::now())
h(system::error_code{});
}
void initiate(cobalt::completion_handler<system::error_code> complete) (3)
{
tim.async_wait(std::move(complete));
}
};
| 1 | 继承 op 并具有匹配的签名 await_transform 会拾取它 |
| 2 | 检查操作是否准备好 - 从 await_ready 调用 |
| 3 | 如果操作未准备好,则启动操作。 |
cobalt/concepts.hpp
Awaitable
awaitable 是一个可以使用 co_await 的表达式。
template<typename Awaitable, typename Promise = void>
concept awaitable_type = requires (Awaitable aw, std::coroutine_handle<Promise> h)
{
{aw.await_ready()} -> std::convertible_to<bool>;
{aw.await_suspend(h)};
{aw.await_resume()};
};
template<typename Awaitable, typename Promise = void>
concept awaitable =
awaitable_type<Awaitable, Promise>
|| requires (Awaitable && aw) { {std::forward<Awaitable>(aw).operator co_await()} -> awaitable_type<Promise>;}
|| requires (Awaitable && aw) { {operator co_await(std::forward<Awaitable>(aw))} -> awaitable_type<Promise>;};
awaitables 在此库中要求协程 promise 返回其执行器(通过 const 引用),如果它们提供的话。否则,它将使用 this_thread::get_executor()。 |
Enable awaitables
继承 enable_awaitables 将使协程能够通过 await_transform await 任何在没有 await_transform 的情况下可 await 的内容。
cobalt/this_coro.hpp
this_coro 命名空间提供实用程序来访问协程 promise 的内部状态。
伪 awaitables
// Awaitable type that returns the executor of the current coroutine.
struct executor_t {}
constexpr executor_t executor;
// Awaitable type that returns the cancellation state of the current coroutine.
struct cancellation_state_t {};
constexpr cancellation_state_t cancellation_state;
// Reset the cancellation state with custom or default filters.
constexpr unspecified reset_cancellation_state();
template<typename Filter>
constexpr unspecified reset_cancellation_state(
Filter && filter);
template<typename InFilter, typename OutFilter>
constexpr unspecified reset_cancellation_state(
InFilter && in_filter,
OutFilter && out_filter);
// get & set the throw_if_cancelled setting.
unspecified throw_if_cancelled();
unspecified throw_if_cancelled(bool value);
// Set the cancellation source in a detached.
unspecified reset_cancellation_source();
unspecified reset_cancellation_source(asio::cancellation_slot slot);
// get the allocator the promise
struct allocator_t {};
constexpr allocator_t allocator;
// get the current cancellation state-type
struct cancelled_t {};
constexpr cancelled_t cancelled;
// set the over-eager mode of a generator
struct initial_t {};
constexpr initial_t initial;
Await Allocator
支持 enable_await_allocator 的协程的分配器可以通过以下方式获取
co_await cobalt::this_coro::allocator;
为了启用此功能以供您自己的协程使用,您可以继承 enable_await_allocator 并使用 CRTP 模式
struct my_promise : cobalt::enable_await_allocator<my_promise>
{
using allocator_type = __your_allocator_type__;
allocator_type get_allocator();
};
| 如果可用,分配器将由 use_op 使用 |
Await Executor
支持 enable_await_executor 的协程的分配器可以通过以下方式获取
co_await cobalt::this_coro::executor;
为了启用此功能以供您自己的协程使用,您可以继承 enable_await_executor 并使用 CRTP 模式
struct my_promise : cobalt::enable_await_executor<my_promise>
{
using executor_type = __your_executor_type__;
executor_type get_executor();
};
| 如果可用,执行器将由 use_op 使用 |
Await deferred
您的协程 promise 可以继承 enable_await_deferred,以便在 co_await 表达式中使用单个签名 asio::deferred。
由于 asio::deferred 现在是默认的完成 token,因此这允许下面的代码在不指定任何完成 token 或其他特化的情况下工作。
asio::steady_timer t{co_await cobalt::this_coro::executor};
co_await t.async_wait();
Memory resource base
promise 的 promise_memory_resource_base 基类将提供一个 get_allocator,该分配器从默认资源或 std::allocator_arg 参数后面的参数中获取。同样,它将添加 operator new 重载,以便协程使用相同的内存资源为其帧分配。
Throw if cancelled
promise_throw_if_cancelled_base 提供了基本选项,允许操作启用协程在 await 另一个实际 awaitable 时抛出异常。
co_await cobalt::this_coro::throw_if_cancelled;
Cancellation state
promise_cancellation_base 提供了基本选项,允许操作启用协程拥有一个可以通过 reset_cancellation_state 重置的取消状态。
co_await cobalt::this_coro::reset_cancellation_state();
为了方便起见,还有一个快捷方式可以检查当前取消状态
asio::cancellation_type ct = (co_await cobalt::this_coro::cancellation_state).cancelled();
asio::cancellation_type ct = co_await cobalt::this_coro::cancelled; // same as above
cobalt/this_thread.hpp
由于一切都是单线程的,该库为每个线程提供了执行器和默认内存资源。
namespace boost::cobalt::this_thread
{
pmr::memory_resource* get_default_resource() noexcept; (1)
pmr::memory_resource* set_default_resource(pmr::memory_resource* r) noexcept; (2)
pmr::polymorphic_allocator<void> get_allocator(); (3)
typename asio::io_context::executor_type & get_executor(); (4)
void set_executor(asio::io_context::executor_type exec) noexcept; (5)
}
| 1 | 获取默认资源 - 如果未设置,则为 pmr::get_default_resource |
| 2 | 设置默认资源 - 返回先前设置的资源 |
| 3 | 获取包装器分配器 (1) |
| 4 | 获取线程的执行器 - 如果未设置则抛出 |
| 5 | 设置当前线程的执行器。 |
协程将使用这些作为默认值,但会保留一份副本以备不时之需。
| 唯一的例外是初始化 cobalt 操作,它将使用 this_thread::executor 来重新抛出异常。 |
cobalt/channel.hpp
Channel 可用于在单个线程上的不同协程之间交换数据。
Outline
template<typename T>
struct channel
{
// create a channel with a buffer limit, executor & resource.
explicit
channel(std::size_t limit = 0u,
executor executor = this_thread::get_executor(),
pmr::memory_resource * resource = this_thread::get_default_resource());
// not movable.
channel(channel && rhs) noexcept = delete;
channel & operator=(channel && lhs) noexcept = delete;
using executor_type = executor;
const executor_type & get_executor();
// Closes the channel
~channel();
bool is_open() const;
// close the operation, will cancel all pending ops, too
void close();
// an awaitable that yields T
using read_op = unspecified;
// an awaitable that yields void
using write_op = unspecified;
// read a value to a channel
read_op read();
// write a value to the channel
write_op write(const T && value);
write_op write(const T & value);
write_op write( T && value);
write_op write( T & value);
// write a value to the channel if T is void
};
描述
Channel 是两个协程进行通信和同步的工具。
const std::size_t buffer_size = 2;
channel<int> ch{exec, buffer_size};
// in coroutine (1)
co_await ch.write(42);
// in coroutine (2)
auto val = co_await ch.read();
| 1 | 将值发送到 channel - 将阻塞直到可以发送 |
| 2 | 从 channel 读取值 - 将阻塞直到可以 await 值。 |
这两种操作都可能取决于 channel 的缓冲区大小而阻塞。
如果缓冲区大小为零,则 read 和 write 需要同时发生,即充当交汇点。
如果缓冲区未满,则写入操作不会挂起协程;同样,如果缓冲区未空,则读取操作不会挂起。
如果两个操作同时完成(如空缓冲区的情况),则第二个操作将被发布到执行器以供稍后完成。
channel 类型可以是 void,在这种情况下 write 不需要参数。 |
Channel 操作可以在不丢失数据的情况下取消。这使得它们可以与 race 一起使用。
generator<variant2::variant<int, double>> merge(
channel<int> & c1,
channel<double> & c2)
{
while (c1 && c2)
co_yield co_await race(c1.read(), c2.read());
}
示例
cobalt::promise<void> producer(cobalt::channel<int> & chan)
{
for (int i = 0; i < 4; i++)
co_await chan.write(i);
chan.close();
}
cobalt::main co_main(int argc, char * argv[])
{
cobalt::channel<int> c;
auto p = producer(c);
while (c.is_open())
std::cout << co_await c.read() << std::endl;
co_await p;
co_return 0;
}
此外,还提供了一个 channel_reader 以使读取 channel 更方便并与 BOOST_COBALT_FOR 一起使用。
cobalt::main co_main(int argc, char * argv[])
{
cobalt::channel<int> c;
auto p = producer(c);
BOOST_COBALT_FOR(int value, cobalt::channel_reader(c))
std::cout << value << std::endl;
co_await p;
co_return 0;
}
cobalt/with.hpp
with 工具提供了一种执行协程异步拆卸的方法。也就是说,它就像一个异步析构函数调用。
struct my_resource
{
cobalt::promise<void> await_exit(std::exception_ptr e);
};
cobalt::promise<void> work(my_resource & res);
cobalt::promise<void> outer()
{
co_await cobalt::with(my_resource(), &work);
}
拆卸可以通过提供 await_exit 成员函数或返回 awaitable 的 tag_invoke 函数来完成,或者通过将拆卸作为第三个参数提供给 with。
using ws_stream = beast::websocket::stream<asio::ip::tcp::socket>>;
cobalt::promise<ws_stream> connect(urls::url); (1)
cobalt::promise<void> disconnect(ws_stream &ws); (2)
auto teardown(const boost::cobalt::with_exit_tag & wet , ws_stream & ws, std::exception_ptr e)
{
return disconnect(ws);
}
cobalt::promise<void> run_session(ws_stream & ws);
cobalt::main co_main(int argc, char * argv[])
{
co_await cobalt::with(co_await connect(argv[1]), &run_session, &teardown);
co_return 0;
}
| 1 | 实现 websocket 连接和 websocket 启动 |
| 2 | 实现有序关闭。 |
如果作用域在没有异常的情况下退出,则 std::exception_ptr 为 null。注意:exit 函数可以按引用接受 exception_ptr 并修改它,这是合法的。 |
cobalt/race.hpp
race 函数可用于从一组 awaitable 中 await 一个。
它可以作为可变参数函数使用多个 awaitable,或者在 awaitables 的范围内调用。
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_wait()
{
co_await cobalt::race(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::race(aws); (2)
}
| 1 | 等待一组可变参数 awaitables |
| 2 | 等待 awaitables 的向量 |
race 的第一个参数可以是 uniform random bit generator。
extern promise<void> pv1, pv2;
std::vector<promise<void>> pvv;
std::default_random_engine rdm{1};
// if everything returns void race returns the index
std::size_t r1 = co_await race(pv1, pv2);
std::size_t r2 = co_await race(rdm, pv1, pv2);
std::size_t r3 = co_await race(pvv);
std::size_t r4 = co_await race(rdm, pvv);
// variant if not everything is void. void become monostate
extern promise<int> pi1, pi2;
variant2::variant<monostate, int, int> r5 = co_await race(pv1, pi1, pi2);
variant2::variant<monostate, int, int> r6 = co_await race(rdm, pv1, pi1, pi2);
// a range returns a pair of the index and the result if non-void
std::vector<promise<int>> piv;
std::pair<std::size_t, int> r7 = co_await race(piv);
std::pair<std::size_t, int> r8 = co_await race(rdm, piv);
Interrupt Wait
当参数作为 rvalue 引用传递时,race 将尝试对 awaitable 使用 .interrupt_await 来信号 awaitable 立即完成并且结果将被忽略。如果支持,Awaitable 必须在 interrupt_await 返回之前恢复 awaiting coroutine。如果 race 未检测到该函数,它将发送一个取消信号。
这意味着您可以像这样重用 race
cobalt::promise<void> do_wait()
{
auto t1 = task1();
auto t2 = task2();
co_await cobalt::race(t1, t2); (1)
co_await cobalt::race(t1, t2); (2)
}
| 1 | 等待第一个任务完成 |
| 2 | 等待其他任务完成 |
race 将调用 awaitable 的函数,就像在 co_await 表达式中使用一样,或者根本不评估它们。
left_race
left_race 函数类似于 race,但遵循严格的从左到右扫描。这可能导致饥饿问题,因此不推荐默认使用此方法,但如果小心处理,可用于优先级排序。
Outline
// Concept for the random number generator.
template<typename G>
concept uniform_random_bit_generator =
requires ( G & g)
{
{typename std::decay_t<G>::result_type() } -> std::unsigned_integral; // is an unsigned integer type
// T Returns the smallest value that G's operator() may return. The value is strictly less than G::max(). The function must be constexpr.
{std::decay_t<G>::min()} -> std::same_as<typename std::decay_t<G>::result_type>;
// T Returns the largest value that G's operator() may return. The value is strictly greater than G::min(). The function must be constexpr.
{std::decay_t<G>::max()} -> std::same_as<typename std::decay_t<G>::result_type>;
{g()} -> std::same_as<typename std::decay_t<G>::result_type>;
} && (std::decay_t<G>::max() > std::decay_t<G>::min());
// Variadic race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
uniform_random_bit_generator URBG, awaitable ... Promise>
awaitable race(URBG && g, Promise && ... p);
// Ranged race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
uniform_random_bit_generator URBG, range<awaitable> PromiseRange>
awaitable race(URBG && g, PromiseRange && p);
// Variadic race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable race(Promise && ... p);
// Ranged race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable race(PromiseRange && p);
// Variadic left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable left_race(Promise && ... p);
// Ranged left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable left_race(PromiseRange && p);
| 选择空范围将导致异常。 |
cobalt/gather.hpp
gather 函数可用于一次性 await 多个 awaitables,并传递取消。
该函数将收集所有完成并将它们作为 system::result 返回,即捕获概念作为值。一个 awaitable 抛出异常不会取消其他。
它可以作为可变参数函数使用多个 Awaitable,或者在 awaitables 的范围内调用。
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_gather()
{
co_await cobalt::gather(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::gather(aws); (2)
}
| 1 | 等待一组可变参数 awaitables |
| 2 | 等待 awaitables 的向量 |
gather 将调用 awaitable 的函数,就像在 co_await 表达式中使用一样。
extern promise<void> pv1, pv2;
std::tuple<system::result<int>, system::result<int>> r1 = co_await gather(pv1, pv2);
std::vector<promise<void>> pvv;
pmr::vector<system::result<void>> r2 = co_await gather(pvv);
extern promise<int> pi1, pi2;
std::tuple<system::result<monostate>,
system::result<monostate>,
system::result<int>,
system::result<int>> r3 = co_await gather(pv1, pv2, pi1, pi2);
std::vector<promise<int>> piv;
pmr::vector<system::result<int>> r4 = co_await gather(piv);
Outline
// Variadic gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable gather(Promise && ... p);
// Ranged gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable gather(PromiseRange && p);
cobalt/join.hpp
join 函数可用于一次性 await 多个 awaitable,并正确连接取消。
该函数将收集所有完成并将它们作为值返回,除非抛出异常。如果抛出异常,所有未完成的操作都将被取消(如果可能则中断),并且第一个异常将被重新抛出。
void 将作为元组中的 variant2::monostate 返回,除非所有 awaitables 都 yield void。 |
它可以作为可变参数函数使用多个 Awaitable,或者在 awaitables 的范围内调用。
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_join()
{
co_await cobalt::join(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::join(aws); (2)
}
| 1 | 等待一组可变参数 awaitables |
| 2 | 等待 awaitables 的向量 |
join 将调用 awaitable 的函数,就像在 co_await 表达式中使用一样。
extern promise<void> pv1, pv2;
/* void */ co_await join(pv1, pv2);
std::vector<promise<void>> pvv;
/* void */ co_await join(pvv);
extern promise<int> pi1, pi2;
std::tuple<monostate, monostate, int, int> r1 = co_await join(pv1, pv2, pi1, pi2);
std::vector<promise<int>> piv;
pmr::vector<int> r2 = co_await join(piv);
Outline
// Variadic join
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable join(Promise && ... p);
// Ranged join
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable join(PromiseRange && p);
| 选择空范围将导致异常。 |
cobalt/wait_group.hpp
wait_group 函数可用于管理多个 promise<void> 类型的协程。它与 cobalt/with.hpp 开箱即用,通过具有匹配的 await_exit 成员。
本质上,wait_group 是一个 promise 的动态列表,它有一个 race 函数(wait_one),一个 gather 函数(wait_all),并且在作用域退出时会进行清理。
struct wait_group
{
// create a wait_group
explicit
wait_group(asio::cancellation_type normal_cancel = asio::cancellation_type::none,
asio::cancellation_type exception_cancel = asio::cancellation_type::all);
// insert a task into the group
void push_back(promise<void> p);
// the number of tasks in the group
std::size_t size() const;
// remove completed tasks without waiting (i.e. zombie tasks)
std::size_t reap();
// cancel all tasks
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// wait for one task to complete.
wait_one_op wait_one();
// wait for all tasks to complete
wait_op wait();
// wait for all tasks to complete
wait_op operator co_await ();
// when used with with , this will receive the exception
// and wait for the completion
// if ep is set, this will use the exception_cancel level,
// otherwise the normal_cancel to cancel all promises.
wait_op await_exit(std::exception_ptr ep);
};
cobalt/spawn.hpp
spawn 函数允许在 asio executor/execution_context 上运行 task 并使用 completion token 消耗结果。
auto spawn(Context & context, task<T> && t, CompletionToken&& token);
auto spawn(Executor executor, task<T> && t, CompletionToken&& token);
Spawn 将分派其启动并将完成发布出去。这使得可以使用 task 在另一个执行器上运行任务并在当前执行器上使用 use_op 消耗结果。也就是说,spawn 可用于跨线程。
示例
cobalt::task<int> work();
int main(int argc, char *argv[])
{
asio::io_context ctx{BOOST_ASIO_CONCURRENCY_HINT_1};
auto f = spawn(ctx, work(), asio::use_future);
ctx.run();
return f.get();
}
调用者需要确保执行器不会同时在多个线程上运行,例如,通过使用单线程 asio::io_context。 |
cobalt/run.hpp
run 函数类似于 spawn,但以同步方式运行。它将内部设置执行上下文和内存资源。
这在将 cobalt 代码片段集成到同步应用程序中时可能很有用。
Outline
// Run the task and return it's value or rethrow any exception.
T run(task<T> t);
示例
cobalt::task<int> work();
int main(int argc, char *argv[])
{
return run(work());
}
cobalt/thread.hpp
thread 类型是创建类似于 main 的环境的另一种方式,但它不使用 signal_set。
cobalt::thread my_thread()
{
auto exec = co_await cobalt::this_coro::executor; (1)
asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
co_await tim.async_wait(cobalt::use_op); (3)
co_return 0;
}
| 1 | 获取正在运行的 thread 的执行器 |
| 2 | 将其与 asio 对象一起使用 |
| 3 | await 一个 cobalt 操作 |
要使用 thread,您可以像使用 std::thread 一样使用它
int main(int argc, char * argv[])
{
auto thr = my_thread();
thr.join();
return 0;
}
thread 也 是一个 awaitable(包括取消)。
cobalt::main co_main(int argc, char * argv[])
{
auto thr = my_thread();
co_await thr;
co_return 0;
}
销毁 detached thread 将导致硬停止 (io_context::stop) 并连接线程。 |
此库中的任何内容(除了 await cobalt/thread.hpp 和 cobalt/spawn.hpp)都不是线程安全的。如果您需要跨线程传输数据,您将需要像 asio::concurrent_channel 这样的线程安全实用程序。您不能在线程之间共享任何 cobalt 原语,唯一例外是能够将 task spawn 到另一个线程的执行器上。 |
执行器
它还将创建一个 asio::io_context 来运行,您可以通过 this_coro::executor 获取它。它将被分配给 cobalt::this_thread::get_executor()。
Memory Resource
它还创建一个内存资源,将用作内部内存分配的默认值。它将被分配给 thread_local 的 cobalt::this_thread::get_default_resource()。
Outline
struct thread
{
// Send a cancellation signal
void cancel(asio::cancellation_type type = asio::cancellation_type::all);
// Allow the thread to be awaited. NOOP if the thread is invalid.
auto operator co_await() &-> detail::thread_awaitable; (1)
auto operator co_await() && -> detail::thread_awaitable; (2)
// Stops the io_context & joins the executor
~thread();
/// Move constructible
thread(thread &&) noexcept = default;
using executor_type = executor;
using id = std::thread::id;
id get_id() const noexcept;
// Add the functions similar to `std::thread`
void join();
bool joinable() const;
void detach();
executor_type get_executor() const;
};
| 1 | 支持 中断等待 |
| 2 | 始终转发取消 |
Promise
thread promise 具有以下属性。
cobalt/result.hpp
Awaitables 可以修改为返回 system::result 或 std::tuple 而不是使用异常。
// value only
T res = co_await foo();
// as result
system::result<T, std::exception_ptr> res = co_await cobalt::as_result(foo());
// as tuple
std::tuple<std::exception_ptr, T> res = co_await cobalt::as_tuple(foo());
Awaitables 还可以提供自定义结果和元组处理方式,通过提供使用 cobalt::as_result_tag 和 cobalt::as_tuple_tag 的 await_resume 重载。
your_result_type await_resume(cobalt::as_result_tag);
your_tuple_type await_resume(cobalt::as_tuple_tag);
// example of an op with result system::error_code, std::size_t
system::result<std::size_t> await_resume(cobalt::as_result_tag);
std::tuple<system::error_code, std::size_t> await_resume(cobalt::as_tuple_tag);
| Awaitables 仍然允许抛出异常,例如,对于 OOM 等严重异常。 |
cobalt/async_for.hpp
对于像 generators 这样的类型,提供了一个 BOOST_COBALT_FOR 宏来模拟 for co_await 循环。
cobalt::generator<int> gen();
cobalt::main co_main(int argc, char * argv[])
{
BOOST_COBALT_FOR(auto i, gen())
printf("Generated value %d\n", i);
co_return 0;
}
cobalt/error.hpp
为了使错误更容易管理,cobalt 提供了一个 error_category 以与 boost::system::error_code 一起使用。
enum class error
{
moved_from,
detached,
completed_unexpected,
wait_not_ready,
already_awaited,
allocation_failed
};
system::error_category & cobalt_category();
system::error_code make_error_code(error e);
cobalt/composition.hpp
包含 cobalt/composition.hpp 提供了将任何函数(其最后一个参数为 cobalt::completion_handler)定义的定义。这允许简单地创建复合操作。
co_await 语句的结果会自动转换为元组(类似于使用的 as_tuple),以避免异常。
结果需要被 co_return。传递给协程的 Handler 不得被触碰。
struct echo_op final : op<system::error_code>
{
echo_op(cobalt::io::stream & str) : stream(str) {}
void initiate(completion_handler<system::error_code>) final;
cobalt::io::stream & str;
};
void echo_op::initiate(completion_handler<system::error_code, std::size_t>)
{
char buf[4096];
auto [ec, n] = co_await str.read_some(buf);
auto buf = asio::buffer(buf, n);
while (!ec && !buf.empty() && !!co_await this_coro::cancelled)
{
std::tie(ec, n) = co_await str.write_some(buf);
buf += n;
}
if (!!co_await this_coro::cancelled)
co_return {asio::error::operation_aborted, m};
else
co_return {system::error_code{}, m};
}
cobalt/config.hpp
config adder 允许配置 boost.cobalt 的一些实现细节。
executor_type
executor 类型默认为 boost::asio::any_io_executor。
您可以将其设置为 boost::asio::any_io_executor,方法是定义 BOOST_COBALT_CUSTOM_EXECUTOR 并自己添加 boost::cobalt::executor 类型。
或者,可以定义 BOOST_COBALT_USE_IO_CONTEXT 将 executor 设置为 boost::asio::io_context::executor_type。
pmr
Boost.cobalt 可与不同的 pmr 实现一起使用,默认使用 std::pmr。
可以使用以下宏进行配置
-
BOOST_COBALT_USE_STD_PMR -
BOOST_COBALT_USE_BOOST_CONTAINER_PMR -
BOOST_COBALT_USE_CUSTOM_PMR
如果定义了 BOOST_COBALT_USE_CUSTOM_PMR,您将需要提供一个 boost::cobalt::pmr 命名空间,它是 std::pmr 的即插即用替代品。
或者,可以通过以下方式禁用 pmr 使用
-
BOOST_COBALT_NO_PMR.
use_op 使用一个小型缓冲区优化的资源,其大小可以通过定义 BOOST_COBALT_SBO_BUFFER_SIZE 来设置,默认为 4096 字节。
cobalt/io/buffer.hpp
缓冲区头文件为 io 函数提供了通用的缓冲区序列表示。它实现为单个缓冲区后跟一个缓冲区 span。这允许从前面删除字节,并使用单个 buffer、registered_buffer。
const_buffer_sequence 实现了一个 ConstBufferSequence,而 mutable_buffer_sequence 实现了一个 MutableBufferSequence。
namespace io
{
// Aliases for the asio functions.
using asio::buffer;
using asio::buffer_copy;
using asio::buffer_size;
}
mutable_buffer_sequence
namespace io
{
using asio::buffer;
using asio::mutable_buffer;
struct mutable_buffer_sequence
{
std::size_t buffer_count() const;
mutable_buffer_sequence(asio::mutable_registered_buffer buffer = {});
mutable_buffer_sequence(asio::mutable_buffer head);
template<typename T>
requires (std::constructible_from<std::span<const asio::mutable_buffer>, T&&>)
mutable_buffer_sequence(T && value);
mutable_buffer_sequence(std::span<const asio::mutable_buffer> spn);
// drop n bytes from the buffer sequence
mutable_buffer_sequence & operator+=(std::size_t n);
// a random access iterator over the buffers
struct const_iterator;
const_iterator begin() const;
const_iterator end() const;
// is this a registered buffer
bool is_registered() const;
};
// Invokes Func either with the mutable_buffer_sequence, an `asio::mutable_registered_buffer` or a `asio::mutable_buffer`.
template<typename Func>
auto visit(const mutable_buffer_sequence & seq, Func && func);
const_buffer_sequence
using asio::const_buffer;
struct const_buffer_sequence
{
std::size_t buffer_count();
const_buffer_sequence(asio::const_registered_buffer buffer);
const_buffer_sequence(asio::mutable_registered_buffer buffer);
const_buffer_sequence(asio::const_buffer head) { this->head_ = head; }
const_buffer_sequence(asio::mutable_buffer head) { this->head_ = head; }
template<typename T>
requires (std::constructible_from<std::span<const asio::const_buffer>, T&&>)
const_buffer_sequence(T && value);
const_buffer_sequence(std::span<const asio::const_buffer> spn);
// drop bytes from the front, i.e. advance the buffer
const_buffer_sequence & operator+=(std::size_t n);
// a random access iterator over the buffers
struct const_iterator;
// Access the sequence as a range.
const_iterator begin() const;
const_iterator end() const;
// is this a registered buffer
bool is_registered() const;
};
// Invokes Func either with the const_buffer_sequence, an `asio::const_registered_buffer` or a `asio::const_buffer.
template<typename Func>
auto visit(const const_buffer_sequence & seq, Func && func);
}
cobalt/io/ops.hpp
此包装器中的大部分功能都通过操作来实现。
它们是类型擦除的,但不是通过使用 virtual。这使得反虚拟化更容易。必须提供 implementation 函数,而 try_implementation_t 是可选的,将在 ready 函数中使用。两者都将使用 void *this 作为第一个参数进行调用。
struct [[nodiscard]] write_op final : op<system::error_code, std::size_t>
{
const_buffer_sequence buffer;
using implementation_t = void(void*, const_buffer_sequence, completion_handler<system::error_code, std::size_t>);
using try_implementation_t = void(void*, const_buffer_sequence, handler<system::error_code, std::size_t>);
write_op(const_buffer_sequence buffer,
void * this_,
implementation_t *implementation,
try_implementation_t * try_implementation = nullptr);
void initiate(completion_handler<system::error_code, std::size_t> handler) final;
void ready(handler<system::error_code, std::size_t> handler) final;
};
struct [[nodiscard]] read_op final : op<system::error_code, std::size_t>
{
mutable_buffer_sequence buffer;
using implementation_t = void(void*, mutable_buffer_sequence, completion_handler<system::error_code, std::size_t>);
using try_implementation_t = void(void*, mutable_buffer_sequence, handler<system::error_code, std::size_t>);
read_op(mutable_buffer_sequence buffer,
void * this_,
implementation_t *implementation,
try_implementation_t * try_implementation = nullptr);
void initiate(completion_handler<system::error_code, std::size_t> handler) final;
void ready(handler<system::error_code, std::size_t> handler) final;
};
struct [[nodiscard]] write_at_op final : op<system::error_code, std::size_t>
{
std::uint64_t offset;
const_buffer_sequence buffer;
using implementation_t = void(void*, std::uint64_t, const_buffer_sequence, completion_handler<system::error_code, std::size_t>);
using try_implementation_t = void(void*, std::uint64_t, const_buffer_sequence, handler<system::error_code, std::size_t>);
write_at_op(std::uint64_t offset,
const_buffer_sequence buffer,
void * this_,
implementation_t *implementation,
try_implementation_t * try_implementation = nullptr);
void initiate(completion_handler<system::error_code, std::size_t> handler) final;
void ready(handler<system::error_code, std::size_t> handler) final;
};
struct [[nodiscard]] read_at_op final : op<system::error_code, std::size_t>
{
std::uint64_t offset;
mutable_buffer_sequence buffer;
using implementation_t = void(void*, std::uint64_t, mutable_buffer_sequence, completion_handler<system::error_code, std::size_t>);
using try_implementation_t = void(void*, std::uint64_t, mutable_buffer_sequence, handler<system::error_code, std::size_t>);
read_at_op(std::uint64_t offset,
mutable_buffer_sequence buffer,
void * this_,
implementation_t *implementation,
try_implementation_t * try_implementation = nullptr);
void initiate(completion_handler<system::error_code, std::size_t> handler) final;
void ready(handler<system::error_code, std::size_t> handler) final;
};
struct [[nodiscard]] wait_op final : op<system::error_code>
{
using implementation_t = void(void*, completion_handler<system::error_code>);
using try_implementation_t = void(void*, handler<system::error_code>);
wait_op(void * this_,
implementation_t *implementation,
try_implementation_t * try_implementation = nullptr);
void initiate(completion_handler<system::error_code> handler) final;
void ready(handler<system::error_code> handler) final;
};
op 可以重用,并且公共成员(如 buffer)可以在 op 未被 await 时修改。例如,像这样
write_op wo = do_w;
auto sz = co_await wo;
while (sz > 0)
{
wo.buffer += sz;
sz = co_await wo;
}
cobalt/io/steady_timer.hpp
steady_timer 是 asio::steady_timer 的一个简单包装器。
如果计时器已过期,co_await t.wait() 不会挂起。 |
struct steady_timer
{
/// The clock type.
typedef std::chrono::steady_clock clock_type;
/// The duration type of the clock.
typedef typename clock_type::duration duration;
/// The time point type of the clock.
typedef typename clock_type::time_point time_point;
steady_timer(const cobalt::executor & executor = this_thread::get_executor());
steady_timer(const time_point& expiry_time, const cobalt::executor & executor = this_thread::get_executor());
steady_timer(const duration& expiry_time, const cobalt::executor & executor = this_thread::get_executor());
// cancel the timer. This is safe to call from another thread
void cancel();
// The current expiration time.
time_point expiry() const;
// Rest the expiry time, either with an absolute time or a duration.
void reset(const time_point& expiry_time);
void reset(const duration& expiry_time);
// Check if the timer is already expired.
bool expired() const;
// Get the wait operation.
[[nodiscard]] wait_op wait();
};
cobalt/io/system_timer.hpp
system_timer 是 asio::system_timer 的一个简单包装器。
如果计时器已过期,co_await t.wait() 不会挂起。 |
struct system_timer
{
/// The clock type.
typedef std::chrono::system_clock clock_type;
/// The duration type of the clock.
typedef typename clock_type::duration duration;
/// The time point type of the clock.
typedef typename clock_type::time_point time_point;
system_timer(const cobalt::executor & executor = this_thread::get_executor());
system_timer(const time_point& expiry_time, const cobalt::executor & executor = this_thread::get_executor());
system_timer(const duration& expiry_time, const cobalt::executor & executor = this_thread::get_executor());
// cancel the timer. This is safe to call from another thread
void cancel();
// The current expiration time.
time_point expiry() const;
// Rest the expiry time, either with an absolute time or a duration.
void reset(const time_point& expiry_time);
void reset(const duration& expiry_time);
// Check if the timer is already expired.
bool expired() const;
// Get the wait operation.
[[nodiscard]] wait_op wait();
};
cobalt/io/sleep.hpp
sleep 操作是计时器的便利包装器。每个 sleep 操作都会创建一个计时器,因此重用计时器通常更有效。
// Waits using a steady_timer.
template<typename Duration>
inline auto sleep(const std::chrono::time_point<std::chrono::steady_clock, Duration> & tp);
// Waits using a system_timer.
template<typename Duration>
inline auto sleep(const std::chrono::time_point<std::chrono::system_clock, Duration> & tp);
// Waits using a steady_timer.
template<typename Rep, typename Period>
inline auto sleep(const std::chrono::duration<Rep, Period> & dur);
cobalt/io/signal_set.hpp
signal_set 是 signal_set 的包装器。
Outline
struct signal_set
{
signal_set(const cobalt::executor & executor = this_thread::get_executor());
signal_set(std::initializer_list<int> sigs, const cobalt::executor & executor = this_thread::get_executor());
// Cancel all operations associated with the signal set.
[[nodiscard]] system::result<void> cancel();
// Remove all signals from a signal_set.
[[nodiscard]] system::result<void> clear();
// Add a signal to a signal_set.
[[nodiscard]] system::result<void> add(int signal_number);
// Remove a signal from a signal_set.
[[nodiscard]] system::result<void> remove(int signal_number);
};
示例
io::signal_set s = {SIGINT, SIGTERM};
int sig = co_await s.wait();
if (s == SIGINT)
interrupt();
else if (s == SIGTERM)
terminate();
cobalt/io/stream.hpp
stream 是一个 io 对象,允许读写,例如 tcp socket。
struct BOOST_SYMBOL_VISIBLE write_stream
{
virtual ~write_stream() = default;
[[nodiscard]] virtual write_op write_some(const_buffer_sequence buffer) = 0;
};
struct BOOST_SYMBOL_VISIBLE read_stream
{
virtual ~read_stream() = default;
[[nodiscard]] virtual read_op read_some(mutable_buffer_sequence buffer) = 0;
};
struct stream : read_stream, write_stream
{
};
cobalt/io/random_access_device.hpp
random_access_device 是一个 io 对象,允许在随机位置进行读写。例如,文件。
struct BOOST_SYMBOL_VISIBLE random_access_write_device
{
virtual ~random_access_write_device() = default;
[[nodiscard]] virtual write_at_op write_some_at(std::uint64_t offset, const_buffer_sequence buffer) = 0;
};
struct BOOST_SYMBOL_VISIBLE random_access_read_device
{
virtual ~random_access_read_device() = default;
[[nodiscard]] virtual read_at_op read_some_at(std::uint64_t offset, mutable_buffer_sequence buffer) = 0;
};
struct random_access_device : random_access_read_device, random_access_write_device
{
};
cobalt/io/read.hpp
read 和 read_at 函数读取直到缓冲区已满。
struct read_all final : op<system::error_code, std::size_t>
{
read_op step;
read_all(read_op op);
};
read_all read(stream & str, mutable_buffer_sequence buffer);
struct read_all_at final : op<system::error_code, std::size_t>
{
read_at_op step;
read_all_at(read_at_op op);
};
read_all_at read_at(stream & str, std::uint64_t offset, mutable_buffer_sequence buffer);
cobalt/io/read.hpp
write 和 write_at 将完整缓冲区写入流。
struct write_all final : op<system::error_code, std::size_t>
{
read_op step;
write_all(read_op op);
};
write_all write(stream & str, const_buffer_sequence buffer);
struct write_all_at final : op<system::error_code, std::size_t>
{
write_at_op step;
write_all_at(write op);
};
write_all_at write_at(stream & str, std::uint64_t offset, const_buffer_sequence buffer);
cobalt/io/file.hpp
file 对象提供文件 IO,这可能取决于 asio 的配置方式而具有异步性。如果 asio 不支持异步文件 IO,cobalt 将回退到同步操作。
struct file
{
enum flags
{
read_only = O_RDONLY,
write_only = O_WRONLY,
read_write = O_RDWR,
append = O_APPEND,
create = O_CREAT,
exclusive = O_EXCL,
truncate = O_TRUNC,
sync_all_on_write = O_SYNC
};
// Implement bitmask operations as shown in C++ Std [lib.bitmask.types].
friend flags operator&(flags x, flags y);
friend flags operator|(flags x, flags y);
friend flags operator^(flags x, flags y);
friend flags operator~(flags x);
friend flags& operator&=(flags& x, flags y);
friend flags& operator|=(flags& x, flags y);
friend flags& operator^=(flags& x, flags y);
/// Basis for seeking in a file.
enum seek_basis
{
seek_set = SEEK_SET,
seek_cur = SEEK_CUR,
seek_end = SEEK_END
};
using native_handle_type = __unspecified__;
system::result<void> assign(const native_handle_type & native_file);
system::result<void> cancel();
executor get_executor();
bool is_open() const;
system::result<void> close();
native_handle_type native_handle();
system::result<void> open(const char * path, flags open_flags);
system::result<void> open(const std::string & path, flags open_flags);
system::result<native_handle_type> release();
system::result<void> resize(std::uint64_t n);
system::result<std::uint64_t> size() const;
system::result<void> sync_all();
system::result<void> sync_data();
explicit file(executor exec);
file(executor exec, native_handle_type fd);
};
cobalt/io/stream_file.hpp
stream_file 提供对文件的流访问。如果 asio 不支持异步文件 IO,cobalt 将回退到同步操作。
struct stream_file : file, stream
{
stream_file(const executor & executor = this_thread::get_executor());
stream_file(const char * path, file::flags open_flags,
const executor & executor = this_thread::get_executor());
stream_file(const std::string & path, file::flags open_flags,
const executor & executor = this_thread::get_executor());
stream_file(const native_handle_type & native_file,
const executor & executor = this_thread::get_executor());
stream_file(stream_file && sf) noexcept;
write_op write_some(const_buffer_sequence buffer);
read_op read_some(mutable_buffer_sequence buffer);
// advance the position in the file
system::result<std::uint64_t> seek(
std::int64_t offset,
seek_basis whence);
};
cobalt/io/random_access_file.hpp
stream_file 提供对文件的随机访问。如果 asio 不支持异步文件 IO,cobalt 将回退到同步操作。
struct random_access_file : file, random_access_device
{
using native_handle_type = file::native_handle_type;
random_access_file(const executor & executor = this_thread::get_executor());
random_access_file(const char * path, file::flags open_flags,
const executor & executor = this_thread::get_executor());
random_access_file(const std::string & path, file::flags open_flags,
const executor & executor = this_thread::get_executor());
random_access_file(const native_handle_type & native_file,
const executor & executor = this_thread::get_executor());
random_access_file(random_access_file && sf) noexcept;
write_at_op write_some_at(std::uint64_t offset, const_buffer_sequence buffer);
read_at_op read_some_at(std::uint64_t offset, mutable_buffer_sequence buffer);
};
cobalt/io/serial_port.hpp
serial_port 是 asio::serial_port 的一个简单包装器。
struct serial_port
{
// The underlying handle
using native_handle_type = typename asio::basic_serial_port<executor>::native_handle_type;
native_handle_type native_handle();
serial_port(const cobalt::executor & executor = this_thread::get_executor());
serial_port(serial_port && lhs) = default;
serial_port(std::string_view device, const cobalt::executor & executor = this_thread::get_executor());
serial_port(native_handle_type native_handle, const cobalt::executor & executor = this_thread::get_executor());
system::result<void> close();
system::result<void> cancel();
bool is_open() const;
// Send a break sequence to the serial port.
system::result<void> send_break();
[[nodiscard]] system::result<void> set_baud_rate(unsigned rate);
[[nodiscard]] system::result<unsigned> get_baud_rate();
[[nodiscard]] system::result<void> set_character_size(unsigned rate);
[[nodiscard]] system::result<unsigned> get_character_size();
// An enumerator {none, software, hardware}
using flow_control = asio::serial_port_base::flow_control::type;
[[nodiscard]] system::result<void> set_flow_control(flow_control rate);
[[nodiscard]] system::result<flow_control> get_flow_control();
// An enumerator {none, odd, even}
using parity = asio::serial_port_base::parity::type;
[[nodiscard]] system::result<void> set_parity(parity rate);
[[nodiscard]] system::result<parity> get_parity();
[[nodiscard]] system::result<void> assign(native_handle_type native_handle);
[[nodiscard]] system::result<void> open(std::string_view device);
// read & write some data. Yields the number of bytes transferred.
[[nodiscard]] write_op write_some(const_buffer_sequence buffer);
[[nodiscard]] read_op read_some (mutable_buffer_sequence buffer);
};
cobalt/io/endpoint.hpp
endpoint 头文件提供了通用的 asio 兼容 endpoint 类型和协议。
protocol_type
struct protocol_type
{
using family_t = __unspecified__;
using type_t = __unspecified__;
using protocol_t = __unspecified__;
constexpr family_t family() const noexcept;
constexpr type_t type() const noexcept;
constexpr protocol_t protocol() const noexcept;
// explicitly construct protocol type.
constexpr explicit
protocol_type(family_t family = static_cast<family_t>(0),
type_t type = static_cast<type_t>(0),
protocol_t protocol = static_cast<protocol_t>(0)) noexcept;
// allows construction from other procols, e.g. asio
template<typename OtherProtocol>
requires requires (const OtherProtocol & op)
{
{static_cast<family_t>(op.family())};
{static_cast<type_t>(op.type())};
{static_cast<protocol_t>(op.protocol())};
}
constexpr protocol_type(const OtherProtocol & op) noexcept
// make the orderable
friend
constexpr auto operator<=>(const protocol_type & , const protocol_type &) noexcept = default;
};
protocol_type tcp_v4{AF_INET, SOCK_STREAM, IPPROTO_TCP};
// from asio
protocol_type tcp_v6 = boost::asio::ip::tcp::v6();
static_protocol
static protocol 提供编译时使用的协议定义。
template<protocol_type::family_t Family = static_cast<protocol_type::family_t>(0),
protocol_type::type_t Type = static_cast<protocol_type::type_t>(0),
protocol_type::protocol_t Protocol = static_cast<protocol_type::protocol_t>(0)>
struct static_protocol
{
using family_t = protocol_type::family_t ;
using type_t = protocol_type::type_t ;
using protocol_t = protocol_type::protocol_t;
constexpr family_t family() const noexcept {return Family;};
constexpr type_t type() const noexcept {return Type;};
constexpr protocol_t protocol() const noexcept {return Protocol;};
using endpoint = io::endpoint;
};
constexpr static_protocol<...> ip;
constexpr static_protocol<...> ip_v4;
constexpr static_protocol<...> ip_v6;
constexpr static_protocol<...> tcp;
constexpr static_protocol<...> tcp_v4;
constexpr static_protocol<...> tcp_v6;
constexpr static_protocol<...> udp;
constexpr static_protocol<...> udp_v4;
constexpr static_protocol<...> udp_v6;
constexpr static_protocol<...> icmp;
constexpr static_protocol<...> local_stream;
constexpr static_protocol<...> local_datagram;
constexpr static_protocol<...> local_seqpacket;
constexpr static_protocol<...> local_protocol;
ip_address
struct ip_address
{
bool is_ipv6() const;
bool is_ipv4() const;
std::uint16_t port() const;
std::array<std::uint8_t, 16u> addr() const;
boost::static_string<45> addr_str() const;
};
struct ip_address_v4
{
std::uint16_t port() const;
std::uint32_t addr() const;
boost::static_string<15> addr_str() const;
};
struct ip_address_v6
{
std::uint16_t port() const;
std::array<std::uint8_t, 16u> addr() const;
boost::static_string<45> addr_str() const;
};
make_endpoint / get_endpoint
make_endpoint_tag 用于 make_endpoint 函数的 tag_invoke。用户可以定义另一个 tag_invoke 函数来支持自定义 endpoints。
template<protocol_type::family_t Family>
struct make_endpoint_tag {};
template<protocol_type::family_t Family>
struct get_endpoint_tag {};
提供了以下函数
std::size_t tag_invoke(make_endpoint_tag<AF_UNIX>,
asio::detail::socket_addr_type* base,
std::string_view sv);
const local_endpoint* tag_invoke(get_endpoint_tag<AF_UNIX>,
protocol_type actual,
const endpoint::addr_type * addr);
std::size_t tag_invoke(make_endpoint_tag<AF_INET>,
asio::detail::socket_addr_type* base,
std::uint32_t address,
std::uint16_t port);
std::size_t tag_invoke(make_endpoint_tag<AF_INET>,
asio::detail::socket_addr_type* base,
std::string_view address,
std::uint16_t port);
const ip_address_v4* tag_invoke(get_endpoint_tag<AF_INET>,
protocol_type actual,
const endpoint::addr_type * addr);
std::size_t tag_invoke(make_endpoint_tag<AF_INET6>,
asio::detail::socket_addr_type* base,
std::span<std::uint8_t, 16> address,
std::uint16_t port);
std::size_t tag_invoke(make_endpoint_tag<AF_INET6>,
asio::detail::socket_addr_type* base,
std::string_view address,
std::uint16_t port);
const ip_address_v6* tag_invoke(get_endpoint_tag<AF_INET6>,
protocol_type actual,
const endpoint::addr_type * addr);
std::size_t tag_invoke(make_endpoint_tag<AF_UNSPEC>,
asio::detail::socket_addr_type* base,
std::string_view address,
std::uint16_t port);
const ip_address* tag_invoke(get_endpoint_tag<AF_UNSPEC>,
protocol_type actual,
const endpoint::addr_type * addr);
endpoint
endpoint 函数持有并
struct endpoint
{
using storage_type = asio::detail::sockaddr_storage_type;
using addr_type = asio::detail::socket_addr_type;
void resize(std::size_t size);
void * data() {return &storage_; }
const void * data() const {return &storage_; }
std::size_t size() const {return size_;}
std::size_t capacity() const {return sizeof(storage_);}
void set_type (protocol_type::type_t type) { type_ = type;}
void set_protocol(protocol_type::protocol_t protocol) { protocol_ = protocol;}
protocol_type protocol() const;
endpoint() = default;
endpoint(const endpoint & ep);
// Construct a endpoint using make_endpoint_tag
template<protocol_type::family_t Family,
protocol_type::type_t Type,
protocol_type::protocol_t Protocol,
typename ... Args>
requires requires (make_endpoint_tag<Family> proto,
addr_type* addr, Args && ... args)
{
{tag_invoke(proto, addr, std::forward<Args>(args)...)} -> std::convertible_to<std::size_t>;
}
endpoint(static_protocol<Family, Type, Protocol> proto, Args && ... args);
// allows constructing an endpoint from an asio type
template<typename OtherEndpoint>
requires requires (OtherEndpoint oe)
{
{oe.protocol()} -> std::convertible_to<protocol_type>;
{oe.data()} -> std::convertible_to<void*>;
{oe.size()} -> std::convertible_to<std::size_t>;
}
endpoint(OtherEndpoint && oe);
};
class bad_endpoint_access : public std::exception
{
public:
bad_endpoint_access() noexcept = default;
char const * what() const noexcept;
};
// Uses `get_endpoint_tag<Protocol> to return a `pointer` to the result.
// Throws `bad_endpoint_access` if it's the wrong type
auto get(const endpoint & ep);
// Uses `get_endpoint_tag<Protocol> to return a `pointer` to the result.
template<static_protocol Protocol>
friend auto get_if(const endpoint * ep);
cobalt::io::endpoint ep{cobalt::io::tcp_v4, "127.0.0.1", 8080};
cobalt/io/socket.hpp
Socket 类是任何 socket 的基类。
struct socket
{
[[nodiscard]] system::result<void> open(protocol_type prot = protocol_type {});
[[nodiscard]] system::result<void> close();
[[nodiscard]] system::result<void> cancel();
[[nodiscard]] bool is_open() const;
// asio acceptor compatibility
template<typename T>
struct rebind_executor {using other = socket;};
using shutdown_type = asio::socket_base::shutdown_type;
using wait_type = asio::socket_base::wait_type;
using message_flags = asio::socket_base::message_flags;
constexpr static int message_peek = asio::socket_base::message_peek;
constexpr static int message_out_of_band = asio::socket_base::message_out_of_band;
constexpr static int message_do_not_route = asio::socket_base::message_do_not_route;
constexpr static int message_end_of_record = asio::socket_base::message_end_of_record;
using native_handle_type = asio::basic_socket<protocol_type, executor>::native_handle_type;
native_handle_type native_handle();
// Drop the connection
[[nodiscard]] system::result<void> shutdown(shutdown_type = shutdown_type::shutdown_both);
// endpoint of a connected endpiotn
[[nodiscard]] system::result<endpoint> local_endpoint() const;
[[nodiscard]] system::result<endpoint> remote_endpoint() const;
system::result<void> assign(protocol_type protocol, native_handle_type native_handle);
system::result<native_handle_type> release();
/// socket options
[[nodiscard]] system::result<std::size_t> bytes_readable();
[[nodiscard]] system::result<void> set_debug(bool debug);
[[nodiscard]] system::result<bool> get_debug() const;
[[nodiscard]] system::result<void> set_do_not_route(bool do_not_route);
[[nodiscard]] system::result<bool> get_do_not_route() const;
[[nodiscard]] system::result<void> set_enable_connection_aborted(bool enable_connection_aborted);
[[nodiscard]] system::result<bool> get_enable_connection_aborted() const;
[[nodiscard]] system::result<void> set_keep_alive(bool keep_alive);
[[nodiscard]] system::result<bool> get_keep_alive() const;
[[nodiscard]] system::result<void> set_linger(bool linger, int timeout);
[[nodiscard]] system::result<std::pair<bool, int>> get_linger() const;
[[nodiscard]] system::result<void> set_receive_buffer_size(std::size_t receive_buffer_size);
[[nodiscard]] system::result<std::size_t> get_receive_buffer_size() const;
[[nodiscard]] system::result<void> set_send_buffer_size(std::size_t send_buffer_size);
[[nodiscard]] system::result<std::size_t> get_send_buffer_size() const;
[[nodiscard]] system::result<void> set_receive_low_watermark(std::size_t receive_low_watermark);
[[nodiscard]] system::result<std::size_t> get_receive_low_watermark() const;
[[nodiscard]] system::result<void> set_send_low_watermark(std::size_t send_low_watermark);
[[nodiscard]] system::result<std::size_t> get_send_low_watermark() const;
[[nodiscard]] system::result<void> set_reuse_address(bool reuse_address);
[[nodiscard]] system::result<bool> get_reuse_address() const;
[[nodiscard]] system::result<void> set_no_delay(bool reuse_address);
[[nodiscard]] system::result<bool> get_no_delay() const;
wait_op wait(wait_type wt = wait_type::wait_read);
// Connect to a specific endpoint
connect_op connect(endpoint ep);
// connect to one of the given endpoints. Returns the one connected to.
ranged_connect_op connect(endpoint_sequence ep);
protected:
// Adopt the under-specified endpoint. E.g. to tcp from an endpoint specified as ip_address
virtual void adopt_endpoint_(endpoint & ) {}
};
// Connect to sockets using the given protocol
system::result<void> connect_pair(protocol_type protocol, socket & socket1, socket & socket2);
cobalt/io/socket.hpp
流 socket 的 socket 类,例如 TCP。
struct [[nodiscard]] stream_socket final : socket, stream
{
stream_socket(const cobalt::executor & executor = this_thread::get_executor());
stream_socket(stream_socket && lhs);
stream_socket(native_handle_type h, protocol_type protocol = protocol_type(),
const cobalt::executor & executor = this_thread::get_executor());
stream_socket(endpoint ep,
const cobalt::executor & executor = this_thread::get_executor());
write_op write_some(const_buffer_sequence buffer);
read_op read_some(mutable_buffer_sequence buffer);};
// Connect to sockets using the given protocol
inline system::result<std::pair<stream_socket, stream_socket>> make_pair(decltype(local_stream) protocol);
cobalt/io/socket.hpp
这个 socket 类实现了一个数据报 socket,例如 UDP。
struct [[nodiscard]] datagram_socket final : socket
{
datagram_socket(const cobalt::executor & executor = this_thread::get_executor());
datagram_socket(datagram_socket && lhs);
datagram_socket(native_handle_type h, protocol_type protocol = protocol_type(),
const cobalt::executor & executor = this_thread::get_executor());
datagram_socket(endpoint ep,
const cobalt::executor & executor = this_thread::get_executor());
write_op send(const_buffer_sequence buffer);
read_op receive(mutable_buffer_sequence buffer);
};
// Create a pair of connected sockets.
inline system::result<std::pair<datagram_socket, datagram_socket>> make_pair(decltype(local_datagram) protocol);
cobalt/io/socket.hpp
顺序数据包 socket 的 socket 类,例如 SCTP。
struct seq_packet_socket : socket
{
seq_packet_socket(const cobalt::executor & executor = this_thread::get_executor());
seq_packet_socket(seq_packet_socket && lhs);
seq_packet_socket(native_handle_type h, protocol_type protocol = protocol_type(),
const executor & executor = this_thread::get_executor());
seq_packet_socket(endpoint ep, const executor & executor = this_thread::get_executor());
receive_op receive(message_flags in_flags, message_flags& out_flags, mutable_buffer_sequence buffer);
send_op send(message_flags in_flags, const_buffer_sequence buffer);
receive_op receive(message_flags in_flags, mutable_buffer_sequence buffer);
};
// Connect to sockets using the given protocol
system::result<void> make_pair(decltype(local_seqpacket) protocol);
cobalt/io/resolver.hpp
解析器允许查找给定地址和服务的端点。
struct resolver
{
resolver(const executor & exec = this_thread::get_executor());
resolver(resolver && ) = delete;
void cancel();
// Produces a endpoint_sequence
[[nodiscard]] auto resolve(std::string_view host, std::string_view service);
};
// Short hand for performing a single lookup.
auto lookup(std::string_view host, std::string_view service,
const executor & exec = this_thread::get_executor());
cobalt/io/acceptor.hpp
接受器可用于接受连接。
struct acceptor
{
using wait_type = asio::socket_base::wait_type;
constexpr static std::size_t max_listen_connections = asio::socket_base::max_listen_connections;
acceptor(const cobalt::executor & executor = this_thread::get_executor());
acceptor(endpoint ep, const cobalt::executor & executor = this_thread::get_executor());
system::result<void> bind(endpoint ep);
system::result<void> listen(int backlog = max_listen_connections); // int backlog = net::max_backlog()
endpoint local_endpoint();
// accept any connection and assign it to `sock`
accept_op accept(socket & sock);
// Accept a connection of a stream_socket
template<protocol_type::family_t F = tcp.family(), protocol_type::protocol_t P = tcp.protocol()>
accept_op accept(static_protocol<F, tcp.type(), P> stream_proto = tcp);
// Accept a connection of a seq_packet
template<protocol_type::family_t F, protocol_type::protocol_t P>
accept_op accept(static_protocol<F, local_seqpacket.type(), P> stream_proto = tcp);
// For a connection to be ready
wait_op wait(wait_type wt = wait_type::wait_read);
};
cobalt/io/ssl.hpp
SSL 流是一个可以升级为 SSL 的 stream_socket。
namespace ssl
{
enum class verify
{
none = asio::ssl::verify_none,
peer = asio::ssl::verify_peer,
fail_if_no_peer_cert = asio::ssl::verify_fail_if_no_peer_cert,
client_once = asio::ssl::verify_client_once
};
using context = asio::ssl::context;
using verify_mode = asio::ssl::verify_mode;
struct stream final : socket, cobalt::io::stream, asio::ssl::stream_base
{
stream(context & ctx, const cobalt::executor & executor = this_thread::get_executor());
stream(context & ctx, native_handle_type h, protocol_type protocol = protocol_type(),
const cobalt::executor & executor = this_thread::get_executor());
stream(context & ctx, endpoint ep,
const cobalt::executor & executor = this_thread::get_executor());
write_op write_some(const_buffer_sequence buffer) override;
read_op read_some(mutable_buffer_sequence buffer) override;
// Indicates whether or not an ssl upgrade has been performed
[[nodiscard]] bool secure() const {return upgraded_;}
template<typename VerifyCallback>
system::result<void> set_verify_callback(VerifyCallback vc);
system::result<void> set_verify_depth(int depth);
system::result<void> set_verify_mode(verify depth);
[[nodiscard]] auto handshake(handshake_type type);
[[nodiscard]] auto handshake(handshake_type type, const_buffer_sequence buffer);
[[nodiscard]] auto shutdown();
};
}
| 构建脚本为该类创建了一个单独的库 (boost_cobalt_io_ssl),以便 boost_cobalt_io 可以在没有 OpenSSL 的情况下使用。 |
cobalt/experimental/context.hpp
| 这是未定义的行为,因为它违反了标准中的一个先决条件。 |
此头文件提供了 experimental 支持,可以使用基于 boost.fiber 的 stackful 协程,就好像它们是 C20 协程一样。也就是说,它们可以通过放入 coroutine_handle 来使用 `awaitables`。同样,该实现使用 C20 协程 Promise 并像 C++20 协程一样运行它。
//
void delay(experimental::context<promise<void>> h, std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
h.await(tim.async_wait(cobalt::use_op)); // instead of co_await.
}
cobalt::main co_main(int argc, char *argv[])
{
cobalt::promise<void> dl = cobalt::experimental::make_context(&delay, 50);
co_await dl;
co_return 0;
}
参考
// The internal coroutine context.
/// Args are the function arguments after the handle.
template<typename Return, typename ... Args>
struct context
{
// Get a handle to the promise
promise_type & promise();
const promise_type & promise() const;
// Convert it to any context if the underlying promise is the same
template<typename Return_, typename ... Args_>
constexpr operator context<Return_, Args_...>() const;
// Await something. Uses await_transform automatically.
template<typename Awaitable>
auto await(Awaitable && aw);
// Yield a value, if supported by the promise.
template<typename Yield>
auto yield(Yield && value);
};
// Create a fiber with a custom stack allocator (see boost.fiber for details) and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc && salloc, Args && ... args);
// Create a fiber with the default allocator and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func>
auto make_context(Func && func, Args && ... args);
// Create a fiber with a custom stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc && salloc, Args && ... args);
// Create a fiber with the default stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func>
auto make_context(Func && func, Args && ... args);
In-Depth
Custom Executors
cobalt 默认使用 asio::any_io_executor 的原因之一是它是一个类型擦除的执行器,也就是说,您可以提供自己的事件循环而无需重新编译 cobalt。
然而,在 Executor TS 的开发过程中,执行器的概念变得有点难以理解,说得委婉些。
Ruben Perez 写了一篇很棒的博客文章,我将无耻地借鉴。
定义
执行器是一个指向实际事件循环的类型,并且是(廉价)可复制的,它支持属性(见下文),是相等可比的,并且有一个 execute 函数。
execute
struct example_executor
{
template<typename Fn>
void execute(Fn && fn) const;
};
上面的函数根据其属性执行 fn。
属性
属性可以被查询、首选或必需,例如
struct example_executor
{
// get a property by querying it.
asio::execution::relationship_t &query(asio::execution::relationship_t) const
{
return asio::execution::relationship.fork;
}
// require an executor with a new property
never_blocking_executor require(const execution::blocking_t::never_t);
// prefer an executor with a new property. the executor may or may not support it.
never_blocking_executor prefer(const execution::blocking_t::never_t);
// not supported
example_executor prefer(const execution::blocking_t::always_t);
};
asio::any_io_executor 的属性
为了将执行器包装在 asio::any_io_executor 中,需要两个属性
-
`execution::context_t
-
execution::blocking_t::never_t
这意味着我们需要要么使它们成为必需的(这对上下文没有意义),要么从 query 返回预期的值。
execution::context_t 查询应返回 asio::execution_context&,如下所示
struct example_executor
{
asio::execution_context &query(asio::execution::context_t) const;
};
执行上下文用于管理管理 io 对象的服务的生命周期,例如 asio 的计时器和 socket。也就是说,通过提供此上下文,所有 asio 的 io 都与它一起工作。
execution_context 必须在执行器被销毁后仍然存在。 |
以下可能是首选的
-
execution::blocking_t::possibly_t -
execution::outstanding_work_t::tracked_t -
execution::outstanding_work_t::untracked_t -
execution::relationship_t::fork_t -
execution::relationship_t::continuation_
这意味着您可能希望在执行器中支持它们以进行优化。
blocking 属性
正如我们之前所见,此属性控制传递给 execute() 的函数是否可以立即运行,作为 execute() 的一部分,或者必须排队等待以后执行。可能的值是
-
asio::execution::blocking.never:从不在execute()的一部分运行函数。这就是asio::post()所做的。 -
asio::execution::blocking.possibly:函数可能作为execute()的一部分运行,也可能不运行。这是默认值(调用io_context::get_executor时获得的值)。 -
asio::execution::blocking.always:函数始终作为execute()的一部分运行。io_context::executor不支持此选项。
relationship 属性
relationship 可以取两个值
-
asio::execution::relationship.continuation:表示传递给execute()的函数是调用execute()的函数的延续。 -
asio::execution::relationship.fork:与上述相反。这是默认值(调用io_context::get_executor()时获得的值)。
将此属性设置为 continuation 会在函数调度方式上启用一些优化。它仅在函数被排队时(而不是立即运行时)才起作用。对于 io_context,设置后,函数将被调度到一个更快的、线程本地的队列中运行,而不是上下文全局队列。
outstanding_work_t 属性
outstanding_work 可以取两个值
-
asio::execution::outstanding_work.tracked:表示只要执行器还存在,就还有工作要做。 -
asio::execution::outstanding_work.untracked:与上述相反。这是默认值(调用io_context::get_executor()时获得的值)。
将此属性设置为 tracked 意味着事件循环在执行器存在期间不会返回。
A minimal executor
有了这些,让我们看看一个最小执行器的接口。
struct minimal_executor
{
minimal_executor() noexcept;
asio::execution_context &query(asio::execution::context_t) const;
static constexpr asio::execution::blocking_t
query(asio::execution::blocking_t) noexcept
{
return asio::execution::blocking.never;
}
template<class F>
void execute(F && f) const;
bool operator==(minimal_executor const &other) const noexcept;
bool operator!=(minimal_executor const &other) const noexcept;
};
有关使用 Python 的 asyncio 事件循环的实现的示例,请参阅 example/python.cpp。 |
Adding a work guard.
现在,让我们为 outstanding_work 属性添加一个 require 函数,它使用多个类型。
struct untracked_executor : minimal_executor
{
untracked_executor() noexcept;
constexpr tracked_executor require(asio::execution::outstanding_work:: tracked_t) const;
constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const {return *this; }
};
struct untracked_executor : minimal_executor
{
untracked_executor() noexcept;
constexpr tracked_executor require(asio::execution::outstanding_work:: tracked_t) const {return *this;}
constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const;
};
请注意,不必从 require 函数返回不同的类型,也可以这样做
struct trackable_executor : minimal_executor
{
trackable_executor() noexcept;
constexpr trackable_executor require(asio::execution::outstanding_work:: tracked_t) const;
constexpr trackable_executor require(asio::execution::outstanding_work::untracked_t) const;
};
如果我们想使用 prefer,它看起来如下
struct trackable_executor : minimal_executor
{
trackable_executor() noexcept;
constexpr trackable_executor prefer(asio::execution::outstanding_work:: tracked_t) const;
constexpr trackable_executor prefer(asio::execution::outstanding_work::untracked_t) const;
};
总结
正如您所见,属性系统并非易事,但功能强大。实现自定义执行器本身就是一个问题类别,因此本文档不介绍该内容。相反,有一个关于如何将 Python 事件循环包装在执行器中的示例。
以下是一些阅读建议。
Stackless
C++20 协程是无栈的,这意味着它们没有自己的栈。
C++ 中的栈描述了调用栈,即所有堆叠的函数帧。函数帧是函数运行所需的内存,即用于存储其变量和返回地址等信息的内存片段。
| 函数帧的大小在编译时可知,但在包含其定义的编译单元之外是未知的。 |
int bar() {return 0;} // the deepest point of the stack
int foo() {return bar();}
int main()
{
return foo();
}
上面的示例中的调用栈是
main()
foo()
bar()
协程可以实现为有栈的,这意味着它分配固定大小的内存块,并像线程一样堆叠函数帧。C++20 协程是无栈的,即它们只分配自己的帧,并在恢复时使用调用者的栈。使用我们之前的例子
fictional_eager_coro_type<int> example()
{
co_yield 0;
co_yield 1;
}
void nested_resume(fictional_eager_coro_type<int>& f)
{
f.resume();
}
int main()
{
auto f = example();
nested_resume(f);
f.reenter();
return 0;
}
这将产生一个类似这样的调用栈
main()
f$example()
nested_resume()
f$example()
f$example()
如果协程在线程之间移动,情况也是如此。
Lazy & eager
如果协程仅在其被恢复后开始执行代码,则它是惰性的,而一个急切的协程将立即执行直到其第一个挂起点(即 co_await、co_yield 或 co_return 表达式)。
lazy_coro co_example()
{
printf("Entered coro\n");
co_yield 0;
printf("Coro done\n");
}
int main()
{
printf("enter main\n");
auto lazy = co_example();
printf("constructed coro\n");
lazy.resume();
printf("resumed once\n");
lazy.resume();
printf("resumed twice\n");
return 0;
}
这将产生类似这样的输出
enter main
constructed coro
Entered coro
resumed once
Coro Done
resumed twice
而一个急切的协程看起来会像这样
eager_coro co_example()
{
printf("Entered coro\n");
co_yield 0;
printf("Coro done\n");
}
int main()
{
printf("enter main\n");
auto lazy = co_example();
printf("constructed coro\n");
lazy.resume();
printf("resumed once\n");
return 0;
}
这将产生类似这样的输出
enter main
Entered coro
constructed coro
resume once
Coro Done
基准测试
在 11 代 Intel® Core™ i7-1185G7 @ 3.00GHz 上运行
Posting to an executor
基准测试正在运行以下代码,基于 cobalt 的任务、asio::awaitable 和 asio 的 stackful 协程(boost.context)。
cobalt::task<void> atest()
{
for (std::size_t i = 0u; i < n; i++)
co_await asio::post(cobalt::use_op);
}
| gcc 12 | clang 16 | |
|---|---|---|
cobalt |
2472 |
2098 |
awaitable |
2432 |
2253 |
stackful |
3655 |
3725 |
Running noop coroutine in parallel
此基准测试使用一个大小为零的 asio::experimental::channel,以在其中并行读写。它使用 cobalt 的 gather 和 asio::awaitable 中的 awaitable_operator。
cobalt::task<void> atest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
for (std::size_t i = 0u; i < n; i++)
co_await cobalt::gather(
chan.async_send(system::error_code{}, cobalt::use_task),
chan.async_receive(cobalt::use_task));
}
asio::awaitable<void> awtest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
using boost::asio::experimental::awaitable_operators::operator&&;
for (std::size_t i = 0u; i < n; i++)
co_await (
chan.async_send(system::error_code{}, asio::use_awaitable)
&&
chan.async_receive(asio::use_awaitable));
}
| gcc 12 | clang 16 | |
|---|---|---|
cobalt |
1563 |
1468 |
awaitable |
2800 |
2805 |
Immediate
此基准测试通过使用大小为 1 的通道来利用立即完成,以便每个操作都是立即的。
cobalt::task<void> atest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 1u};
for (std::size_t i = 0u; i < n; i++)
{
co_await chan.async_send(system::error_code{}, cobalt::use_op);
co_await chan.async_receive(cobalt::use_op);
}
}
| gcc 12 | clang 16 | |
|---|---|---|
cobalt |
1810 |
1864 |
awaitable |
3109 |
4110 |
stackful |
3922 |
4705 |
通道
在此基准测试中,比较了 asio::experimental::channel 和 cobalt::channel。
这类似于并行测试,但使用了 cobalt::channel。
| gcc | clang | |
|---|---|---|
cobalt |
500 |
350 |
awaitable |
790 |
770 |
stackful |
867 |
907 |
Operation Allocations
此基准测试比较了异步操作关联分配器的不同可能解决方案。
| gcc | clang | |
|---|---|---|
std::allocator |
1136 |
1139 |
cobalt::monotonic |
1149 |
1270 |
pmr::monotonic |
1164 |
1173 |
cobalt::sbo |
1021 |
1060 后一种方法由 cobalt 内部使用。 |
要求
库
Boost.cobalt 需要 C++20 编译器,并直接依赖于以下 boost 库
-
boost.asio
-
boost.system
-
boost.circular_buffer
-
boost.intrusive
-
boost.smart_ptr
-
boost.container (适用于 clang < 16)
编译器
此库自 Clang 14、Gcc 10 和 MSVC 19.30 (Visual Studio 2022) 起受支持。
| Gcc 版本 12.1 和 12.2 似乎有一个协程与无栈变量的 bug,如[此处](https://godbolt.org/z/6adGcqP1z)所示,应避免用于协程。 |
Clang 仅在 16 版本中添加了 std::pmr 支持,因此较旧的 clang 版本使用 boost::container::pmr 作为即插即用替换。
| 一些(如果不是全部)MSVC 版本有一个损坏的协程实现,该库需要绕过。这可能会导致非确定性行为和开销。 |
协程的延续可以在 final_suspend 返回的 awaitable 中完成,如下所示
// in promise
auto final_suspend() noexcept
{
struct final_awaitable
{
std::coroutine_handle<void> continuation{std::noop_coroutine()}; (1)
bool await_ready() const noexcept;
std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> h) noexcept
{
auto cc = continuation;
h.destroy(); (2)
return cc;
}
void await_resume() noexcept {}
};
return final_awaitable{my_continuation};
};
| 1 | 延续 |
| 2 | 在延续之前自毁协程 |
final_suspend 在 MSVC 上无法正确挂起协程,因此 h.destroy() 会导致协程帧中的元素被双重销毁。因此,msvc 将需要将销毁推迟到线下进行。这会导致开销,并使实际的内存释放变得不确定。
致谢
如果没有 CppAlliance 及其创始人 Vinnie Falco,这个库就不可能实现。Vinnie 足够信任我,让我能够在此项目上工作,尽管他自己对这样的库应该如何设计有着截然不同的看法。
还要感谢 Ruben Perez 和 Richard Hodges 倾听我的设计问题并给我建议和用例。此外,如果没有 Chris Kohlhoff 出色的 boost.asio,这个库就不可能实现。