Waiting for a result

Motivation and usage

As noted elsewhere, the best way to wait for ResultCapture tasks is usually to run them in a nursery, and access the results after the nursery block. For example:

async with trio.open_nursery() as n:
    results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
print("results:", *[r.result() for r in results])

It is also possible to wait on an individual task by using the ResultBase.wait_done() method:

async with trio.open_nursery() as n:
    results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]

    await results[2].wait_done()
    # Task 2 is now done (while other tasks may still be running).
    print("result 2:", results[2].result())

# All tasks are now done.
print("results:", *[r.result() for r in results])

ResultBase.wait_done() is defined on the base class, so it is also available for Future objects, as are the utility functions below.

Sometimes you may need more control than this. This page documents a few utility functions in aioresult for slightly more complex waiting patterns. Under the hood, they all have very simple implementations in terms of ResultBase.wait_done().

If you wish to wait for a collection of tasks to complete without using a nursery, or have a collection of Future objects (which cannot be used in the above pattern), then you can use the wait_all() function:

async with trio.open_nursery() as n:
    results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]

    even_results = results[::2]
    await wait_all(even_results)
    # All tasks in even positions are now done.
    print("even results:", *[r.result() for r in even_results])

# All tasks are now done.
print("all results:", *[r.result() for r in results])

Similarly, wait_any() may be used to wait until (at least) one of the tasks is complete. The return value of the function is the ResultBase object that completed:

async with trio.open_nursery() as n:
    results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]

    completed = await wait_any(results)
    print("task completed:", completed.args[0], "result:", completed.result())

print("all results:", *[r.result() for r in results])

If you wish to access results one at a time, as they complete, it is tempting to call wait_any() in a loop, removing the most recently completed result from the collection each time. A simpler solution is to use the results_to_channel() function:

async with trio.open_nursery() as n:
    results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]

    # Create a channel and run aioresult.results_to_channel() in the nursery
    send_channel, receive_channel = trio.open_memory_channel(1)
    n.start_soon(results_to_channel, results, send_channel)

    async for r in receive_channel:
        print("task completed:", r.args[0], "result:", r.result())

Running a callback when a task is done

Another utility routine that was considered for aioresult is one that runs a callback when the task is done, similar to asyncio.Future.add_done_callback(). This would be straightforward to implement but is not in keeping with the spirit of structured concurrency. If needed, it is simple to write a little boilerplate to wait for ResultBase.wait_done() and then call the desired callback:

def my_callback(result):
    print("Task completed:", result.args[0], "result:", result.result())

async def wait_and_run(result):
    await result.wait_done()
    my_callback(result)

async def run_all():
    async with trio.open_nursery() as n:
        results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
        for r in results:
            n.start_soon(wait_and_run, r)
    # By the time this line is reached, my_callback() has been completed for each result.

An alternative, if my_callback() is a function you can modify, is to wait for ResultBase.wait_done() at the start. That way, it can be run in the nursery directly:

async def my_callback(result):
    await result.wait_done()
    print("Task completed:", result.args[0], "result:", result.result())

async def run_all():
    async with trio.open_nursery() as n:
        results = [ResultCapture.start_soon(n, foo, i) for i in range(10)]
        for r in results:
            n.start_soon(my_callback, r)
    # By the time this line is reached, my_callback() has been completed for each result.

Of course, if you can get this far, then you can probably modify your function to call the underlying routine directly and eliminate your usage of aioresult altogether:

async def my_fn(i):
    result = await foo(i)
    print("Task completed:", i, "result:", result)

async def run_all():
    async with trio.open_nursery() as n:
        for i in range(10):
            n.start_soon(my_fn, i)

Reference

await aioresult.wait_all(results: Iterable[ResultBase]) -> None

Waits until all tasks are done.

The implementation is extremely simple: it just iterates over the parameter and calls ResultBase.wait_done() on each in turn. (It does not matter if they finish in a different order than the iteration order because ResultBase.wait_done() returns immediately if the task is already done.)

Parameters:

results – The results to wait for.
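
The sequential strategy described for wait_all() can be illustrated with a minimal sketch. This is not aioresult's actual code: to stay runnable with only the standard library it uses asyncio rather than trio, and StandInResult, wait_all_sketch() and demo_wait_all() are hypothetical names invented for the illustration. It only aims to show why waiting in iteration order is enough even when the tasks finish in a different order.

```python
import asyncio


class StandInResult:
    """Hypothetical stand-in for ResultBase: only is_done() and wait_done()."""

    def __init__(self):
        self._done = asyncio.Event()

    def set_done(self):
        self._done.set()

    def is_done(self):
        return self._done.is_set()

    async def wait_done(self):
        # Returns immediately if the event is already set.
        await self._done.wait()


async def wait_all_sketch(results):
    # Wait for each result in iteration order (assumed strategy, per the text).
    for r in results:
        await r.wait_done()


async def demo_wait_all():
    results = [StandInResult() for _ in range(3)]

    async def finish_in_reverse():
        # Complete the results in the opposite order to the iteration order.
        for r in reversed(results):
            await asyncio.sleep(0)
            r.set_done()

    finisher = asyncio.create_task(finish_in_reverse())
    await wait_all_sketch(results)
    await finisher
    return [r.is_done() for r in results]
```

Calling asyncio.run(demo_wait_all()) blocks on the first result until the last one to finish is done; the remaining wait_done() calls then return at once because those results are already complete.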

await aioresult.wait_any(results: Iterable[ResultBase]) -> ResultBase

Waits until one of the tasks is complete, and returns that object.

Note that it is possible that, when this function returns, more than one of the tasks has actually completed (i.e., ResultBase.is_done() could return True for more than one of them).

Parameters:

results – The ResultBase objects to wait for.

Returns:

One of the objects in results.

Raises:

RuntimeError – If results is empty.
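
The behaviour described above (one object is returned, several may already be done, and an empty input raises RuntimeError) can be illustrated with a rough sketch. It is an assumption-laden stand-in, not aioresult's implementation: it uses asyncio from the standard library instead of trio, and StandInResult, wait_any_sketch() and demo_wait_any() are hypothetical names.

```python
import asyncio


class StandInResult:
    """Hypothetical stand-in for ResultBase, backed by asyncio.Event."""

    def __init__(self, name):
        self.name = name
        self._done = asyncio.Event()

    def set_done(self):
        self._done.set()

    def is_done(self):
        return self._done.is_set()

    async def wait_done(self):
        await self._done.wait()


async def wait_any_sketch(results):
    results = list(results)
    if not results:
        raise RuntimeError("results is empty")
    # One waiter task per result; stop as soon as any of them finishes.
    waiters = {asyncio.ensure_future(r.wait_done()): r for r in results}
    done, pending = await asyncio.wait(
        list(waiters), return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel the waiters that are no longer needed and let them unwind.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # More than one waiter may have finished; return just one of the results.
    return waiters[next(iter(done))]


async def demo_wait_any():
    results = [StandInResult(i) for i in range(4)]
    # Two results are already done before the wait starts.
    results[1].set_done()
    results[3].set_done()
    winner = await wait_any_sketch(results)
    done_names = [r.name for r in results if r.is_done()]
    return winner.name, done_names
```

In this demo the returned object is one of the two completed stand-ins, while is_done() is true for both of them, which is the situation the note above warns about. Callers that care about every completed task should therefore check is_done() on the others rather than assume only the returned object has finished.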

await aioresult.results_to_channel(results: Iterable[ResultBase], channel: MemorySendChannel | MemoryObjectSendStream, close_on_complete: bool = True) -> None

Waits for ResultBase tasks to complete, and sends them to an async channel.

The results are waited for concurrently, so they are sent to the channel in the order they complete rather than the order they appear in the results iterable. As usual when waiting for async tasks, the ordering is not guaranteed for tasks that finish at very similar times.

This function does not return until all tasks in results have completed, so it would normally be used by being passed to trio.Nursery.start_soon(), rather than being directly awaited in the caller.

Parameters:
  • results – The ResultBase objects to send to the channel.

  • channel – The send end of the channel to send to.

  • close_on_complete – If True (the default), the channel will be closed when the function completes. This means that iterating over the receive end of the channel with async for will complete once all results have been returned.

Warning

If close_on_complete is True and this routine is cancelled then the channel is still closed. (The close is done in a finally: block.) This means that, in this situation, an async for loop over the receive end will complete without all results being returned. This is done to avoid a recipient waiting forever.

In practice, it will very often be the case that the same nursery is used for both this routine and the routine that iterates over the results. In that case, the async for would be interrupted anyway.