Capturing a result
Motivation and usage
The main class of aioresult is the ResultCapture class. If you are directly awaiting a task
then there is no need to use this class – you can just use the return value:
result1 = await foo(1)
result2 = await foo(2)
print("results:", result1, result2)
If you want to run your tasks in parallel then you would typically use a nursery, but then it’s harder to get hold of the results:
async with trio.open_nursery() as n:
    n.start_soon(foo, 1)
    n.start_soon(foo, 2)
# At this point the tasks have completed, but the results are lost
print("results: ??")
To get access to the results, the usual advice is to either modify the routines so that they
store their result somewhere rather than returning it or to create a little wrapper function
that stores the return value of the function you actually care about. ResultCapture is a
simple helper to do this:
async with trio.open_nursery() as n:
    result1 = ResultCapture.start_soon(n, foo, 1)
    result2 = ResultCapture.start_soon(n, foo, 2)
# At this point the tasks have completed, and results are stashed in ResultCapture objects
print("results", result1.result(), result2.result())
You can get a very similar effect to asyncio.gather() by using a nursery and a list
of ResultCapture objects:
async with trio.open_nursery() as n:
    results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
print("results:", *[r.result() for r in results])
Unlike asyncio’s gather, you benefit from the safer behaviour of Trio nurseries if one of the tasks
raises an exception. ResultCapture is also more flexible because you don’t have to use a
list; for example, you could use a dictionary:
async with trio.open_nursery() as n:
    results = {i: ResultCapture.start_soon(n, foo, i) for i in range(10)}
print("results:", *[f"{i} -> {r.result()}," for i, r in results.items()])
It is also possible to check whether the task is done, wait for it to be done, and check whether it finished with an exception:
async with trio.open_nursery() as n:
    result1 = ResultCapture.start_soon(n, foo, 1)
    result2 = ResultCapture.start_soon(n, foo, 2)
    await result1.wait_done()
    # At this point the first task is done but the second could still be running
    assert result1.is_done()
    # exception() returns None if the task completed by returning a value
    print("result1 raised:", result1.exception() is not None)
# After the nursery block both tasks are done
print("results:", result1.result(), result2.result())
How it works
The implementation of ResultCapture is very simple. Rather than running your routine
directly in a nursery, you instead run ResultCapture.run() in the nursery, which runs the
routine and saves the result (or exception) in a member variable:
# ResultCapture.run() method
async def run(self, **kwargs):
    try:
        self._result = await self._fn(*self._args, **kwargs)
    except BaseException as e:
        self._exception = e
        raise
    finally:
        self._done_event.set()
The actual implementation looks slightly different, but that’s just so it can share some code with
the Future class, and so that it can avoid calling raise if
suppress_exception=True. It still fundamentally works like this.
The typical way to run this is to start it in a nursery with the ResultCapture.start_soon()
class method. This is even simpler! It just constructs an instance and then runs it in the
nursery:
# ResultCapture.start_soon() class method
@classmethod
def start_soon(cls: type, nursery: Nursery, routine, *args):
    task = cls(routine, *args)  # cls is ResultCapture, so this constructs an instance
    nursery.start_soon(task.run)
    return task
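The same capture pattern can be sketched without Trio or aioresult, using only asyncio from the standard library. This is an illustration under stated assumptions, not aioresult's code: `MiniCapture` and `double` are hypothetical names, and `asyncio.gather()` stands in for the nursery.

```python
import asyncio


class MiniCapture:
    """Hypothetical stand-in for ResultCapture, built on asyncio only."""

    def __init__(self, routine, *args):
        self._routine = routine
        self._args = args
        self._result = None
        self._exception = None
        self._done_event = asyncio.Event()

    async def run(self):
        # Run the routine, stash its outcome, and always signal completion.
        try:
            self._result = await self._routine(*self._args)
        except BaseException as e:
            self._exception = e
            raise
        finally:
            self._done_event.set()

    def is_done(self):
        return self._done_event.is_set()

    def result(self):
        if not self.is_done():
            raise RuntimeError("task not done yet")
        if self._exception is not None:
            raise RuntimeError("task failed") from self._exception
        return self._result


async def double(x):
    await asyncio.sleep(0)
    return x * 2


async def main():
    captures = [MiniCapture(double, i) for i in range(3)]
    # gather() stands in for the nursery: it runs every run() concurrently.
    await asyncio.gather(*(c.run() for c in captures))
    return [c.result() for c in captures]


results = asyncio.run(main())
print(results)  # prints: [0, 2, 4]
```

The point of the sketch is that the wrapper, not the routine, owns the stored result, so the routine itself needs no changes.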
Exception handling
Behaviour
A key design decision about the ResultCapture class is that exceptions are allowed to
escape out of the task so they propagate into the task’s nursery.
To illustrate this behaviour, here is an async function that raises an exception:
async def raises_after(n):
    print(f"throws_after({n}) starting")
    await trio.sleep(n)
    print(f"throws_after({n}) raising")
    raise RuntimeError(n)
Consider its use in the following snippet:
try:
    async with trio.open_nursery() as n:
        result1 = ResultCapture.start_soon(n, raises_after, 1)
        result2 = ResultCapture.start_soon(n, raises_after, 2)
    print("Completed without exception")
except Exception as e:
    print("Exception caught")
print(f"result1 exception: {repr(result1.exception())}")
print(f"result2 exception: {repr(result2.exception())}")
This results in the following output:
throws_after(1) starting
throws_after(2) starting
throws_after(1) raising
Exception caught
result1 exception: RuntimeError(1)
result2 exception: Cancelled()
This happens because the first task raises an exception after 1 second, which ResultCapture
allows to propagate out into the nursery n and causes the other task to be cancelled. Once the
cancellation is complete, the nursery re-raises the exception, so "Exception caught" is printed.
The next two print statements show that result1 finished with the exception it raised, while the
second was cancelled before it had the chance to raise its own exception.
If a task raises an exception and you attempt to retrieve its return value by calling
ResultBase.result(), then it will raise an instance of TaskFailedException rather
than the original exception. For example, if the following is run after the above snippet then it
will print "TaskFailedException from RuntimeError(1)" rather than "RuntimeError":
try:
    print(result1.result())
except RuntimeError as e:
    print("RuntimeError")
except TaskFailedException as e:
    print(f"TaskFailedException from {repr(e.__cause__)}")
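The wrapping relies on ordinary Python exception chaining (`raise ... from ...`). The following standalone sketch shows the mechanism with a hypothetical `DemoTaskFailed` class and `retrieve` helper; it does not import aioresult, and `owner` merely stands in for the object that captured the exception.

```python
class DemoTaskFailed(Exception):
    """Hypothetical stand-in for TaskFailedException."""


def retrieve(captured_exception, owner):
    # Same shape as the documented pattern: raise Wrapper(owner) from original.
    raise DemoTaskFailed(owner) from captured_exception


original = RuntimeError(1)
owner = object()  # stands in for the capture object holding the exception

try:
    retrieve(original, owner)
except RuntimeError:
    # Not reached: the wrapper is not a RuntimeError.
    outcome = "RuntimeError"
except DemoTaskFailed as e:
    outcome = f"DemoTaskFailed from {e.__cause__!r}"
    cause, wrapper_args = e.__cause__, e.args

print(outcome)  # prints: DemoTaskFailed from RuntimeError(1)
```

Because a fresh wrapper is raised on each call, the stored original exception is never re-raised itself and can be inspected as often as needed through `__cause__`.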
Overriding this behaviour
It is possible to override the behaviour of ResultCapture so that it suppresses exceptions
from propagating out into the nursery. For example, consider the following snippet, which is like
the one above but with suppress_exception set to True:
try:
    async with trio.open_nursery() as n:
        result1 = ResultCapture.start_soon(n, raises_after, 1, suppress_exception=True)
        result2 = ResultCapture.start_soon(n, raises_after, 2, suppress_exception=True)
    print("Completed without exception")
except Exception as e:
    print("Exception caught")
print(f"result1 exception: {repr(result1.exception())}")
print(f"result2 exception: {repr(result2.exception())}")
The output will instead look like this:
throws_after(1) starting
throws_after(2) starting
throws_after(1) raising
throws_after(2) raising
Completed without exception
result1 exception: RuntimeError(1)
result2 exception: RuntimeError(2)
This option is most useful when using the utility functions to wait for a result. Note
that it only suppresses exceptions of type Exception, not those that directly derive from
BaseException (e.g., KeyboardInterrupt and cancellation exceptions), since it
usually would not make sense to suppress those. It also does not change the fact that any exception
raised by ResultBase.result() is wrapped in a TaskFailedException.
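The distinction between Exception and BaseException can be illustrated with a small standard-library sketch. The helper `run_suppressed` and the class `InjectedSignal` are hypothetical; `InjectedSignal` stands in for exceptions such as KeyboardInterrupt or a cancellation, and the code is not how aioresult is implemented.

```python
import asyncio


class InjectedSignal(BaseException):
    """Hypothetical exception deriving directly from BaseException."""


async def run_suppressed(routine):
    # Capture Exception subclasses; anything else escapes to the caller.
    try:
        return ("result", await routine())
    except Exception as e:
        return ("exception", e)


async def fails():
    raise ValueError("ordinary failure")


async def interrupted():
    raise InjectedSignal()


async def main():
    captured = await run_suppressed(fails)
    try:
        await run_suppressed(interrupted)
        escaped = False
    except InjectedSignal:
        escaped = True
    return captured, escaped


captured, escaped = asyncio.run(main())
print(captured[0], type(captured[1]).__name__, escaped)
```

The ValueError is captured and returned, while the BaseException-derived signal passes straight through the `except Exception` clause.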
Motivation
Some related libraries, such as Outcome and trio-future, consume any exception thrown by the task and reraise it when the result is retrieved. This gives the calling code more control: it can choose at what point to retrieve the result, and therefore at what point the exception is thrown. However, it has some disadvantages:
- The calling code must ensure the exception is always retrieved, otherwise the exception is silently lost. That would be particularly problematic if it is an injected exception such as trio.Cancelled or KeyboardInterrupt. This can be difficult to arrange reliably, especially if multiple tasks like this raise exceptions. The whole point of structured concurrency was meant to be that you don’t have to worry about this problem!
- For many exceptions, it does not make sense to be raised more than once, so the calling code must be careful to retrieve the result only once. For example, Outcome raises an error if it is unwrapped more than once.
- The calling code must be careful about whether the exception still makes sense in the context in which the result is retrieved. For example, if the exception is a trio.Cancelled then its corresponding nursery must still be live.
The simpler semantics of aioresult avoids the above complexities: the exception always makes sense in its context (because it’s in the original place it was raised) and you are free to retrieve the result once, many times, or not at all.
Type hints
The classes and functions in aioresult include type hints. These allow IDEs and static type checkers to detect potential errors before your program is run; see the introduction in the mypy docs for more information on Python type hints in general. For example, a type checker would find the two errors in the following code:
async def double_int(x: int) -> int:
    await trio.sleep(1)
    return x * 2

async with trio.open_nursery() as n:
    # Error! Parameter should be int not str
    rc = ResultCapture.start_soon(n, double_int, "seven")
# Error! Assigning int into str variable
x: str = rc.result()
The ResultCapture, Future and ResultBase classes have a type parameter,
which represents the type of value being captured. This allows you to use these classes in type
hints for variables and function parameters in your own code. For example:
async def do_something(rc: ResultCapture[str]): ...
# Error! Assigning ResultCapture[int] into ResultCapture[str]
await do_something(ResultCapture.start_soon(n, double_int, 7))
You do not need to specify the type parameter when using ResultCapture.start_soon(). For
that function, the type parameter of ResultCapture is inferred from the return type of the
function passed to it.
The type parameter of ResultCapture and ResultBase is covariant; this means that you can
assign to a variable where the parameter is a looser type. For example, if Animal is a base
class with derived classes Cat and Dog, then you would see this behaviour:
rc_dog = ResultCapture.start_soon(n, get_dog) # Inferred to be ResultCapture[Dog]
rc_animal: ResultCapture[Animal] = rc_dog # This is OK
rc_cat: ResultCapture[Cat] = rc_animal # This causes an error
The Future class does not behave this way because, if it did, you would be able to use the
Future.set_result() method on the looser type to put the wrong type of value in (e.g., if
you have a Future[Dog] and were able to use it as a Future[Animal] then you could use that
to put a Cat in it). You can always put it into a ResultBase variable if you need this
sort of behaviour (e.g., you can assign a Future[Dog] into a ResultBase[Animal] variable).
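The reasoning behind covariance and invariance can be sketched with two hypothetical generic classes that use only the standard typing module. `ReadOnlyBox` plays the role of a read-only result holder and `WritableBox` the role of a holder with a setter; neither is part of aioresult.

```python
from typing import Generic, TypeVar


class Animal: ...
class Dog(Animal): ...
class Cat(Animal): ...


T_co = TypeVar("T_co", covariant=True)
T = TypeVar("T")


class ReadOnlyBox(Generic[T_co]):
    """Read-only holder: safe to declare covariant."""

    def __init__(self, value: T_co) -> None:
        self._value = value

    def get(self) -> T_co:
        return self._value


class WritableBox(Generic[T]):
    """Holder with a setter: has to stay invariant."""

    def __init__(self, value: T) -> None:
        self._value = value

    def set(self, value: T) -> None:
        self._value = value

    def get(self) -> T:
        return self._value


dog_box: ReadOnlyBox[Dog] = ReadOnlyBox(Dog())
animal_box: ReadOnlyBox[Animal] = dog_box  # accepted by a type checker

writable_dog: WritableBox[Dog] = WritableBox(Dog())
# A type checker rejects the next assignment. At runtime nothing stops it,
# which is the hole invariance closes: a Cat ends up in a box of Dogs.
writable_animal: WritableBox[Animal] = writable_dog  # type: ignore[assignment]
writable_animal.set(Cat())

print(type(animal_box.get()).__name__, type(writable_dog.get()).__name__)
```

The read-only box can only hand out values, so widening its type is harmless; the writable box accepts values, so widening its type would let the wrong kind of value in.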
Not all types can be perfectly type checked in Python. In aioresult, there are two main limitations of the type checking:
- The function given to ResultCapture is only checked for compatibility with the arguments that you give if you use ResultCapture.start_soon(). If you manually construct ResultCapture and call ResultCapture.run(), or if you use ResultCapture.capture_start_and_done_results(), the parameter types are not checked:
    rc = ResultCapture[int](double_int, "seven")  # Oops, no error
- The ResultCapture.args property does not remember the type of the arguments passed in. Its type hint is simply a tuple of typing.Any:
    rc = ResultCapture.start_soon(n, double_int, 7)
    a: tuple[str, float] = rc.args  # Oops, no error
Waiting for a task to finish starting
Trio and anyio support waiting until a task has finished starting with trio.Nursery.start()
and anyio.abc.TaskGroup.start(). For example, a routine that supports this could look like
this:
async def my_fn(i, task_status=trio.TASK_STATUS_IGNORED):
    await trio.sleep(i)
    task_status.started(i * 2)
    await trio.sleep(i)
    return i * 3
It could be used as follows:
async with trio.open_nursery() as n:
    start_result = await n.start(my_fn, 1)
    # At this point task is running in background
# Another 1 second passes before we get here
For example, trio.serve_tcp() signals that it has finished starting when the port is open
for listening, and it returns which port number it is listening on (which is useful because the
port can be assigned automatically).
The peculiar-looking default value of trio.TASK_STATUS_IGNORED is there so that the function can
be called without using it as part of this special dance. In particular, this means you can use
these functions with ResultCapture as usual:
async with trio.open_nursery() as n:
    rc = ResultCapture.start_soon(n, my_fn, 1)
print("Done result:", rc.result())
If you need to wait until the task has just finished starting or retrieve its start value then the
trick is to construct the ResultCapture instance and use its ResultCapture.run()
method explicitly, rather than using the ResultCapture.start_soon() wrapper function as
usual. This allows you to pass ResultCapture.run() to trio.Nursery.start() rather
than trio.Nursery.start_soon():
async with trio.open_nursery() as n:
    rc = ResultCapture(my_fn, 1)
    start_value = await n.start(rc.run)
    print("Start value:", start_value)
print("Done result:", rc.result())
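For readers who want to see the two-stage handshake in isolation, here is a rough imitation using only asyncio. It is an analogy rather than Trio's mechanism: asyncio has no direct counterpart to trio.Nursery.start(), so a plain future (the hypothetical `started` parameter) stands in for `task_status`.

```python
import asyncio


async def my_fn(i, started):
    await asyncio.sleep(0.01)
    started.set_result(i * 2)  # stands in for task_status.started(i * 2)
    await asyncio.sleep(0.01)
    return i * 3


async def main():
    loop = asyncio.get_running_loop()
    started = loop.create_future()
    task = asyncio.ensure_future(my_fn(1, started))
    start_value = await started  # resumes as soon as startup is signalled
    running_after_start = not task.done()
    done_value = await task  # resumes when the routine returns
    return start_value, running_after_start, done_value


start_value, running_after_start, done_value = asyncio.run(main())
print(start_value, running_after_start, done_value)
```

The start value becomes available while the routine is still running, and the return value only once it has finished, which is the same split between start value and done result described above.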
Note
For most usages, you just need code like the snippet above: construct ResultCapture
explicitly and pass its ResultCapture.run() method to trio.Nursery.start(). To some
extent, the ResultCapture.capture_start_and_done_results() function described below was
written just to show it could be done.
In some rare cases, it may be useful to run the startup code for several routines concurrently. In
that case, as always, the solution is to use a nursery, which this time executes the startup code.
The ResultCapture.capture_start_and_done_results() function allows this. It returns two
ResultCapture instances, with the first representing the start result and the second
representing the done result. It can be used like so:
async with trio.open_nursery() as rn:
    async with trio.open_nursery() as sn:
        sr1, dr1 = ResultCapture.capture_start_and_done_results(rn, my_fn, 1, start_nursery=sn)
        sr2, dr2 = ResultCapture.capture_start_and_done_results(rn, my_fn, 2, start_nursery=sn)
    # The nursery sn is done, so both tasks are now started
    print("Start results:", sr1.result(), sr2.result())
# The nursery rn is done, so both tasks are now done
print("Done results:", dr1.result(), dr2.result())
The implementation of ResultCapture.capture_start_and_done_results() is fairly simple,
although it is awkward enough that it is useful not to have to write it out every time:
@classmethod
def capture_start_and_done_results(
    cls, run_nursery: Nursery, routine, *args, start_nursery: Optional[Nursery] = None
):
    if start_nursery is None:
        start_nursery = run_nursery
    done_result = cls(routine, *args)  # cls is ResultCapture, so this creates an instance
    start_result = cls(run_nursery.start, done_result.run)  # As does this
    start_nursery.start_soon(start_result.run)
    return start_result, done_result
String representation
Converting a ResultBase object to a string shows whether it completed, and if so its
result:
rc = ResultCapture.start_soon(nursery, foo, "arg1", 2)
print(rc) # prints: ResultCapture(is_done=False)
# Later, might print: ResultCapture(result=3)
# Or it might print: ResultCapture(exception=KeyError(3))
For ResultCapture objects, passing # as the format string will also include the
routine name and its arguments in the string:
print(f"{rc:#}") # prints: ResultCapture(routine=foo, args=('arg1', 2), is_done=False)
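Python routes the text after the colon in an f-string replacement field to the object's `__format__` method, which is how a `#` format string can switch on extra detail. The sketch below shows one way such a method could look; `DemoCapture` is a hypothetical class written for this illustration and is not aioresult's implementation.

```python
class DemoCapture:
    """Hypothetical class illustrating a '#' format spec."""

    def __init__(self, routine, *args):
        self.routine = routine
        self.args = args
        self.done = False

    def _state(self):
        return f"is_done={self.done}"

    def __str__(self):
        return f"DemoCapture({self._state()})"

    def __format__(self, format_spec):
        if format_spec == "":
            return str(self)
        if format_spec == "#":
            name = self.routine.__name__
            return f"DemoCapture(routine={name}, args={self.args!r}, {self._state()})"
        raise ValueError(f"unsupported format spec: {format_spec!r}")


def foo(a, b):
    return b + 1


rc = DemoCapture(foo, "arg1", 2)
short = f"{rc}"
detailed = f"{rc:#}"
print(short)     # prints: DemoCapture(is_done=False)
print(detailed)  # prints: DemoCapture(routine=foo, args=('arg1', 2), is_done=False)
```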
Reference
- class aioresult.ResultBase[ResultT_co]
Base class for ResultCapture and Future.
The two main classes in aioresult, ResultCapture and Future, have almost identical interfaces: they both allow waiting for and retrieving a value. That interface is contained in this base class.
- Type Parameters:
ResultT_co – Type of the value returned from result(). See Type hints.
Note
If you are returning a ResultCapture or Future from a function then you may wish to document the return type as just ResultBase because that has all the relevant interface for retrieving a result.
- result()
Returns the captured result of the task.
- Returns:
ResultT_co – The value returned by the task.
- Raises:
TaskNotDoneException – If the task is not done yet; use is_done() or wait_done() to check or wait for completion to avoid this exception.
TaskFailedException – If the task failed with an exception; see Exception handling.
- exception()
Returns the exception raised by the task.
It is usually better design to use the result() method and catch the exception it raises. However, exception() can be useful in some situations, e.g. filtering a list of ResultCapture objects.
- Returns:
BaseException – The exception raised by the task, if it completed by raising an exception. Note that it is the original exception returned, not a TaskFailedException as raised by result().
None – If the task completed by returning a value.
- Raises:
TaskNotDoneException – If the task is not done yet.
- is_done()
Returns whether the task is done, i.e., the result (or an exception) is captured.
- Returns:
bool – True if the task is done; False otherwise.
- await wait_done()
Waits until the task is done.
This is mainly useful for Future instances.
For ResultCapture instances, there are specialised situations where it may be useful to use this method to wait until the task is done, typically where you are writing library code and you want to start a routine in a user supplied nursery but wait for it in some other context. Typically, though, it is better design to wait for the task’s nursery to complete. Consider a nursery-based approach before using this method.
- Returns:
None – To access the result use result().
Note
If the underlying routine raises an exception (which includes the case that it is cancelled) then this routine will not raise an exception; it will simply return to indicate that the task is done. Similarly, cancelling a call to this routine will not cancel the task running in this object. This is in contrast to awaiting an asyncio.Future instance, where cancellations propagate directly. See Exception handling for more information.
- class aioresult.ResultCapture[ResultT_co](routine, *args, suppress_exception=False)
Bases: ResultBase[ResultT_co]
Captures the result of a task for later access.
Most usually, an instance is created with the start_soon() class method. However, it is possible to instantiate directly, in which case you will need to arrange for the run() method to be called.
- Type Parameters:
ResultT_co – Type of the value returned from the task. See Type hints.
- Parameters:
routine (Callable[..., Awaitable[ResultT_co]]) – An async callable.
args – Positional arguments for routine.
suppress_exception (bool) – If True, exceptions derived from Exception (not those directly derived from BaseException) will be caught internally rather than allowed to escape into the enclosing nursery. If False (the default), all exceptions will be allowed to escape into the enclosing context.
Note
ResultCapture inherits most of its methods from its base class ResultBase; see the documentation for that class for the inherited methods.
- classmethod start_soon(nursery, routine, *args, suppress_exception=False)
Runs the task in the given nursery and captures its result.
Under the hood, this simply constructs an instance with ResultCapture(routine, *args), starts the routine by calling nursery.start_soon(rc.run), then returns the new object. It’s literally three lines long! But it’s the preferred way to create an instance.
- Type Parameters:
ResultT – The return type of routine, which will be captured in the returned ResultCapture.
*ArgsT – The type of the arguments to routine.
- Parameters:
nursery (trio.Nursery | anyio.abc.TaskGroup) – A nursery to run the routine.
routine (Callable[[*ArgsT], Awaitable[ResultT]]) – An async callable.
args (*ArgsT) – Positional arguments for routine. If you want to pass keyword arguments, use functools.partial().
suppress_exception (bool) – If True, exceptions derived from Exception (not those directly derived from BaseException) will be caught internally rather than allowed to escape into the enclosing nursery. If False (the default), all exceptions will be allowed to escape into the enclosing context.
- Returns:
ResultCapture[ResultT] – A new instance representing the result of the given routine.
- await run(**kwargs)
Runs the routine and captures its result.
This is where the magic of ResultCapture happens … except it’s not very magical (see How it works for details).
Typically, you would use the start_soon() class method, which constructs the ResultCapture and arranges for this run() method to be run in the given nursery. But it is reasonable to manually construct the object and call the run() method in situations where extra control is needed.
- Parameters:
kwargs – Keyword arguments to pass to the routine. This exists mainly to support usage with the task start protocol (see Waiting for a task to finish starting). If you want to pass arguments to the routine then pass them as positional arguments to start_soon() or use functools.partial().
- Returns:
None – To access the return value of the routine, use ResultBase.result().
- Raises:
BaseException – Whatever exception is raised by the routine.
Warning
Ensure this routine is not called more than once. In particular, do not call it at all if the instance was created with start_soon(), which already calls this once.
- property routine
The routine whose result will be captured. This is the routine argument that was passed to the constructor or start_soon().
- property args
The arguments passed to the routine whose result will be captured. This is the args argument that was passed to the constructor or start_soon().
- classmethod capture_start_and_done_results(run_nursery, routine, *args, start_nursery=None)
Captures both the startup and completion result of a task.
The first return value represents whether the task has finished starting yet (i.e., whether it has called task_status.started()), and its ResultBase.result() value is the value passed as the argument to task_status.started() (or None if no argument is passed). The second return value represents whether the routine has completed entirely (and its return value or exception).
- Parameters:
run_nursery – The nursery to run the routine once it has finished starting.
routine (Callable[..., Awaitable[ResultT]]) – An async callable.
args – Positional arguments for routine. If you want to pass keyword arguments, use functools.partial().
start_nursery (trio.Nursery | anyio.abc.TaskGroup | None) – The nursery to run the routine until it has finished starting. If this is omitted then run_nursery is used.
- Returns:
tuple[ResultCapture[Any], ResultCapture[ResultT]] – (start_result, done_result) representing the value returned from the routine’s startup and its completion.
Note
The semantics are a little fiddly if the routine raises an exception before it completes startup (i.e., before it calls task_status.started()):
- The task is running (only) in the start_nursery until this point (see trio.Nursery.start()), so the exception is propagated out in only start_nursery, rather than run_nursery.
- The exception is recorded in start_result (because exceptions raised before startup is complete are recorded in the context of the startup nursery).
- The potentially surprising part is that the same exception is also recorded in done_result (because done_result.run() directly wraps the call to the routine).
Another fiddly case is if the routine returns before it calls task_status.started():
- This is considered an error so a RuntimeError exception is raised (see trio.Nursery.start()), and again this is propagated out in only start_nursery, rather than run_nursery.
- This RuntimeError exception is recorded in start_result.
- However, done_result records whatever result was returned by the routine, and done_result.exception() is None (again, this is because done_result.run() directly wraps the call to the routine).
Once the routine completes startup (i.e., after it has called task_status.started()), the semantics are simple: any return value or exception is associated with done_result, and the routine is now running in the context of run_nursery so any exception is propagated out into the run_nursery.
A consequence of all the above cases is that start_result.is_done() and done_result.is_done() are eventually both true regardless of when and how the routine finished.
- exception aioresult.TaskFailedException
Bases: Exception
Exception raised when accessing ResultBase.result() for a task that raised an exception. This exception is raised as a chained exception:
raise TaskFailedException(self) from original_exception
This allows access to the original exception and the relevant ResultCapture or Future as attributes:
- __cause__: BaseException
The original exception that was raised by the task.
- args: tuple[ResultBase]
1-tuple of the ResultCapture or Future that raised this exception.
- exception aioresult.TaskNotDoneException
Bases: Exception
Exception raised when attempting to access the result (using ResultBase.result() or ResultBase.exception()) of a task that is not complete yet.
It is raised with the capture instance passed as the argument to the exception:
raise TaskNotDoneException(self)
This allows access to the ResultCapture or Future from the exception using the args attribute:
- args: tuple[ResultBase]
1-tuple of the ResultCapture or Future that raised this exception.