Working With Any Number of Futures
When we switched from using two futures to three in the previous section, we
also had to switch from using join to using join3. It would be annoying to
have to call a different function every time we changed the number of futures we
wanted to join. Happily, we have a macro form of join to which we can pass an
arbitrary number of arguments. It also handles awaiting the futures itself.
Thus, we could rewrite the code from Listing 17-13 to use join! instead of
join3, as in Listing 17-14:
This is definitely a nice improvement over needing to swap between join and
join3 and join4 and so on! However, even this macro form only works when we
know the number of futures ahead of time. In real-world Rust, though, pushing
futures into a collection and then waiting on some or all of the futures in that
collection to complete is a common pattern.
To check all the futures in some collection, we will need to iterate over and
join on all of them. The trpl::join_all function accepts any type which
implements the Iterator trait, which we learned about back in Chapter 13, so
it seems like just the ticket. Let’s try putting our futures in a vector, and
replace join! with join_all.
Unfortunately, this does not compile. Instead, we get this error:
error[E0308]: mismatched types
--> src/main.rs:43:37
|
8 | let tx1_fut = async move {
| _______________________-
9 | | let vals = vec![
10 | | String::from("hi"),
11 | | String::from("from"),
... |
19 | | }
20 | | };
| |_________- the expected `async` block
21 |
22 | let rx_fut = async {
| ______________________-
23 | | while let Some(value) = rx.recv().await {
24 | | println!("received '{value}'");
25 | | }
26 | | };
| |_________- the found `async` block
...
43 | let futures = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a different `async` block
|
= note: expected `async` block `{async block@src/main.rs:8:23: 20:10}`
found `async` block `{async block@src/main.rs:22:22: 26:10}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and and casting it to a trait object
This might be surprising. After all, none of the async blocks returns anything,
so each block produces a Future<Output = ()>. However, Future is a trait, not a
concrete type. The concrete types are the individual data structures generated
by the compiler for async blocks. You cannot put two different hand-written
structs in a Vec, and the same thing applies to the different structs
generated by the compiler.
To make this work, we need to use trait objects, just as we did in “Returning
Errors from the run function” in Chapter 12. (We will cover trait objects
in detail in Chapter 18.) Using trait objects lets us treat each of the
anonymous futures produced by these types as the same type, since all of them
implement the Future trait.
Note: In Chapter 8, we discussed another way to include multiple types in a
Vec: using an enum to represent each of the different types which can
appear in the vector. We cannot do that here, though. For one thing, we have
no way to name the different types, because they are anonymous. For another,
the reason we reached for a vector and join_all in the first place was to be
able to work with a dynamic collection of futures where we do not know what
they will all be until runtime.
We start by wrapping each of the futures in the vec! in a Box::new, as shown
in Listing 17-16.
Unfortunately, this still does not compile. In fact, we have the same basic
error we did before, but we get one for both the second and third Box::new
calls, and we also get new errors referring to the Unpin trait. We will come
back to the Unpin errors in a moment. First, let’s fix the type errors on the
Box::new calls, by explicitly providing the type of the futures variable as a
trait object (Listing 17-17).
The type we had to write here is a little involved, so let’s walk through it:
- The innermost type is the future itself. We note explicitly that the output of
  the future is the unit type () by writing Future<Output = ()>.
- Then we annotate the trait with dyn to mark it as dynamic.
- The entire trait reference is wrapped in a Box.
- Finally, we state explicitly that futures is a Vec containing these items.
That already made a big difference. Now when we run the compiler, we only have
the errors mentioning Unpin. Although there are three of them, notice that
each is very similar in its contents.
error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
--> src/main.rs:46:24
|
46 | trpl::join_all(futures).await;
| -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `join_all`
--> /Users/chris/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
--> src/main.rs:46:9
|
46 | trpl::join_all(futures).await;
| ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
--> /Users/chris/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
--> src/main.rs:46:33
|
46 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
--> /Users/chris/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.
That is a lot to digest, so let’s pull it apart. The first part of the message
tells us that the first async block (src/main.rs:8:23: 20:10) does not
implement the Unpin trait, and suggests using pin! or Box::pin to resolve
it. Later in the chapter, we will dig into a few more details about Pin and
Unpin. For the moment, though, we can just follow the compiler’s advice to get
unstuck! In Listing 17-18, we start by updating the type annotation for
futures, with a Pin wrapping each Box. Second, we use Box::pin to pin
the futures themselves.
If we compile and run this, we finally get the output we hoped for:
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
Phew!
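The Pin<Box<dyn Future>> approach described above can be sketched without the
trpl crate. The following is a minimal, standard-library-only illustration, not
the book's listing: run_all is a hypothetical stand-in for trpl::join_all that
simply polls in a loop, NoopWake is a helper invented for this sketch, and the
futures return u32 rather than () so that the result is observable.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for a driver that just polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for `trpl::join_all` (an assumption of this sketch):
// polls every unfinished future in turn until all of them are ready, then
// returns the outputs in the original order.
fn run_all<F: Future + Unpin>(mut futures: Vec<F>) -> Vec<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut outputs: Vec<Option<F::Output>> = futures.iter().map(|_| None).collect();

    while outputs.iter().any(Option::is_none) {
        for (fut, slot) in futures.iter_mut().zip(outputs.iter_mut()) {
            if slot.is_none() {
                // `Pin::new` is only allowed because of the `Unpin` bound.
                if let Poll::Ready(value) = Pin::new(fut).poll(&mut cx) {
                    *slot = Some(value);
                }
            }
        }
    }
    outputs.into_iter().map(|slot| slot.unwrap()).collect()
}

// Three async blocks with three distinct anonymous types, erased to a single
// trait-object type so that they can share one `Vec`.
fn boxed_demo() -> Vec<u32> {
    let futures: Vec<Pin<Box<dyn Future<Output = u32>>>> = vec![
        Box::pin(async { 1_u32 }),
        Box::pin(async { 2_u32 }),
        Box::pin(async { 3_u32 }),
    ];
    run_all(futures)
}

fn main() {
    println!("{:?}", boxed_demo());
}
```

The Unpin bound on run_all plays the same role as the Unpin errors above: a
bare Box<dyn Future> would not satisfy it, while Pin<Box<dyn Future>> does.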
There is a bit more we can explore here. For one thing, using Pin<Box<T>>
comes with a small amount of extra overhead from putting these futures on the
heap with Box, and we are only doing that to get the types to line up. We don’t
actually need the heap allocation, after all: these futures are local to this
particular function. As noted above, Pin is itself a wrapper type, so we can
get the benefit of having a single type in the Vec (the original reason we
reached for Box) without doing a heap allocation. We can use Pin directly
with each future, using the std::pin::pin macro.
However, we must still be explicit about the type of the pinned reference;
otherwise Rust will still not know to interpret these as dynamic trait objects,
which is what we need them to be in the Vec. We therefore pin! each future
when we define it, and define futures as a Vec containing pinned mutable
references to the dynamic Future type, as in Listing 17-19.
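As a rough illustration of the pin! variant, here is a standard-library-only
sketch. It is not the book's listing: run_all is again a hypothetical stand-in
for trpl::join_all, NoopWake is a helper invented for the sketch, and the
futures return string slices so the result can be inspected.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for a driver that just polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for `trpl::join_all` (an assumption of this sketch).
fn run_all<F: Future + Unpin>(mut futures: Vec<F>) -> Vec<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut outputs: Vec<Option<F::Output>> = futures.iter().map(|_| None).collect();

    while outputs.iter().any(Option::is_none) {
        for (fut, slot) in futures.iter_mut().zip(outputs.iter_mut()) {
            if slot.is_none() {
                if let Poll::Ready(value) = Pin::new(fut).poll(&mut cx) {
                    *slot = Some(value);
                }
            }
        }
    }
    outputs.into_iter().map(|slot| slot.unwrap()).collect()
}

fn pinned_demo() -> Vec<&'static str> {
    // Each future is pinned in place on the stack; no heap allocation for
    // the futures themselves.
    let first = pin!(async { "hi" });
    let second = pin!(async { "from" });
    let third = pin!(async { "the future" });

    // The explicit annotation tells Rust to treat each pinned reference as a
    // reference to a dynamic `Future` trait object.
    let futures: Vec<Pin<&mut dyn Future<Output = &'static str>>> =
        vec![first, second, third];
    run_all(futures)
}

fn main() {
    println!("{:?}", pinned_demo());
}
```

Because the pinned references borrow from local variables, this form only
works while the futures stay inside the function that created them.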
We got this far by ignoring the fact that we might have different Output
types. For example, in Listing 17-20, the anonymous future for a implements
Future<Output = u32>, the anonymous future for b implements
Future<Output = &str>, and the anonymous future for c implements
Future<Output = bool>.
We can use trpl::join! to await them, because it allows you to pass in
multiple future types and produces a tuple of those types. We cannot use
trpl::join_all, because it requires the futures passed in all to have the same
type. Remember, that error is what got us started on this adventure with Pin!
This is a fundamental tradeoff: we can either deal with a dynamic number of
futures with join_all, as long as they all have the same type, or we can deal
with a set number of futures with the join functions or the join! macro,
even if they have different types. This is the same as working with any other
types in Rust, though. Futures are not special, even though we have some nice
syntax for working with them, and that is a good thing.
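To see why a fixed-arity join can accept different output types, it may help to
look at a hand-written version. The sketch below is an assumption-laden
illustration using only the standard library: join2 and block_on are
hypothetical helpers written for this example, not the trpl or futures
implementations.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for a driver that just polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls one future until it completes.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

// Hypothetical two-future join. The two type parameters are independent, so
// the outputs may differ; the result is a tuple of both outputs.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let mut out_a: Option<A::Output> = None;
    let mut out_b: Option<B::Output> = None;

    poll_fn(|cx| {
        if out_a.is_none() {
            if let Poll::Ready(value) = a.as_mut().poll(cx) {
                out_a = Some(value);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(value) = b.as_mut().poll(cx) {
                out_b = Some(value);
            }
        }
        if out_a.is_some() && out_b.is_some() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    })
    .await;

    (out_a.unwrap(), out_b.unwrap())
}

fn mixed_outputs() -> (u32, &'static str) {
    block_on(join2(async { 1_u32 }, async { "Hello!" }))
}

fn main() {
    println!("{:?}", mixed_outputs());
}
```

The number of futures is fixed in the function signature, which is exactly the
price paid for allowing each one to have its own type.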
Racing futures
When we “join” futures with the join family of functions and macros, we
require all of them to finish before we move on. Sometimes, though, we only
need some future from a set to finish before we move on, kind of like racing
one future against another. This operation is often named race for exactly
that reason.
Note: Under the hood, race is built on a more general function, select,
which you will encounter more often in real-world Rust code. A select
function can do a lot of things that the trpl::race function cannot, but it
also has some additional complexity that we can skip over for now.
In Listing 17-21, we use trpl::race to run two futures, slow and fast,
against each other. Each one prints a message when it starts running, pauses for
some amount of time by calling and awaiting sleep, and then prints another
message when it finishes. Then we pass both to trpl::race and wait for one of
them to finish. (The outcome here won’t be too surprising: fast wins!) Note
that unlike the first time we used race back in Our First Async
Program, we just ignore the Either instance it returns here,
because all of the interesting behavior happens in the body of the async blocks.
Notice that if you flip the order of the arguments to race, the order of the
“started” messages changes, even though the fast future always completes
first. That is because the implementation of this particular race function is
not fair. It always runs the futures passed as arguments in the order they are
passed. Other implementations are fair, and will randomly choose which future
to poll first. Regardless of whether the implementation of race we are using is
fair, though, one of the futures will run up to the first .await in its body
before another task can start.
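The effect of argument order on an unfair race can be reproduced with a small
standard-library-only sketch. Everything here is an assumption made for
illustration: race, Either, block_on, and WaitPolls are hand-written stand-ins,
and WaitPolls counts polls instead of measuring time the way a real sleep
would.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls one future until it completes.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

enum Either<A, B> {
    Left(A),
    Right(B),
}

// Hypothetical unfair race: always polls `a` before `b`.
async fn race<A: Future, B: Future>(a: A, b: B) -> Either<A::Output, B::Output> {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let winner = poll_fn(|cx| {
        if let Poll::Ready(value) = a.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(value));
        }
        if let Poll::Ready(value) = b.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(value));
        }
        Poll::Pending
    })
    .await;
    winner
}

// Stand-in for a timer: stays pending for the given number of polls.
struct WaitPolls(u32);

impl Future for WaitPolls {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Records the messages produced when racing `slow` and `fast` in either order.
fn race_log(slow_first: bool) -> Vec<&'static str> {
    let log = RefCell::new(Vec::new());
    let slow = async {
        log.borrow_mut().push("'slow' started.");
        WaitPolls(5).await;
        log.borrow_mut().push("'slow' finished.");
    };
    let fast = async {
        log.borrow_mut().push("'fast' started.");
        WaitPolls(1).await;
        log.borrow_mut().push("'fast' finished.");
    };
    if slow_first {
        block_on(race(slow, fast));
    } else {
        block_on(race(fast, slow));
    }
    log.into_inner()
}

fn main() {
    println!("{:?}", race_log(true));
    println!("{:?}", race_log(false));
}
```

In both orders fast finishes and slow never does, but the first “started”
message always belongs to whichever future was passed first.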
Recall from Our First Async Program that at each await point, Rust gives a runtime a chance to pause the task and switch to another one if the future being awaited is not ready. The inverse is also true: Rust only pauses async blocks and hands control back to a runtime at an await point. Everything between await points is synchronous.
That means if you do a bunch of work in an async block without an await point, that future will block any other futures from making progress. You may sometimes hear this referred to as one future starving other futures. In some cases, that may not be a big deal. However, if you are doing some kind of expensive setup or long-running work, or if you have a future which will keep doing some particular task indefinitely, you will need to think about when and where to hand control back to the runtime.
By the same token, if you have long-running blocking operations, async can be a useful tool for providing ways for different parts of the program to relate to each other.
But how would you hand control back to the runtime in those cases?
Yielding
Let’s simulate a long-running operation. Listing 17-22 introduces a slow
function. It uses std::thread::sleep instead of trpl::sleep so that calling
slow will block the current thread for some number of milliseconds. We can use
slow to stand in for real-world operations which are both long-running and
blocking.
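A function along these lines might look like the following minimal sketch. The
parameter names and the message format are assumptions chosen to match the
output shown later in this section; the essential point is only that
std::thread::sleep blocks the whole thread rather than yielding to a runtime.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Blocks the current thread for `ms` milliseconds, then reports how long it
// "worked". Nothing else can run on this thread in the meantime.
fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

fn main() {
    let start = Instant::now();
    slow("a", 30);
    slow("a", 10);
    println!("total elapsed: {:?}", start.elapsed());
}
```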
In Listing 17-23, we use slow to emulate doing this kind of CPU-bound work in
a pair of futures. To begin, each future only hands control back to the runtime
after carrying out a bunch of slow operations.
If you run this, you will see this output:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
As with our earlier example, race still finishes as soon as a is done. There
is no interleaving between the two futures, though. The a future does all of
its work until the trpl::sleep call is awaited, then the b future does all
of its work until its own trpl::sleep call is awaited, and then the a future
completes. To allow both futures to make progress between their slow tasks, we
need await points so we can hand control back to the runtime. That means we need
something we can await!
We can already see this kind of handoff happening in Listing 17-23: if we
removed the trpl::sleep at the end of the a future, it would complete
without the b future running at all. Maybe we could use the sleep function
as a starting point?
In Listing 17-24, we add trpl::sleep calls with await points between each call
to slow. Now the two futures’ work is interleaved:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
The a future still runs for a bit before handing off control to b, because
it calls slow before ever calling trpl::sleep, but after that the futures
swap back and forth each time one of them hits an await point. In this case, we
have done that after every call to slow, but we could break up the work
however makes the most sense to us.
We do not really want to sleep here, though: we want to make progress as fast
as we can. We just need to hand back control to the runtime. We can do that
directly, using the yield_now function. In Listing 17-25, we replace all those
sleep calls with yield_now.
This is both clearer about the actual intent and can be significantly faster
than using sleep, because timers like the one used by sleep often have
limits to how granular they can be. The version of sleep we are using, for
example, will always sleep for at least a millisecond, even if we pass it a
Duration of one nanosecond. Again, modern computers are fast: they can do a
lot in one millisecond!
You can see this for yourself by setting up a little benchmark, like the one in
Listing 17-26. (This is not an especially rigorous way to do performance
testing, but it suffices to show the difference here.) Here, we skip all the
status printing, pass a one-nanosecond Duration to trpl::sleep, and let
each future run by itself, with no switching between the futures. Then we run
for 1,000 iterations and see how long the future using trpl::sleep takes
compared to the future using trpl::yield_now.
The version with yield_now is way faster!
This means that async can be useful even for compute-bound tasks, depending on what else your program is doing, because it provides a useful tool for structuring the relationships between different parts of the program. This is a form of cooperative multitasking, where each future has the power to determine when it hands over control via await points. Each future therefore also has the responsibility to avoid blocking for too long. In some Rust-based embedded operating systems, this is the only kind of multitasking!
In real-world code, you will not usually be alternating function calls with await points on every single line, of course. While yielding control like this is relatively inexpensive, it is not free! In many cases, trying to break up a compute-bound task might make it significantly slower, so sometimes it is better for overall performance to let an operation block briefly. You should always measure to see what your code’s actual performance bottlenecks are. The underlying dynamic is an important one to keep in mind if you are seeing a lot of work happening in serial that you expected to happen concurrently, though!
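To make the yielding behavior from this section concrete, here is a
standard-library-only sketch. All of the helpers are assumptions written for
the illustration: yield_now is a hand-rolled stand-in for trpl::yield_now, race
is a simplified unfair race that discards outputs, and a log of step names
replaces the blocking slow calls.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls one future until it completes.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

// Stand-in for `trpl::yield_now`: pending exactly once, then ready.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

// Simplified unfair race: polls `a` first and finishes when either is done.
async fn race(a: impl Future<Output = ()>, b: impl Future<Output = ()>) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    poll_fn(|cx| {
        if a.as_mut().poll(cx).is_ready() || b.as_mut().poll(cx).is_ready() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    })
    .await;
}

// Runs two three-step futures, optionally yielding after every step, and
// returns the order in which the steps ran.
fn run_pair(yield_between_steps: bool) -> Vec<String> {
    let log = RefCell::new(Vec::new());
    let a = async {
        for step in 1..=3 {
            log.borrow_mut().push(format!("a{step}"));
            if yield_between_steps {
                yield_now().await;
            }
        }
    };
    let b = async {
        for step in 1..=3 {
            log.borrow_mut().push(format!("b{step}"));
            if yield_between_steps {
                yield_now().await;
            }
        }
    };
    block_on(race(a, b));
    log.into_inner()
}

fn main() {
    println!("without yielding: {:?}", run_pair(false));
    println!("with yielding:    {:?}", run_pair(true));
}
```

Without any await points, a runs to completion on its first poll and b never
gets a turn; with a yield after each step, the two futures alternate.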
Building Our Own Async Abstractions
We can also compose futures together to create new patterns. For example, we can
build a timeout function with async building blocks we already have. When we
are done, the result will be another building block we could use to build up yet
further async abstractions.
Listing 17-27 shows how we would expect this timeout to work with a slow
future.
Let’s implement this! To begin, let’s think about the API for timeout:
- It needs to be an async function itself so we can await it.
- Its first parameter should be a future to run. We can make it generic to allow it to work with any future.
- Its second parameter will be the maximum time to wait. If we use a Duration, that will make it easy to pass along to trpl::sleep.
- It should return a Result. If the future completes successfully, the Result will be Ok with the value produced by the future. If the timeout elapses first, the Result will be Err with the duration that the timeout waited for.
Listing 17-28 shows this declaration.
That satisfies our goals for the types. Now let’s think about the behavior we
need: we want to race the future passed in against the duration. We can use
trpl::sleep to make a timer future from the duration, and use trpl::race to
run that timer with the future the caller passes in.
We also know that race is not fair, and polls arguments in the order they are
passed. Thus, we pass future_to_try to race first so it gets a chance to
complete even if max_time is a very short duration. If future_to_try
finishes first, race will return Left with the output from future_to_try. If
timer finishes first, race will return Right with the timer’s output of ().
In Listing 17-29, we match on the result of awaiting trpl::race. If the
future_to_try succeeded and we get a Left(output), we return Ok(output).
If the sleep timer elapsed instead and we get a Right(()), we ignore the ()
with _ and return Err(max_time) instead.
With that, we have a working timeout, built out of two other async helpers. If
we run our code, it will print the failure mode after the timeout:
Failed after 2 seconds
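The same shape can be sketched with only the standard library. This is an
illustration under stated assumptions rather than the trpl-based listing:
block_on, race, Either, and sleep are hand-written stand-ins, and this sleep
busy-polls against a deadline instead of using a real timer. Only the timeout
function itself follows the design described above.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls one future until it completes.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

enum Either<A, B> {
    Left(A),
    Right(B),
}

// Hypothetical unfair race: always polls `a` before `b`.
async fn race<A: Future, B: Future>(a: A, b: B) -> Either<A::Output, B::Output> {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let winner = poll_fn(|cx| {
        if let Poll::Ready(value) = a.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(value));
        }
        if let Poll::Ready(value) = b.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(value));
        }
        Poll::Pending
    })
    .await;
    winner
}

// Stand-in for an async sleep: ready once the deadline has passed.
struct Sleep {
    deadline: Instant,
}

impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn sleep(duration: Duration) -> Sleep {
    Sleep { deadline: Instant::now() + duration }
}

// Races the caller's future against a timer. The future goes first so it can
// win even when `max_time` is very short.
async fn timeout<F: Future>(future_to_try: F, max_time: Duration) -> Result<F::Output, Duration> {
    match race(future_to_try, sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}

fn main() {
    let slow = async {
        sleep(Duration::from_millis(100)).await;
        "I finished!"
    };
    match block_on(timeout(slow, Duration::from_millis(10))) {
        Ok(message) => println!("Succeeded with '{message}'"),
        Err(duration) => println!("Failed after {} milliseconds", duration.as_millis()),
    }
}
```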
Because futures compose with other futures, you can build really powerful tools using smaller async building blocks. For example, you can use this same approach to combine timeouts with retries, and in turn use those with things like network calls—one of the examples from the beginning of the chapter!
In practice, you will usually work directly with async and .await, and
secondarily with functions and macros like join, join_all, race, and so
on. You will only need to reach for pin now and again to use them with those
APIs.
We have now seen a number of ways to work with multiple futures at the same time. Up next, we will look at how we can work with multiple futures in a sequence over time, with streams. Here are a couple more things you might want to consider first, though:
- We used a Vec with join_all to wait for all of the futures in some group to
  finish. How could you use a Vec to process a group of futures in sequence,
  instead? What are the tradeoffs of doing that?
- Take a look at the futures::stream::FuturesUnordered type from the futures
  crate. How would using it be different from using a Vec? (Don’t worry about
  the fact that it is from the stream part of the crate; it works just fine
  with any collection of futures.)