Note

This section is optional; the ROS2 tutorial proper starts at ROS2 Installation.

Python’s asyncio

Note

Asynchronous code is not the same as code that runs in parallel, and even less so in Python because of the GIL (More info). In essence, the async framework allows us not to waste time waiting for results that may arrive at an unpredictable moment. It lets us either attach a callback for when a result is ready, or run many service calls concurrently and await them all, instead of running one at a time.

There are two main ways to interact with async code: awaiting the results, or handling those results through callbacks. Let’s go through both of them with examples.
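Before that, we can verify the claim in the note above: asyncio gives us concurrency on a single thread, not parallelism. The sketch below (not part of the tutorial package, names are illustrative) runs two coroutines concurrently and shows that they report the same thread.

```python
import asyncio
import threading


async def thread_name(tag: str) -> str:
    # Yield control once, then report which thread this coroutine ran in.
    await asyncio.sleep(0)
    return threading.current_thread().name


async def async_main() -> list[str]:
    # Both coroutines run concurrently, yet on the very same thread.
    return await asyncio.gather(thread_name("task1"), thread_name("task2"))


names = asyncio.run(async_main())
print(names)
```

Both entries of names will be the same thread name, typically MainThread.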

Use a venv

We already know that it is good practice to use a venv when we want to isolate our environment. So, let’s turn that into a reflex and do so for this whole section.

cd ~
source ros2tutorial_venv/bin/activate

Create the minimalist_async package

In this step, we’ll work on this.

python/minimalist_package/minimalist_package/
  └── minimalist_async/
        └── __init__.py

As we learned in Minimalist package: something to start with, let’s make a package called minimalist_async.

cd ~/ros2_tutorials_preamble/python/minimalist_package/minimalist_package
mkdir minimalist_async
cd minimalist_async

We then create an __init__.py file with the following contents.

__init__.py

from minimalist_package.minimalist_async._unlikely_to_return import unlikely_to_return

Create the async function

In this step, we’ll work on this.

python/minimalist_package/minimalist_package/
  └── minimalist_async/
        └── __init__.py
        └── _unlikely_to_return.py

Let’s create a module called _unlikely_to_return.py, in the ~/ros2_tutorials_preamble/python/minimalist_package/minimalist_package/minimalist_async folder, to hold the function used in this example. It has the following contents.

_unlikely_to_return.py

import asyncio
import random


async def unlikely_to_return(tag: str, likelihood: float = 0.1) -> float:
    """
    A function that is unlikely to return on any given roll.
    :param tag: A tag to identify this call in the printed output.
    :param likelihood: The probability, on each roll, of returning.
    :return: When it returns, the successful random roll as a float.
    """
    while True:
        a = random.uniform(0.0, 1.0)
        if a < likelihood:
            print(f"{tag} Done.")
            return a
        else:
            print(f"{tag} retry needed (roll = {a} > {likelihood}).")
            await asyncio.sleep(0.1)

Because we’re using await in the function, we start by defining an async function.

Hint

If the function/method uses await anywhere, it should be async (More info).
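The flip side of that hint: calling an async function does not run it, it only creates a coroutine object that something must await (or hand to asyncio.run()). A minimal sketch, with an illustrative function of our own:

```python
import asyncio


async def answer() -> int:
    return 42


coro = answer()              # calling an async function only creates a coroutine
print(type(coro).__name__)   # 'coroutine' -- nothing has run yet
coro.close()                 # discard it to silence the "never awaited" warning

result = asyncio.run(answer())  # asyncio.run() (or await) actually executes it
print(result)                   # 42
```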

This function was designed this way to emulate, for example, waiting for something external without actually having to. To do so, we add a while True: loop and return with only a 10% chance per roll. Instead of using time.sleep(), we use await asyncio.sleep(0.1) to unleash the power of async. The main difference is that time.sleep() is synchronous (blocking), meaning that the interpreter is stuck there until it finishes. With await, the interpreter is free to do other things and come back to this coroutine after the desired amount of time has elapsed.
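That difference is measurable. In this sketch (illustrative names, not part of the tutorial package), three 0.1-second naps overlap, so the total elapsed time stays near 0.1 s instead of the 0.3 s that three blocking time.sleep() calls would need.

```python
import asyncio
import time


async def napper(delay: float) -> None:
    # asyncio.sleep() suspends only this coroutine; a time.sleep()
    # here would block the whole event loop instead.
    await asyncio.sleep(delay)


async def async_main() -> float:
    start = time.perf_counter()
    # The three naps run concurrently, so they overlap in time.
    await asyncio.gather(napper(0.1), napper(0.1), napper(0.1))
    return time.perf_counter() - start


elapsed = asyncio.run(async_main())
print(f"elapsed: {elapsed:.2f} s")
```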

The function by itself doesn’t do much, so let’s use it in another module.

Using await

TL;DR Using await

  1. Run multiple Tasks.

  2. Use await on them, after they have been started.

In this step, we’ll work on this.

python/minimalist_package/minimalist_package/
  └── minimalist_async/
        └── __init__.py
        └── _unlikely_to_return.py
        └── async_await_example.py

Differently from synchronous programming, using async requires us to think in terms of several tasks being executed at the same time(-ish). The main use case is programs with multiple tasks that can run concurrently where, at some point, we need the results of those tasks either to end the program or to continue with other tasks.

The await strategy we’re seeing now is suitable when we need the results from all tasks before proceeding, or when the order of the results matters.

To illustrate this, let’s make a file called async_await_example.py in minimalist_async with the following contents.

async_await_example.py

import asyncio
from minimalist_package.minimalist_async import unlikely_to_return


async def async_main() -> None:
    tags: list[str] = ["task1", "task2"]
    tasks: list[asyncio.Task] = []

    # Start all tasks before awaiting them, otherwise the code
    # will not be concurrent.
    for task_tag in tags:
        task = asyncio.create_task(
            unlikely_to_return(tag=task_tag)
        )
        tasks.append(task)

    # Alternatively, use asyncio.gather()
    # At this point, the functions are already running concurrently. We are now (a)waiting for the
    # results, IN THE ORDER OF THE AWAIT, even if the other task ends first.
    print("Awaiting results...")
    for (tag, task) in zip(tags, tasks):
        result = await task
        print(f"The result of task={tag} was {result}.")


def main() -> None:
    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)


if __name__ == "__main__":
    main()

We start by importing the async function we defined in the other module

from minimalist_package.minimalist_async import unlikely_to_return

The function will be run by an instance of asyncio.Task. Creating the task is equivalent to calling the function, and it starts running concurrently with the script that created it. The example is a bit on the fancy side to make it easier to read and maintain, but the concept is simple. When using the await paradigm, focus on the following

  1. Write the async function it should run, like our unlikely_to_return().

  2. Run all concurrent tasks and keep a reference to them as asyncio.Task.

  3. await on each asyncio.Task, in the order in which you want those results.

async def async_main() -> None:
    tags: list[str] = ["task1", "task2"]
    tasks: list[asyncio.Task] = []

    # Start all tasks before awaiting them, otherwise the code
    # will not be concurrent.
    for task_tag in tags:
        task = asyncio.create_task(
            unlikely_to_return(tag=task_tag)
        )
        tasks.append(task)

    # Alternatively, use asyncio.gather()
    # At this point, the functions are already running concurrently. We are now (a)waiting for the
    # results, IN THE ORDER OF THE AWAIT, even if the other task ends first.
    print("Awaiting results...")
    for (tag, task) in zip(tags, tasks):
        result = await task
        print(f"The result of task={tag} was {result}.")
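The comment about asyncio.gather() deserves a quick illustration: it starts the coroutines concurrently and returns their results in the order they were passed in, collapsing steps 2 and 3 into one call. A self-contained sketch, where roll_until_lucky() is a stand-in for unlikely_to_return() (with a higher likelihood so it finishes quickly), not part of the tutorial package:

```python
import asyncio
import random


async def roll_until_lucky(tag: str, likelihood: float = 0.5) -> float:
    # Stand-in for unlikely_to_return(): retries until a roll succeeds.
    while True:
        a = random.uniform(0.0, 1.0)
        if a < likelihood:
            return a
        await asyncio.sleep(0.01)


async def async_main() -> None:
    tags = ["task1", "task2"]
    # gather() creates and runs the tasks concurrently, then returns
    # the results in the order the coroutines were passed in.
    results = await asyncio.gather(*(roll_until_lucky(t) for t in tags))
    for tag, result in zip(tags, results):
        print(f"The result of task={tag} was {result}.")


asyncio.run(async_main())
```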

Ok, enough with the explanation, let’s get to the endorphin rush of actually running the program with

cd ~/ros2_tutorials_preamble/python/minimalist_package/
python3 -m minimalist_package.minimalist_async.async_await_example

This will result in something like the output shown below. The function is stochastic, so it might take more or less time to return, and the order in which the tasks end might also differ.

However, in the await framework, the results will ALWAYS be processed in the order specified by the await statements, EVEN WHEN THE OTHER TASK ENDS FIRST, as in the example below. This is neither good nor bad; it will be appropriate for some cases and not for others.

We can also see that both tasks run concurrently until task2 finishes, after which only task1 keeps executing.

Awaiting results...
task1 retry needed (roll = 0.36896762068176037 > 0.1).
task2 retry needed (roll = 0.8429002838770375 > 0.1).
task1 retry needed (roll = 0.841018521652675 > 0.1).
task2 retry needed (roll = 0.1351152094825686 > 0.1).
task1 retry needed (roll = 0.9484654265361889 > 0.1).
task2 retry needed (roll = 0.3167046796566366 > 0.1).
task1 retry needed (roll = 0.7519672365071198 > 0.1).
task2 retry needed (roll = 0.38440407016827005 > 0.1).
task1 retry needed (roll = 0.23155484384953284 > 0.1).
task2 retry needed (roll = 0.6418306170261009 > 0.1).
task1 retry needed (roll = 0.532161975008607 > 0.1).
task2 Done.
task1 retry needed (roll = 0.448132225703992 > 0.1).
task1 retry needed (roll = 0.13504700640433664 > 0.1).
task1 retry needed (roll = 0.7404815278498079 > 0.1).
task1 retry needed (roll = 0.9830081693068259 > 0.1).
task1 retry needed (roll = 0.4070546146764875 > 0.1).
task1 retry needed (roll = 0.7474267487174882 > 0.1).
task1 Done.
The result of task=task1 was 0.038934769861482144.
The result of task=task2 was 0.06380247590535493.

Process finished with exit code 0

Hooray! May there be concurrency!

Using callback

TL;DR Using callbacks

  1. Run multiple Tasks.

  2. Add a callback to handle the result as soon as it is ready.

  3. Use await on each Task just so that the main coroutine does not return prematurely.

In this step, we’ll work on this.

python/minimalist_package/minimalist_package/
  └── minimalist_async/
        └── __init__.py
        └── _unlikely_to_return.py
        └── async_await_example.py
        └── async_callback_example.py

Differently from awaiting each task and then processing its result, we can define callbacks in such a way that each result is processed as it arrives. That way, the results can be processed in an arbitrary order. Once again, this is inherently neither a good strategy nor a bad one. Some frameworks work with callbacks, for example ROS1, ROS2, and Qt, while others prefer await.
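As an aside, asyncio also offers as_completed(), which achieves "process results as they finish" without callbacks. It is not used in the rest of this tutorial; this is just a self-contained sketch with illustrative names:

```python
import asyncio
import random


async def roll(tag: str) -> tuple[str, float]:
    # Sleep a random amount so the finishing order is arbitrary.
    await asyncio.sleep(random.uniform(0.0, 0.05))
    return tag, random.uniform(0.0, 1.0)


async def async_main() -> list[str]:
    tasks = [asyncio.create_task(roll(t)) for t in ("task1", "task2")]
    finished: list[str] = []
    # as_completed() yields awaitables in completion order, so each
    # result is handled as soon as it is ready, like a callback would.
    for next_done in asyncio.as_completed(tasks):
        tag, value = await next_done
        print(f"The result of task={tag} was {value}.")
        finished.append(tag)
    return finished


asyncio.run(async_main())
```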

Enough diplomacy, let’s make a file called async_callback_example.py in minimalist_async with the following contents.

async_callback_example.py

from functools import partial
import asyncio
from minimalist_package.minimalist_async import unlikely_to_return


def handle_return_callback(tag: str, future: asyncio.Future) -> None:
    """
    Callback example for asyncio.Future
    :param tag: An example parameter, in this case, a tag
    :param future: An asyncio.Future is expected to be the last parameter
    of the callback.
    :return: Nothing.
    """
    if future is not None and future.done():
        print(f"The result of task={tag} was {future.result()}.")
    else:
        print(f"Problem with task={tag}.")


async def async_main() -> None:
    tags: list[str] = ["task1", "task2"]
    tasks: list[asyncio.Task] = []

    # Start all tasks before adding the callback
    for task_tag in tags:
        task = asyncio.create_task(
            unlikely_to_return(tag=task_tag)
        )
        task.add_done_callback(
            partial(handle_return_callback, task_tag)
        )
        tasks.append(task)

    # Alternatively, use asyncio.gather()
    # At this point, the functions are already running concurrently. The results will be processed
    # by the callback AS "SOON" AS THEY ARE AVAILABLE.
    print("Awaiting results...")
    for task in tasks:
        await task


def main() -> None:
    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)


if __name__ == "__main__":
    main()

In the callback paradigm, besides the function that does the actual work, as in the prior example, we have to write a, to no one’s surprise, callback function to process the results as they come.

We do so with

def handle_return_callback(tag: str, future: asyncio.Future) -> None:
    """
    Callback example for asyncio.Future
    :param tag: An example parameter, in this case, a tag
    :param future: An asyncio.Future is expected to be the last parameter
    of the callback.
    :return: Nothing.
    """
    if future is not None and future.done():
        print(f"The result of task={tag} was {future.result()}.")
    else:
        print(f"Problem with task={tag}.")

In this case, the callback must receive an asyncio.Future and process it. Test the future for None in case the task fails for any reason.

Aside from that, there are only two key differences from the await example we showed before,

  1. The callback must be added with task.add_done_callback(); remember to use partial() if the callback has parameters besides the Future.

  2. await the tasks at the end, not because this script will process their results (those are processed as they come, by the callbacks), but because otherwise the main coroutine would return and (most likely) nothing would get done.

async def async_main() -> None:
    tags: list[str] = ["task1", "task2"]
    tasks: list[asyncio.Task] = []

    # Start all tasks before adding the callback
    for task_tag in tags:
        task = asyncio.create_task(
            unlikely_to_return(tag=task_tag)
        )
        task.add_done_callback(
            partial(handle_return_callback, task_tag)
        )
        tasks.append(task)

    # Alternatively, use asyncio.gather()
    # At this point, the functions are already running concurrently. The results will be processed
    # by the callback AS "SOON" AS THEY ARE AVAILABLE.
    print("Awaiting results...")
    for task in tasks:
        await task
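The partial() trick above is worth a closer look: add_done_callback() passes exactly one argument (the finished task) to its callback, so partial() pre-binds the extra tag argument. A self-contained sketch with an illustrative coroutine quick(), not part of the tutorial package:

```python
import asyncio
from functools import partial


def handle_done(tag: str, future: asyncio.Future) -> None:
    # add_done_callback() calls this with one argument (the finished
    # task); tag was pre-bound by partial() below.
    print(f"{tag} finished with {future.result()}")


async def quick(value: int) -> int:
    # A trivial stand-in task that finishes almost immediately.
    await asyncio.sleep(0)
    return value


async def async_main() -> int:
    task = asyncio.create_task(quick(7))
    # partial(handle_done, "task1") behaves like
    # lambda fut: handle_done("task1", fut)
    task.add_done_callback(partial(handle_done, "task1"))
    return await task


result = asyncio.run(async_main())
```

Running this prints the callback's message and leaves result set to 7.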

But enough talk… Have at you! Let’s run the code with

cd ~/ros2_tutorials_preamble/python/minimalist_package/
python3 -m minimalist_package.minimalist_async.async_callback_example

Depending on our luck, we will get a very illustrative result like the one below. This example shows that, with the callback logic, when the second task ends before the first one, it is immediately processed by its callback.

Awaiting results...
task1 retry needed (roll = 0.6248308966234916 > 0.1).
task2 retry needed (roll = 0.24259714032999036 > 0.1).
task1 retry needed (roll = 0.1996764883575476 > 0.1).
task2 Done.
The result of task=task2 was 0.09069407383542283.
task1 retry needed (roll = 0.6700777523785147 > 0.1).
task1 retry needed (roll = 0.7344216907108979 > 0.1).
task1 retry needed (roll = 0.4907223062034761 > 0.1).
task1 retry needed (roll = 0.20026037098687932 > 0.1).
task1 Done.
The result of task=task1 was 0.09676678954317675.

Can you feel the new synaptic connections?