Capturing a result¶
Motivation and usage¶
The main class of aioresult is the ResultCapture
class. If you are directly awaiting a task
then there is no need to use this class – you can just use the return value:
result1 = await foo(1)
result2 = await foo(2)
print("results:", result1, result2)
If you want to run your tasks in parallel then you would typically use a nursery, but then it’s harder to get hold of the results:
async with trio.open_nursery() as n:
n.start_soon(foo, 1)
n.start_soon(foo, 2)
# At this point the tasks have completed, but the results are lost
print("results: ??")
To get access to the results, the usual advice is to either modify the routines so that they
store their result somewhere rather than returning it or to create a little wrapper function
that stores the return value of the function you actually care about. ResultCapture
is a
simple helper to do this:
async with trio.open_nursery() as n:
result1 = ResultCapture.start_soon(n, foo, 1)
result2 = ResultCapture.start_soon(n, foo, 2)
# At this point the tasks have completed, and results are stashed in ResultCapture objects
print("results", result1.result(), result2.result())
You can get very similar effect to asyncio.gather()
by using a nursery and an array
of ResultCapture
objects:
async with trio.open_nursery() as n:
results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
print("results:", *[r.result() for r in results])
Unlike asyncio’s gather, you benefit from the safer behaviour of Trio nurseries if one of the tasks
throws an exception. ResultCapture
is also more flexible because you don’t have to use a
list, for example you could use a dictionary:
async with trio.open_nursery() as n:
results = {i: ResultCapture.start_soon(n, foo, i) for i in range(10)}
print("results:", *[f"{i} -> {r.result()}," for i, r in results.items()])
It is also possible to check whether the task is done, wait for it to be done, and checker whether it finished with an exception:
async with trio.open_nursery() as n:
result1 = ResultCapture.start_soon(n, foo, 1)
result2 = ResultCapture.start_soon(n, foo, 2)
await result1.wait_done()
# At this point the first task is done but the second could still be running
assert result1.is_done()
print("results:", *[f"{i} -> {r.result()}," for i, r in results.items()])
Exception handling¶
Behaviour¶
A key design decision about the ResultCapture
class is that exceptions are allowed to
escape out of the task so they propagate into the task’s nursery.
To illustrate this behaviour, here is an async function that raises an exception:
async def raises_after(n):
print(f"throws_after({n}) starting")
await trio.sleep(n)
print(f"throws_after({n}) raising")
raise RuntimeError(n)
Consider its use in the following snippet:
try:
async with trio.open_nursery() as n:
result1 = ResultCapture.start_soon(n, raises_after, 1)
result2 = ResultCapture.start_soon(n, raises_after, 2)
print("Completed without exception")
except Exception as e:
print("Exception caught")
print(f"result1 exception: {repr(result1.exception())}")
print(f"result2 exception: {repr(result2.exception())}")
This results in the following output:
throws_after(1) starting
throws_after(2) starting
throws_after(1) raising
Exception caught
result1 exception: RuntimeError(1)
result2 exception: Cancelled()
This happens because the first task raises an exception after 1 second, which ResultCapture
allows to propagate out into the nursery n
and causes the other task to be cancelled. Once the
cancellation is complete, the nursery re-raises the exception, so "Exception caught"
is printed.
The next two print statements show that result1
finished with the exception it raised, while the
second was cancelled before it had the chance to raise its own exception.
If a task raises an exception and you attempt to retrieve its return value by calling
ResultBase.result()
, then it will raise an instance of TaskFailedException
rather
than the original exception. For example, if the following is run after the above snippet then it
will print "TaskFailedException from RuntimeError(1)"
rather than "RuntimeError"
:
try:
print(result1.result())
except RuntimeError as e:
print("RuntimeError")
except TaskFailedException as e:
print(f"TaskFailedException from {repr(e.__cause__)}")
Overriding this behaviour¶
It is possible to override the behaviour of ResultCapture
so that it suppresses exceptions
from propagating out into the nursery. For example, consider the following snippet, which is like
the one above but with suppress_exception
set to True
:
try:
async with trio.open_nursery() as n:
result1 = ResultCapture.start_soon(n, raises_after, 1, suppress_exception=True)
result2 = ResultCapture.start_soon(n, raises_after, 2, suppress_exception=True)
print("Completed without exception")
except Exception as e:
print("Exception caught")
print(f"result1 exception: {repr(result1.exception())}")
print(f"result2 exception: {repr(result2.exception())}")
The output will instead look like this:
throws_after(1) starting
throws_after(2) starting
throws_after(1) raising
throws_after(2) raising
Completed without exception
result1 exception: RuntimeError(1)
result2 exception: RuntimeError(2)
This option is most useful when using the utility functions to wait for a result. Note
that it only suppresses exceptions of type Exception
, not those that directly derive from
BaseException
(e.g., KeyboardInterrupt
and cancellation exceptions), since it
usually would not make sense to suppress those. It also does not change the fact that any exception
raised by ResultBase.result()
is wrapped in a TaskFailedException
.
Motivation¶
A common requirement for a task-like class is that it automatically handles cancellation when there are complex interdependencies between tasks and some fail (by raising an exception or by explicit cancellation). For example, if tasks A and B are waiting on a common task C, then perhaps cancelling A (only) should allow B and C to complete, whereas also cancelling B should automatically cancel C too so that it does not pointlessly continue when its result it not needed. A deliberate design decision of aioresult is that it does not attempt to solve this particular problem. Its goal is simply to capture the result of a task, and leaves cancellation and exception semantics unchanged. This is why the main class is called “ResultCapture” rather than “Task”.
On the other hand, some other libraries, such as Outcome and trio-future, take a much lower-level approach: they consume any exception thrown by the task and reraise it when the result is retrieved. This gives the calling code more control: it can choose at what point to retrieve the result, and therefore at what point the exception is thrown. However, it has some disadvantages:
The calling code must ensure the exception is always retrieved, otherwise the exception is silently lost. That would be particularly problematic if it is an injected exception such as
trio.Cancelled
orKeyboardInterrupt
. This can be difficult to arrange reliably, especially if multiple tasks like this raise exceptions. The whole point of structured concurrency was meant to be that you don’t have to worry about this problem!For many exceptions, it does not make sense to be raised more than once, so the calling code must be careful to retrieve the result only once. For example, Outcome raises an error if it is unwrapped more than once.
The calling code must be careful about whether the exception still makes sense in the context in which the result is retrieved. For example, if the exception is a
trio.Cancelled
then its corresponding nursery must still be live.
The simpler semantics of aioresult avoids the above complexities: the exception always makes sense in its context (because it’s in the original place it was raised) and you are free to retrieve the result once, many times, or not at all.
How it works¶
The implementation of ResultCapture
is very simple. Rather than running your routine
directly in a nursery, you instead run ResultCapture.run()
in the nursery, which runs the
routine and saves the result (or exception) in a member variable:
# ResultCapture.run() method
def run(self, **kwargs):
try:
self._result = await self._fn(*self._args, **kwargs)
except BaseException as e:
self._exception = e
raise
finally:
self._done_event.set()
The actual implementation looks slightly different, but that’s just so it can share some code with
the Future
class, and so that it can avoid calling raise
if
suppress_exception=True
. It still fundamentally works like this.
The typical way to run this is to start it in a nursery with the ResultCapture.start_soon()
class method. This is even more simple! It just constructs an instance and then runs it in the
nursery:
@classmethod
def start_soon(cls: type, nursery: Nursery, routine, *args):
task = cls(routine, *args) # cls is ResultCapture, so this constructs an instance
nursery.start_soon(task.run)
return task
It is not strictly necessary to use the ResultCapture.start_soon()
function; if you need
more control about how the routine is run then it is reasonable to instantiate
ResultCapture
explicitly in your code and arrange for the ResultCapture.run()
method to be called. For example, this is useful when you need to use trio.Nursery.start()
rather than trio.Nursery.start_soon()
; see the next section for details.
Wait for a task to finish starting¶
Trio and anyio support waiting until a task has finished starting with trio.Nursery.start()
and anyio.abc.TaskGroup.start()
. For example, a routine that supports this could look like
this:
async def my_fn(i, task_status=trio.TASK_STATUS_IGNORED):
await trio.sleep(i)
task_status.started(i * 2)
await trio.sleep(i)
return i * 3
It could be used as follows:
async with trio.open_nursery() as n:
start_result = await n.start(my_fn, 1)
# At this point task is running in background
# Another 1 second passes before we get here
For example, trio.serve_tcp()
signals that it has finished starting when the port is open
for listening, and it returns which port number it is listening on (which is useful because the
port can be assigned automatically).
The peculiar-looking default value of trio.TASK_STATUS_IGNORED
is there so that the function can
be called without using it as part of this special dance. In particular, this means you can use
these functions with ResultCapture
as usual:
async with trio.open_nursery() as n:
rc = ResultCapture.start_soon(n, my_fn, 1)
print("Done result:", rc.result())
If you need to wait until the task has just finished starting or retrieve its start value then the
trick is to construct the ResultCapture
instance and use its ResultCapture.run()
method explicitly, rather than using the ResultCapture.start_soon()
wrapper function as
usual. This allows you to pass ResultCapture.run()
to trio.Nursery.start()
rather
than trio.Nursery.start_soon()
:
async with trio.open_nursery() as n:
rc = ResultCapture(my_fn, 1)
start_value = await n.start(rc.run)
print("Start value:", start_value)
print("Done result:", rc.result())
For most usages, the code above is exactly what you need.
In some rare cases, it may be useful to run the startup code for several routines concurrently. In
that case, as always, the solution is to use a nursery, which this time executes the startup code.
The ResultCapture.capture_start_and_done_results()
function allows this. It returns two
ResultCapture
instances, with the first representing the start result and the second
representing the done result. It can be used like so:
async with trio.open_nursery() as rn:
async with trio.open_nursery() as sn:
sr1, dr1 = ResultCapture.capture_start_and_done_results(rn, my_fn, 1, start_nursery=sn)
sr2, dr2 = ResultCapture.capture_start_and_done_results(rn, my_fn, 2, start_nursery=sn)
# The nursery sn is done, so both tasks are now started
print("Start results:", sr1.result(), sr2.result())
# The nursery rn is done, so both tasks are now done
print("Done results:", dr1.result(), dr2.result())
The implementation of ResultCapture.capture_start_and_done_results()
is fairly simple,
although it is awkward enough that it is useful not to have to write it out every time:
@classmethod
def capture_start_and_done_results(
cls, run_nursery: Nursery, routine, *args, start_nursery: Optional[Nursery] = None
):
if start_nursery is None:
start_nursery = run_nursery
done_result = cls(routine, *args) # cls is ResultCapture, so this creates an instance
start_result = cls(run_nursery.start, done_result.run) # As does this
start_nursery.start_soon(start_result.run)
return start_result, done_result
Reference¶
The two main classes in aioresult, ResultCapture
and Future
, have almost
identical interfaces: they both allow waiting for and retrieving a value. That interface is
contained in the base class, ResultBase
:
Note
If you are returning a ResultCapture
or Future
from a function then you may
wish to document the return type as just ResultBase
because that has all the methods
relevant for retrieving a result.
- class aioresult.ResultBase¶
Base class for
ResultCapture
andFuture
. Has methods for checking if the task is done and fetching its result.- result() object ¶
Returns the captured result of the task.
- Returns:
The value returned by the task.
- Raises:
TaskNotDoneException – If the task is not done yet; use
is_done()
orwait_done()
to check or wait for completion to avoid this exception.TaskFailedException – If the task failed with an exception; see Exception handling.
- exception() BaseException | None ¶
Returns the exception raised by the task.
It is usually better design to use the
result()
method and catch the exception it raises. However,exception()
can be useful in some situations e.g. filtering a list ofResultCapture
objects.- Returns:
The exception raised by the task, if it completed by raising an exception. Note that it is the original exception returned, not a
TaskFailedException
as raised byresult()
.If the task completed by returning a value then this method returns
None
.
- Raises:
TaskNotDoneException – If the task is not done yet.
- is_done() bool ¶
Returns
True
if the task is done i.e. the result (or an exception) is captured. ReturnsFalse
otherwise.
- await wait_done() None ¶
Waits until the task is done.
This is mainly useful for
Future
instances.For
ResultCapture
instances, there are specialised situations where it may be useful to use this method to wait until the task is done, typically where you are writing library code and you want to start a routine in a user supplied nursery but wait for it in some other context. Typically, though, it is better design to wait for the task’s nursery to complete. Consider a nursery-based approach before using this method.Note
If the underlying routine raises an exception (which includes the case that it is cancelled) then this routine will not raise an exception; it will simply return to indicate that the task is done. Similarly, cancelling a call to this routine will not cancel the task running in this object. This is in contrast to awaiting an
asyncio.Future
instance, where cancellations propagate directly. See Exception handling for more information.
ResultBase
makes uses of the following exception classes:
- exception aioresult.TaskFailedException¶
Bases:
Exception
Exception raised when accessing
ResultBase.result()
for a task that raised an exception. This exception is raised as a chained exception:raise TaskFailedException(self) from original_exception
This allows access to the original exception and the relevant
ResultCapture
orFuture
as attributes:- __cause__: BaseException¶
The original exception that was raised by the task.
- args: tuple[ResultBase]¶
1-tuple of the
ResultCapture
orFuture
that raised this exception.
- exception aioresult.TaskNotDoneException¶
Bases:
Exception
Exception raised when attempting to access the result (using
ResultBase.result()
orResultBase.exception()
) that is not complete yet.It is raised with the capture instance passed as the argument to the exception:
raise TaskNotDoneException(self)
This allows access to the
ResultCapture
orFuture
from the exception using theargs
attribute:- args: tuple[ResultBase]¶
1-tuple of the
ResultCapture
orFuture
that raised this exception.
The main class in aioresult is ResultCapture
:
- class aioresult.ResultCapture(routine, *args, suppress_exception: bool = False)¶
Bases:
ResultBase
Captures the result of a task for later access.
Most usually, an instance is created with the
start_soon()
class method. However, it is possible to instantiate directly, in which case you will need to arrange for therun()
method to be called.- Parameters:
routine – An async callable.
args – Positional arguments for
routine
.suppress_exception – If
True
, exceptions derived fromException
(not those directly derived fromBaseException
will be caught internally rather than allowed to escape into the enclosing nursery. IfFalse
(the default), all exceptions will be allowed to escape into the enclosing context.
Note
ResultCapture
inherits most of its methods from its base classResultBase
; see the documentation for that class for the inherited methods.- classmethod start_soon(nursery: Nursery | TaskGroup, routine, *args, suppress_exception: bool = False)¶
Runs the task in the given nursery and captures its result.
Under the hood, this simply constructs an instance with
ResultCapture(routine, *args)
, starts the routine by callingnursery.start_soon(rc.run)
, then returns the new object. It’s literally three lines long! But it’s the preferred way to create an instance.- Parameters:
nursery – A
trio.Nursery
oranyio.abc.TaskGroup
to run the routine.routine – An async callable.
args – Positional arguments for
routine
. If you want to pass keyword arguments, usefunctools.partial()
.suppress_exception – If
True
, exceptions derived fromException
(not those directly derived fromBaseException
will be caught internally rather than allowed to escape into the enclosing nursery. IfFalse
(the default), all exceptions will be allowed to escape into the enclosing context.
- Returns:
A new
ResultCapture
instance representing the result of the given routine.
- await run(**kwargs) None ¶
Runs the routine and captures its result.
This is where the magic of
ResultCapture
happens … except it’s not very magical (see How it works for details).Typically, you would use the
start_soon()
class method, which constructs theResultCapture
and arranges for thisrun()
method to be run in the given nursery. But it is reasonable to manually construct the object and call therun()
method in situations where extra control is needed.- Parameters:
kwargs – Keyword arguments to pass to the routine. This exists mainly to support usage with the task start protocol (see Wait for a task to finish starting). If you want to pass arguments to the routine then pass them as positional arguments to
start_soon()
or usefunctools.partial()
.- Returns:
Always
None
. To access the return value of the routine, useResultBase.result()
.- Raises:
BaseException – Whatever exception is raised by the routine.
Warning
Ensure this routine is not called more than once. In particular, do not call it at all if the instance was created with
start_soon()
, which already calls this once.
- property routine¶
The routine whose result will be captured. This is the
routine
argument that was passed to the constructor orstart_soon()
.
- property args¶
The arguments passed to the routine whose result will be captured. This is the
args
argument that was passed to the constructor orstart_soon()
.
- classmethod capture_start_and_done_results(run_nursery: Nursery | TaskGroup, routine, *args, start_nursery: Nursery | TaskGroup | None = None)¶
Captures both the startup and completion result of a task.
The first return value represents whether the task has finished starting yet (i.e., whether it has called
task_status.started()
), and itsResultBase.result()
value is the value passed as the argument totask_status.started()
(orNone
if no argument is passed). The second return value represents whether the routine has completed entirely (and its return value or exception).- Parameters:
run_nursery – The nursery to run the routine once it has finished starting.
routine – An async callable.
args – Positional arguments for
routine
. If you want to pass keyword arguments, usefunctools.partial()
.start_nursery – The nursery to run the routine until it has finished starting. If this is omitted then
run_nursery
is used.
- Returns:
A tuple
(start_result, done_result)
representing the value returned from the routine’s startup and its completion.
Note
The semantics are a little fiddly if the routine raises an exception before it completes startup (i.e., before it calls
task_status.started()
):The task is running (only) in the
start_nursery
until this point (seetrio.Nursery.start()
), so the exception is propagated out in onlystart_nursery
, rather thanrun_nursery
.The exception is recorded in
start_result
(because exceptions before startup is complete are recorded in the context of the startup nursery).The potentially surprising part is that the same exception is also recorded in
done_result
(becausedone_result.run()
directly wraps the call to the routine).
Another fiddly case is if the routine returns before it calls
task_status.started()
:This is considered an error so a
RuntimeError
exception is raised (seetrio.Nursery.start()
), and again this is propagated out in onlystart_nursery
, rather thanrun_nursery
.This
RuntimeError
exception is recordedstart_result
.However,
done_result
records whatever result was returned by the routine, anddone_result.exception()
isNone
(again, this is becausedone_result.run()
directly wraps the call to the routine).
Once the routine completes startup (i.e., after it has called
task_status.started()
), the semantics are simple: any return value or exception is associated withdone_result
, and the routine is now running in the context ofrun_nursery
so any exception is propagated out into therun_nursery
.A consequence of all the above cases is that
start_result.is_done()
anddone_result.is_done()
are eventually both true regardless of when and how the routine finished.