Note
This section is optional; the ROS2 tutorial starts at ROS2 Installation.
Python’s asyncio
Note
Asynchronous code is not the same as code that runs in parallel, even more so in Python because of the GIL (More info).
Basically, the async framework lets us avoid wasting time waiting for results when we don’t know when they will arrive. It allows us either to attach a callback for when the result is ready, or to run many service calls and await them all, instead of running one at a time.
There are two main ways to interact with async code: by awaiting the results, or by handling those results through callbacks. Let’s go through both of them with examples.
Use a venv
We already know that it is good practice to use a venv when you want to isolate your environment. So, let’s turn that into a reflex and do so for this whole section.
cd ~
source ros2tutorial_venv/bin/activate
Create the minimalist_async package
In this step, we’ll work on these.
python/minimalist_package/minimalist_package/
└── minimalist_async/
    └── __init__.py
As we learned in Minimalist package: something to start with, let’s make a package called minimalist_async.
cd ~/ros2_tutorials_preamble/python/minimalist_package/minimalist_package
mkdir minimalist_async
cd minimalist_async
We then create an __init__.py file with the following contents:
from minimalist_package.minimalist_async._unlikely_to_return import unlikely_to_return
Create the async function
In this step, we’ll work on this.
python/minimalist_package/minimalist_package/
└── minimalist_async/
    ├── __init__.py
    └── _unlikely_to_return.py
Let’s create a module called _unlikely_to_return.py in the ~/ros2_tutorials_preamble/python/minimalist_package/minimalist_package/minimalist_async folder to hold a function used for this example, with the following contents:
import asyncio
import random
from textwrap import dedent


async def unlikely_to_return(tag: str, likelihood: float = 0.1) -> float:
    """
    A function that is unlikely to return.
    :return: When it returns, the successful random roll as a float.
    """
    while True:
        a = random.uniform(0.0, 1.0)
        if a < likelihood:
            print(f"{tag} Done.")
            return a
        else:
            print(f"{tag} retry needed (roll = {a} > {likelihood})")
            await asyncio.sleep(0.1)
Because we’re using await in the function, we start by defining an async function.
Hint
If the function/method uses await anywhere, it should be async (More info).
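As a minimal sketch of what the hint implies: calling an async function does not run its body, it only creates a coroutine object that something else (here asyncio.run()) has to execute. The names answer() and run_answer() are hypothetical and exist only for this illustration.

```python
import asyncio


async def answer() -> int:
    # Hypothetical coroutine function, used only for this illustration.
    # 'await' is only valid here because the function is declared 'async def'.
    await asyncio.sleep(0)
    return 42


def run_answer() -> int:
    # Calling the async function only creates a coroutine object; nothing runs yet.
    coroutine = answer()
    assert asyncio.iscoroutine(coroutine)
    # asyncio.run() starts an event loop that actually executes the body.
    return asyncio.run(coroutine)


if __name__ == "__main__":
    print(run_answer())
```

Removing the async keyword from answer() while keeping the await inside it would be rejected by Python as a syntax error, which is the rule the hint summarizes.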
This function was designed this way to emulate, for example, waiting for something external without actually having to. To do so, we add a while True: loop and return with only a 10% chance on each iteration. Instead of using time.sleep(), we use await asyncio.sleep(0.1) to unleash the power of async. The main difference is that time.sleep() is synchronous (blocking), meaning that the interpreter will be stuck there until it finishes. With await, the event loop is free to run other tasks and come back to this one after the desired amount of time has elapsed.
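The difference between the two kinds of sleep can be checked with a small timing sketch. It is not part of the tutorial package; blocking_wait(), cooperative_wait(), time_three_waits(), and compare_sleeps() are hypothetical names chosen for this example. Three waits of 0.1 s are requested at once: with time.sleep() they should add up to roughly 0.3 s, whereas with asyncio.sleep() they should overlap and take roughly 0.1 s.

```python
import asyncio
import time


async def blocking_wait(delay: float) -> None:
    # time.sleep() never yields, so the event loop cannot run anything else meanwhile.
    time.sleep(delay)


async def cooperative_wait(delay: float) -> None:
    # 'await' hands control back to the event loop until the delay has elapsed.
    await asyncio.sleep(delay)


async def time_three_waits(wait_function, delay: float = 0.1) -> float:
    """Request three waits at once and return the elapsed wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(wait_function(delay) for _ in range(3)))
    return time.perf_counter() - start


def compare_sleeps() -> tuple[float, float]:
    blocking = asyncio.run(time_three_waits(blocking_wait))
    cooperative = asyncio.run(time_three_waits(cooperative_wait))
    return blocking, cooperative


if __name__ == "__main__":
    blocking, cooperative = compare_sleeps()
    print(f"time.sleep():    {blocking:.2f} s")
    print(f"asyncio.sleep(): {cooperative:.2f} s")
```

The exact timings depend on the machine, so only the relative difference should be read into the numbers.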
The function by itself doesn’t do much, so let’s use it in another module.
Using await
TL;DR Using await
1. Run multiple Tasks.
2. Use await for them, after they have all been started.
In this step, we’ll work on this.
python/minimalist_package/minimalist_package/
└── minimalist_async/
    ├── __init__.py
    ├── _unlikely_to_return.py
    └── async_await_example.py
Unlike synchronous programming, using async requires us to think about several tasks being executed at the same time(-ish).
The main use case is for programs with multiple tasks that can run concurrently when, at some point, we need the results of those tasks either to end the program or to continue with other tasks.
The await strategy we’re seeing now is suitable either when we need the results from all tasks before proceeding or when the order of the results matters.
To illustrate this, let’s make a file called async_await_example.py in minimalist_async with the following contents.
import asyncio
from minimalist_package.minimalist_async import unlikely_to_return


async def async_main() -> None:
    tags: list[str] = ["task1", "task2"]
    tasks: list[asyncio.Task] = []

    # Start all tasks before awaiting them, otherwise the code
    # will not be concurrent.
    for task_tag in tags:
        task = asyncio.create_task(
            unlikely_to_return(tag=task_tag)
        )
        tasks.append(task)

    # Alternatively, use asyncio.gather()
    # At this point, the functions are already running concurrently. We are now (a)waiting for the
    # results, IN THE ORDER OF THE AWAIT, even if the other task ends first.
    print("Awaiting results...")
    for (tag, task) in zip(tags, tasks):
        result = await task
        print(f"The result of task={tag} was {result}.")


def main() -> None:
    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)


if __name__ == "__main__":
    main()
We start by importing the async function we defined in the other module
from minimalist_package.minimalist_async import unlikely_to_return
The function will be run by an instance of asyncio.Task. Creating the task schedules the function to run concurrently with the code that created the task; the function gets its first chance to run as soon as that code reaches an await. The example is a bit on the fancy side to make it easier to read and maintain, but the concept is simple. When using the await paradigm, focus on the following:
1. Make the function it should run, like our unlikely_to_return().
2. Run all concurrent tasks and keep a reference to them as asyncio.Task.
3. await each asyncio.Task, in the order in which you want those results.
async def async_main() -> None:
    tags: list[str] = ["task1", "task2"]
    tasks: list[asyncio.Task] = []

    # Start all tasks before awaiting them, otherwise the code
    # will not be concurrent.
    for task_tag in tags:
        task = asyncio.create_task(
            unlikely_to_return(tag=task_tag)
        )
        tasks.append(task)

    # Alternatively, use asyncio.gather()
    # At this point, the functions are already running concurrently. We are now (a)waiting for the
    # results, IN THE ORDER OF THE AWAIT, even if the other task ends first.
    print("Awaiting results...")
    for (tag, task) in zip(tags, tasks):
        result = await task
        print(f"The result of task={tag} was {result}.")
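The comment in the code mentions asyncio.gather() as an alternative. A minimal sketch of that variant is shown here, using a hypothetical stand-in coroutine, delayed_value(), so that the example is self-contained and deterministic. As with awaiting the tasks one by one, the results come back in the order in which the coroutines were passed, not in the order in which they finished.

```python
import asyncio


async def delayed_value(tag: str, delay: float) -> str:
    # Hypothetical stand-in for unlikely_to_return(): waits, then returns its tag.
    await asyncio.sleep(delay)
    return tag


async def gather_main() -> list[str]:
    # gather() schedules both coroutines concurrently and waits for all of them.
    # task1 takes longer than task2, yet its result still comes first in the list.
    results = await asyncio.gather(
        delayed_value("task1", 0.2),
        delayed_value("task2", 0.1),
    )
    return list(results)


if __name__ == "__main__":
    print(asyncio.run(gather_main()))
```

This trades the explicit loop over tasks for a single call, which can be convenient when all results are needed before moving on.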
Ok, enough with the explanation, let’s go to the endorphin rush of actually running the program with
cd ~/ros2_tutorials_preamble/python/minimalist_package/
python3 -m minimalist_package.minimalist_async.async_await_example
This will result in something like the output shown below. The function is stochastic, so it might take more or less time to return, and the order in which the tasks end might also be different.
However, in the await framework, the results will ALWAYS be processed in the order specified by the await, EVEN WHEN THE OTHER TASK ENDS FIRST, as in the example below. This is neither good nor bad; it will be suitable for some cases and not for others.
We can also see that both tasks run concurrently until task2 finishes, after which only task1 keeps running.
Awaiting results...
task1 retry needed (roll = 0.36896762068176037 > 0.1).
task2 retry needed (roll = 0.8429002838770375 > 0.1).
task1 retry needed (roll = 0.841018521652675 > 0.1).
task2 retry needed (roll = 0.1351152094825686 > 0.1).
task1 retry needed (roll = 0.9484654265361889 > 0.1).
task2 retry needed (roll = 0.3167046796566366 > 0.1).
task1 retry needed (roll = 0.7519672365071198 > 0.1).
task2 retry needed (roll = 0.38440407016827005 > 0.1).
task1 retry needed (roll = 0.23155484384953284 > 0.1).
task2 retry needed (roll = 0.6418306170261009 > 0.1).
task1 retry needed (roll = 0.532161975008607 > 0.1).
task2 Done.
task1 retry needed (roll = 0.448132225703992 > 0.1).
task1 retry needed (roll = 0.13504700640433664 > 0.1).
task1 retry needed (roll = 0.7404815278498079 > 0.1).
task1 retry needed (roll = 0.9830081693068259 > 0.1).
task1 retry needed (roll = 0.4070546146764875 > 0.1).
task1 retry needed (roll = 0.7474267487174882 > 0.1).
task1 Done.
The result of task=task1 was 0.038934769861482144.
The result of task=task2 was 0.06380247590535493.
Process finished with exit code 0
Hooray! May there be concurrency!
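The comment in async_main() about starting all tasks before awaiting them can be checked with a timing sketch. The names wait_and_return(), one_at_a_time(), and all_started_first() are hypothetical and not part of the tutorial package. Awaiting each coroutine directly runs them one after the other, whereas creating every task first lets the waits overlap.

```python
import asyncio
import time


async def wait_and_return(delay: float) -> float:
    # Hypothetical stand-in for a slow operation.
    await asyncio.sleep(delay)
    return delay


async def one_at_a_time(delays: list[float]) -> float:
    start = time.perf_counter()
    for delay in delays:
        # Each coroutine is started and finished before the next one begins.
        await wait_and_return(delay)
    return time.perf_counter() - start


async def all_started_first(delays: list[float]) -> float:
    start = time.perf_counter()
    # All tasks are scheduled before the first await, so their waits overlap.
    tasks = [asyncio.create_task(wait_and_return(delay)) for delay in delays]
    for task in tasks:
        await task
    return time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.1, 0.1, 0.1]
    print(f"one at a time:     {asyncio.run(one_at_a_time(delays)):.2f} s")
    print(f"all started first: {asyncio.run(all_started_first(delays)):.2f} s")
```

On most machines the first figure should be close to the sum of the delays and the second close to the longest single delay, although the exact values will vary.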
Using callback
TL;DR Using callbacks
1. Run multiple Tasks.
2. Add a callback to handle the result as soon as it is ready.
3. Use await for each Task just so that the main loop does not return prematurely.
In this step, we’ll work on this.
python/minimalist_package/minimalist_package/
└── minimalist_async/
    ├── __init__.py
    ├── _unlikely_to_return.py
    ├── async_await_example.py
    └── async_callback_example.py
Instead of awaiting each task and then processing its result, we can define callbacks so that each result is processed as it arrives. That way, the results can be processed in an arbitrary order. Once again, this is inherently neither a good strategy nor a bad one. Some frameworks work with callbacks, for example ROS1, ROS2, and Qt, whereas others prefer await.
Enough diplomacy, let’s make a file called async_callback_example.py in minimalist_async with the following contents.
from functools import partial
import asyncio
from minimalist_package.minimalist_async import unlikely_to_return


def handle_return_callback(tag: str, future: asyncio.Future) -> None:
    """
    Callback example for asyncio.Future
    :param tag: An example parameter, in this case, a tag
    :param future: A asyncio.Future is expected to be the last parameter
    of the callback.
    :return: Nothing.
    """
    if future is not None and future.done():
        print(f"The result of task={tag} was {future.result()}.")
    else:
        print(f"Problem with task={tag}.")


async def async_main() -> None:
    tags: list[str] = ["task1", "task2"]
    tasks: list[asyncio.Task] = []

    # Start all tasks before adding the callback
    for task_tag in tags:
        task = asyncio.create_task(
            unlikely_to_return(tag=task_tag)
        )
        task.add_done_callback(
            partial(handle_return_callback, task_tag)
        )
        tasks.append(task)

    # Alternatively, use asyncio.gather()
    # At this point, the functions are already running concurrently. And the result will be processed
    # by the callback AS "SOON" AS THEY ARE AVAILABLE.
    print("Awaiting results...")
    for task in tasks:
        await task


def main() -> None:
    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)


if __name__ == "__main__":
    main()
In the callback paradigm, besides the function that does the actual task, as in the prior example, we have to make, to no one’s surprise, a callback function to process the results as they come.
We do so with
def handle_return_callback(tag: str, future: asyncio.Future) -> None:
    """
    Callback example for asyncio.Future
    :param tag: An example parameter, in this case, a tag
    :param future: A asyncio.Future is expected to be the last parameter
    of the callback.
    :return: Nothing.
    """
    if future is not None and future.done():
        print(f"The result of task={tag} was {future.result()}.")
    else:
        print(f"Problem with task={tag}.")
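In this case, the callback must receive an asyncio.Future as its last parameter and process it. The example tests the future for None and for done() as a defensive measure. Note that asyncio calls the callback with the finished future itself, so if the task failed, future.result() re-raises the task’s exception inside the callback.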
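For callbacks that should also cope with failed or cancelled tasks, one possible sketch is to inspect the future with cancelled() and exception() before calling result(). Everything here besides the asyncio and functools calls is hypothetical: succeed(), fail(), report_callback(), and callback_demo() are illustrative names, and the results are collected in a list rather than printed so that they are easy to inspect.

```python
import asyncio
from functools import partial


async def succeed() -> float:
    # Hypothetical task that finishes normally.
    await asyncio.sleep(0.01)
    return 0.05


async def fail() -> float:
    # Hypothetical task that raises instead of returning.
    await asyncio.sleep(0.01)
    raise RuntimeError("sensor offline")


def report_callback(tag: str, log: list[str], future: asyncio.Future) -> None:
    # cancelled() must be checked first: exception() raises on a cancelled future.
    if future.cancelled():
        log.append(f"{tag}: cancelled")
    elif future.exception() is not None:
        log.append(f"{tag}: failed ({future.exception()})")
    else:
        log.append(f"{tag}: result {future.result()}")


async def callback_demo() -> list[str]:
    log: list[str] = []
    tasks: list[asyncio.Task] = []
    for tag, coroutine in (("ok", succeed()), ("bad", fail())):
        task = asyncio.create_task(coroutine)
        task.add_done_callback(partial(report_callback, tag, log))
        tasks.append(task)
    # return_exceptions=True keeps the failed task from raising at this await.
    await asyncio.gather(*tasks, return_exceptions=True)
    return log


if __name__ == "__main__":
    for line in asyncio.run(callback_demo()):
        print(line)
```

Whether a failure should be logged, retried, or re-raised depends on the application; the sketch only shows where that decision can be made.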
Aside from that, there are only two key differences from the await logic example we showed before:
1. The callback must be added with task.add_done_callback(); remember to use partial() if the callback has other parameters besides the Future.
2. We await the tasks at the end, not because this script will process their results (they will be processed as they come by the callback), but because otherwise the main script will return and (most likely) nothing will be done.
async def async_main() -> None:
    tags: list[str] = ["task1", "task2"]
    tasks: list[asyncio.Task] = []

    # Start all tasks before adding the callback
    for task_tag in tags:
        task = asyncio.create_task(
            unlikely_to_return(tag=task_tag)
        )
        task.add_done_callback(
            partial(handle_return_callback, task_tag)
        )
        tasks.append(task)

    # Alternatively, use asyncio.gather()
    # At this point, the functions are already running concurrently. And the result will be processed
    # by the callback AS "SOON" AS THEY ARE AVAILABLE.
    print("Awaiting results...")
    for task in tasks:
        await task
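To see what partial() contributes in the code above, here is a minimal sketch outside of asyncio. The function describe() is a hypothetical two-parameter callback. partial() returns a new callable with the leading argument already filled in, which is why the Future has to be the last parameter: add_done_callback() supplies only that one remaining argument.

```python
from functools import partial


def describe(tag: str, value: float) -> str:
    # Hypothetical two-parameter callback: 'tag' is bound ahead of time,
    # 'value' plays the role of the argument supplied later by the caller.
    return f"The result of task={tag} was {value}."


# Bind the first positional argument now; the rest is supplied at call time.
describe_task1 = partial(describe, "task1")

if __name__ == "__main__":
    print(describe_task1(0.5))
```

A lambda or a small closure would work as well; partial() is simply the option used in the example above.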
But enough talk… Have at you! Let’s run the code with
cd ~/ros2_tutorials_preamble/python/minimalist_package/
python3 -m minimalist_package.minimalist_async.async_callback_example
Depending on our luck, we will have a very illustrative result like the one below. This example shows that, with the callback logic, when the second task ends before the first one, it will be automatically processed by its callback.
Awaiting results...
task1 retry needed (roll = 0.6248308966234916 > 0.1).
task2 retry needed (roll = 0.24259714032999036 > 0.1).
task1 retry needed (roll = 0.1996764883575476 > 0.1).
task2 Done.
The result of task=task2 was 0.09069407383542283.
task1 retry needed (roll = 0.6700777523785147 > 0.1).
task1 retry needed (roll = 0.7344216907108979 > 0.1).
task1 retry needed (roll = 0.4907223062034761 > 0.1).
task1 retry needed (roll = 0.20026037098687932 > 0.1).
task1 Done.
The result of task=task1 was 0.09676678954317675.
Can you feel the new synaptic connections?