Waiting for a result¶
Motivation and usage¶
As noted elsewhere, the best way to wait for ResultCapture
tasks is usually to run them in
a nursery, and access the results after the nursery block. For example:
async with trio.open_nursery() as n:
results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
print("results:", *[r.result() for r in results])
It is also possible to wait on an individual task by using the ResultBase.wait_done()
method:
async with trio.open_nursery() as n:
results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
await results[2].wait_done()
# Task 2 is now done (while other tasks may still be running).
print("result 2:", results[2].result()
# All tasks are now done.
print("results:", *[r.result() for r in results])
The ResultBase.wait_done()
method is a method of the base class, so it is also available
for Future
objects, as are the utility functions below.
Sometimes, you may need some more control than this. This page documents a few utility functions in
aioresult for slightly more complex waiting patterns. Under the hood, they all have very simple
implementations in terms of ResultBase.wait_done()
.
If you wish to wait for a collection of tasks to complete without using a nursery, or have a
collection of Future
objects (which cannot be used in the above pattern), then you can use
the wait_all()
function:
async with trio.open_nursery() as n:
results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
even_results = results[::2]
await wait_all(even_results)
# All tasks in even positions are now done.
print("even results:", *[r.result() for r in even_results]
# All tasks are now done.
print("all results:", *[r.result() for r in results])
Similarly, wait_any()
may be used to wait until (at least) one of the tasks is complete.
The return value of the function is the ResultBase
object that is completed:
async with trio.open_nursery() as n:
results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
completed = await wait_any(results)
print("task completed:", completed.args[0], "result:", completed.result())
print("all results:", *[r.result() for r in results])
If you wish to access results one at a time, as they complete, then it is tempting to call
wait_any()
in a loop, erasing the most recent result each time. But a simpler solution is to
use the results_to_channel()
function:
async with trio.open_nursery() as n:
results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
# Create a channel and run aioresult.results_to_channel() in the nursery
send_channel, receive_channel = trio.open_memory_channel(1)
n.start_soon(results_to_channel, results, send_channel)
async for r in receive_channel:
print("task completed:", r.args[0], "result:", r.result())
Running a callback when a task is done¶
Another utility routine that was considered for aioresult is one that runs a callback when the task
is done, similar to asyncio.Future.add_done_callback()
. This would be straightforward to
implement but is not in keeping with the spirit of structured concurrency. If
needed, it is simple to write a little boilerplate to wait for ResultBase.wait_done()
and
then call the desired callback:
def my_callback(result):
print("Task completed:", result.args[0], "result:", result.result())
async def wait_and_run(result):
await result.wait_done()
my_callback(result)
async def run_all():
async with trio.open_nursery() as n:
results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
for r in results:
n.start_soon(wait_and_run, r)
# By the time this line is reached, my_callback() has been completed for each result.
An alternative, if my_callback()
is a function you can modify, is to wait for
ResultBase.wait_done()
at the start. That way, it can be run in the nursery directly:
async def my_callback(result):
await result.wait_done()
print("Task completed:", result.args[0], "result:", result.result())
async def run_all():
async with trio.open_nursery() as n:
results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
for r in results:
n.start_soon(my_callback, r)
# By the time this line is reached, my_callback() has been completed for each result.
Of course, if you can get this far, then you can probably modify your function to call the underlying routine directly and eliminate your usage of aioresult altogether:
async def my_fn(i):
result = await foo(i)
print("Task completed:", i, "result:", result)
async def run_all():
async with trio.open_nursery() as n:
for i in range(10):
n.start_soon(my_fn, i)
Reference¶
- await aioresult.wait_all(results: Iterable[ResultBase]) None ¶
Waits until all tasks are done.
The implementation is extremely simple: it just iterates over the parameter and calls
ResultBase.wait_done()
on each in turn. (It does not matter if they finish in a different order than the iteration order becauseResultBase.wait_done()
returns immediately if the task is already done.)- Parameters:
results – The results to wait for.
- await aioresult.wait_any(results: Iterable[ResultBase]) ResultBase ¶
Waits until one of the tasks is complete, and returns that object.
Note that it is possible that, when this function returns, more than one of the tasks has actually completed (i.e.,
ResultBase.is_done()
could returnTrue
for more than one of them).- Parameters:
results – The
ResultBase
objects to wait for.- Returns:
One of the objects in
result
.- Raises:
RuntimeError – If
results
is empty.
- await aioresult.results_to_channel(results: Iterable[ResultBase], channel: MemorySendChannel | MemoryObjectSendStream, close_on_complete: bool = True) None ¶
Waits for
ResultBase
tasks to complete, and sends them to an async channel.The results are waited for in parallel, so they are sent to the channel in the order they complete rather than the order they are in the
results
iterable. As usual when waiting for async tasks, the ordering is not guaranteed for tasks that finish at very similar times.This function does not return until all tasks in
results
have completed, so it would normally be used by being passed totrio.Nursery.start_soon()
, rather than being directly awaited in the caller.- Parameters:
results – The
ResultBase
objects to send to the channel.channel – The send end of the channel to send to.
close_on_complete – If
True
(the default), the channel will be closed when the function completes. This means that iterating over the receive end of the channel withasync for
will complete once all results have been returned.
Warning
If
close_on_complete
is True and this routine is cancelled then the channel is still closed. (The close is done in afinally:
block.) This means that, in this situation, anasync for
loop over the receive end will complete without all results being returned. This is done to avoid a recipient waiting forever.In practice, it will very often be the case that the same nursery is used for both this routine and the routine that iterates over the results. In that case, the
async for
would be interrupted anyway.