Python’s asyncio#
Note
Asynchronous code is not the same as code that runs in parallel, even more so in Python because of the GIL (More info).
Basically, the async framework allows us to not waste time waiting for results that we don’t know when will arrive.
It either allows us to attach a callback for when the result is ready, or to run many service calls and await
for them all, instead of running one at a time.
There are two main ways to interact with async code, the first being by awaiting the results or by handling those
results through callbacks. Let’s go through both of them with examples.
Use a venv#
We already know that it is a good practice to When you want to isolate your environment, use venv. So, let’s turn that into a reflex and do so for this whole section.
cd ~
source ros2tutorial_venv/bin/activate
Create the minimalist_async package#
In this step, we’ll work on these.
python/
`-- minimalist_package
|-- minimalist_package
| |-- __init__.py
| |-- _minimalist_class.py
| |-- minimalist_async
| | |-- __init__.py
| | |-- _unlikely_to_return.py
| | |-- async_await_example.py
| | `-- async_callback_example.py
| `-- minimalist_script.py
|-- setup.py
`-- test
`-- test_minimalist_class.py
As we learned in Minimalist package: something to start with, let’s make a package called minimalist_async.
cd ~/ros2_tutorials_preamble/python/minimalist_package/minimalist_package
mkdir minimalist_async
cd minimalist_async
we then create an __init__.py file with the following contents
1from minimalist_package.minimalist_async._unlikely_to_return import unlikely_to_return
Create the async function#
In this step, we’ll work on these.
python/
`-- minimalist_package
|-- minimalist_package
| |-- __init__.py
| |-- _minimalist_class.py
| |-- minimalist_async
| | |-- __init__.py
| | |-- _unlikely_to_return.py
| | |-- async_await_example.py
| | `-- async_callback_example.py
| `-- minimalist_script.py
|-- setup.py
`-- test
`-- test_minimalist_class.py
Let’s create a module called _unlikely_to_return.py to hold a function used for this example at the ~/ros2_tutorials_preamble/python/minimalist_package/minimalist_package/minimalist_async folder with the following contents
1import asyncio
2import random
3from textwrap import dedent
4
5
6async def unlikely_to_return(tag: str, likelihood: float = 0.1) -> float:
7 """
8 A function that is unlikely to return.
9 :return: When it returns, the successful random roll as a float.
10 """
11 while True:
12 a = random.uniform(0.0, 1.0)
13 if a < likelihood:
14 print(f"{tag} Done.")
15 return a
16 else:
17 print(f"{tag} retry needed (roll = {a} > {likelihood})")
18 await asyncio.sleep(0.1)
Because we’re using await in the function, we start by defining an async function.
Hint
If the function/method uses await anywhere, it should be async (More info).
This function was thought this way to emulate, for example, us waiting for something
external without actually having to. To do so, we add a while True: and return only with 10% chance. Instead of using a time.sleep() we
use await asyncio.sleep(0.1) to unleash the power of async. The main difference is that time.sleep() is synchronous (blocking), meaning that the interpreter
will be locked here until it finishes. With await, the interpreter is free to do other things and come back to this one later after the desired amount of time has elapsed.
The function by itself doesn’t do much, so let’s use it in another module.
Using await#
TL;DR Using await
Run multiple
Tasks.Use
awaitfor them, after they were executed.
In this step, we’ll work on these.
python/
`-- minimalist_package
|-- minimalist_package
| |-- __init__.py
| |-- _minimalist_class.py
| |-- minimalist_async
| | |-- __init__.py
| | |-- _unlikely_to_return.py
| | |-- async_await_example.py
| | `-- async_callback_example.py
| `-- minimalist_script.py
|-- setup.py
`-- test
`-- test_minimalist_class.py
Differently from synchronous programming, using async needs us to reflect on several tasks being executed at the same time(-ish).
The main use case is for programs with multiple tasks that can run concurrently and, at some point, we need the result of those tasks to either
end the program or further continue with other tasks.
The await strategy we’re seeing now is suitable when either we need the results from all tasks before proceeding or when the order of results matters.
To illustrate this, let’s make a file called async_await_example.py in minimalist_async with the following contents.
1import asyncio
2from minimalist_package.minimalist_async import unlikely_to_return
3
4
5async def async_main() -> None:
6 tags: list[str] = ["task1", "task2"]
7 tasks: list[asyncio.Task] = []
8
9 # Start all tasks before awaiting them, otherwise the code
10 # will not be concurrent.
11 for task_tag in tags:
12 task = asyncio.create_task(
13 unlikely_to_return(tag=task_tag)
14 )
15 tasks.append(task)
16
17 # Alternatively, use asyncio.gather()
18 # At this point, the functions are already running concurrently. We are now (a)waiting for the
19 # results, IN THE ORDER OF THE AWAIT, even if the other task ends first.
20 print("Awaiting results...")
21 for (tag, task) in zip(tags, tasks):
22 result = await task
23 print(f"The result of task={tag} was {result}.")
24
25
26def main() -> None:
27 try:
28 asyncio.run(async_main())
29 except KeyboardInterrupt:
30 pass
31 except Exception as e:
32 print(e)
33
34
35if __name__ == "__main__":
36 main()
We start by importing the async method we defined in the other module
from minimalist_package.minimalist_async import unlikely_to_return
The function will be run by an instance of asyncio.Task. When the task is created, it is equivalent to calling the function and it starts running concurrently to the script that created the task. The example is a bit on the fancy side to make it easier to read and maintain, but the concept is simple. When using the await paradigm, focus on the following
Make the function it should run, like our
unlikely_to_return().Run all concurrent tasks and keep a reference to them as
asyncio.Task.awaiton eachasyncio.Task, in the order in which you want those results.
async def async_main() -> None:
tags: list[str] = ["task1", "task2"]
tasks: list[asyncio.Task] = []
# Start all tasks before awaiting them, otherwise the code
# will not be concurrent.
for task_tag in tags:
task = asyncio.create_task(
unlikely_to_return(tag=task_tag)
)
tasks.append(task)
# Alternatively, use asyncio.gather()
# At this point, the functions are already running concurrently. We are now (a)waiting for the
# results, IN THE ORDER OF THE AWAIT, even if the other task ends first.
print("Awaiting results...")
for (tag, task) in zip(tags, tasks):
result = await task
print(f"The result of task={tag} was {result}.")
Ok, enough with the explanation, let’s go to the endorphin rush of actually running the program with
cd ~/ros2_tutorials_preamble/python/minimalist_package/
python3 -m minimalist_package.minimalist_async.async_await_example
Which will result in something like shown below. The function is stochastic, so it might take more or less time to return and the order of the tasks ending might also be different.
However, in the await framework, the results will ALWAYS be processed in the order that was specified
by the await, EVEN WHEN THE OTHER TASK ENDS FIRST, as in the example below. This is neither good nor bad,
it will be proper for some cases and not proper for others.
We can also see that both tasks are running concurrently until task2 finishes, then only task1 is executed.
Awaiting results...
task1 retry needed (roll = 0.36896762068176037 > 0.1).
task2 retry needed (roll = 0.8429002838770375 > 0.1).
task1 retry needed (roll = 0.841018521652675 > 0.1).
task2 retry needed (roll = 0.1351152094825686 > 0.1).
task1 retry needed (roll = 0.9484654265361889 > 0.1).
task2 retry needed (roll = 0.3167046796566366 > 0.1).
task1 retry needed (roll = 0.7519672365071198 > 0.1).
task2 retry needed (roll = 0.38440407016827005 > 0.1).
task1 retry needed (roll = 0.23155484384953284 > 0.1).
task2 retry needed (roll = 0.6418306170261009 > 0.1).
task1 retry needed (roll = 0.532161975008607 > 0.1).
task2 Done.
task1 retry needed (roll = 0.448132225703992 > 0.1).
task1 retry needed (roll = 0.13504700640433664 > 0.1).
task1 retry needed (roll = 0.7404815278498079 > 0.1).
task1 retry needed (roll = 0.9830081693068259 > 0.1).
task1 retry needed (roll = 0.4070546146764875 > 0.1).
task1 retry needed (roll = 0.7474267487174882 > 0.1).
task1 Done.
The result of task=task1 was 0.038934769861482144.
The result of task=task2 was 0.06380247590535493.
Process finished with exit code 0
Hooray! May there be concurrency!
Using callback#
TL;DR Using callbacks
Run multiple
Tasks.Add a
callbackto handle the result as soon as it is ready.Use
awaitfor eachTaskjust so that the main loop does not return prematurely.
In this step, we’ll work on these.
python/
`-- minimalist_package
|-- minimalist_package
| |-- __init__.py
| |-- _minimalist_class.py
| |-- minimalist_async
| | |-- __init__.py
| | |-- _unlikely_to_return.py
| | |-- async_await_example.py
| | `-- async_callback_example.py
| `-- minimalist_script.py
|-- setup.py
`-- test
`-- test_minimalist_class.py
Differently from awaiting for each task and then processing their result, we can define callbacks
in such a way that each result will be processed as they come. In that way, the results can be processed in an arbitrary
order. Once again, this is inherently neither a good strategy nor a bad one. Some frameworks will work with callbacks,
for example ROS1, ROS2, and Qt, but some others will prefer to use await.
Enough diplomacy, let’s make a file called async_callback_example.py in minimalist_async with the following contents.
1from functools import partial
2import asyncio
3from minimalist_package.minimalist_async import unlikely_to_return
4
5
6def handle_return_callback(tag: str, future: asyncio.Future) -> None:
7 """
8 Callback example for asyncio.Future
9 :param tag: An example parameter, in this case, a tag
10 :param future: A asyncio.Future is expected to be the last parameter
11 of the callback.
12 :return: Nothing.
13 """
14 if future is not None and future.done():
15 print(f"The result of task={tag} was {future.result()}.")
16 else:
17 print(f"Problem with task={tag}.")
18
19
20async def async_main() -> None:
21 tags: list[str] = ["task1", "task2"]
22 tasks: list[asyncio.Task] = []
23
24 # Start all tasks before adding the callback
25 for task_tag in tags:
26 task = asyncio.create_task(
27 unlikely_to_return(tag=task_tag)
28 )
29 task.add_done_callback(
30 partial(handle_return_callback, task_tag)
31 )
32 tasks.append(task)
33
34 # Alternatively, use asyncio.gather()
35 # At this point, the functions are already running concurrently. And the result will be processed
36 # by the callback AS "SOON" AS THEY ARE AVAILABLE.
37 print("Awaiting results...")
38 for task in tasks:
39 await task
40
41
42def main() -> None:
43 try:
44 asyncio.run(async_main())
45 except KeyboardInterrupt:
46 pass
47 except Exception as e:
48 print(e)
49
50
51if __name__ == "__main__":
52 main()
In the callback paradigm, besides the function that does the actual task, as in the prior example, we have to make
a, to no one’s surprise, callback function to process the results as they come.
We do so with
def handle_return_callback(tag: str, future: asyncio.Future) -> None:
"""
Callback example for asyncio.Future
:param tag: An example parameter, in this case, a tag
:param future: A asyncio.Future is expected to be the last parameter
of the callback.
:return: Nothing.
"""
if future is not None and future.done():
print(f"The result of task={tag} was {future.result()}.")
else:
print(f"Problem with task={tag}.")
In this case, the callback must receive a asyncio.Future and process it. Test the future for None in
case the task fails for any reason.
Aside from that, there are only two key differences with the await logic example we showed before,
The callback must be added with
task.add_done_callback(), remember to usepartial()if the callback has other parameters besides theFutureawaitfor the tasks at the end, not because this script will process it (it will be processed as they come by itscallback), but because otherwise the main script will return and (most likely) nothing will be done.
async def async_main() -> None:
tags: list[str] = ["task1", "task2"]
tasks: list[asyncio.Task] = []
# Start all tasks before adding the callback
for task_tag in tags:
task = asyncio.create_task(
unlikely_to_return(tag=task_tag)
)
task.add_done_callback(
partial(handle_return_callback, task_tag)
)
tasks.append(task)
# Alternatively, use asyncio.gather()
# At this point, the functions are already running concurrently. And the result will be processed
# by the callback AS "SOON" AS THEY ARE AVAILABLE.
print("Awaiting results...")
for task in tasks:
await task
But enough talk… Have at you! Let’s run the code with
cd ~/ros2_tutorials_preamble/python/minimalist_package/
python3 -m minimalist_package.minimalist_async.async_callback_example
Depending on our luck, we will have a very illustrative result like the one below. This example shows that, with the callback logic, when the second task
ends before the first one, it will be automatically processed by its callback.
Awaiting results...
task1 retry needed (roll = 0.6248308966234916 > 0.1).
task2 retry needed (roll = 0.24259714032999036 > 0.1).
task1 retry needed (roll = 0.1996764883575476 > 0.1).
task2 Done.
The result of task=task2 was 0.09069407383542283.
task1 retry needed (roll = 0.6700777523785147 > 0.1).
task1 retry needed (roll = 0.7344216907108979 > 0.1).
task1 retry needed (roll = 0.4907223062034761 > 0.1).
task1 retry needed (roll = 0.20026037098687932 > 0.1).
task1 Done.
The result of task=task1 was 0.09676678954317675.
Can you feel the new synaptic connections?