---

# S06E05 : Concurrency - The asyncio module

Cyril Desjouy

--- 

## Préambule

En septembre 2019, il existe encore certains problèmes lors de l'utilisation des *event_loop* dans Jupyter. Les exemples fournis dans ce notebook ont été testés avec:

* IPython>7
* ipykernel>5
* tornado==4.5.3
* notebook==5.6.0

Si vous rencontrez des problèmes pour les exécuter, upgradez/downgradez les packages précisés ci dessus afin de reproduire le même setup.

Par ailleurs, vous noterez que la première partie de ce notebook est fortement inspirée du très bon article (en français) de **nohar** appelé [La puissance cachée des coroutines](https://zestedesavoir.com/articles/152/la-puissance-cachee-des-coroutines/). Vous pourrez vous y référer pour plus de détails. Cette partie n'est cependant pas indispensable pour comprendre comment utiliser le module `asyncio` mais permet d'introduire la notion de *coroutine* sous Python.

La programmation asynchrone avec `asyncio` est un vaste sujet, qui peut rapidement devenir très complexe. Les coroutines sont d'ailleurs réputées être l'un des sujets les plus complexes sous Python avec les *métaclasses*. Ce notebook a pour objectif de donner un premier aperçu de ce puissant outil qu'est la coroutine. Si vous souhaitez allez plus loin, je vous conseille fortement la [page de David Beazley](https://www.dabeaz.com/coroutines/index.html) qui est un peu *le papa des coroutines sous Python*.

## 1. Le mot clé `yield` et la naissance des coroutines [A compléter]

### 1.1. Les générateurs et coroutines

Nous avons vu dans un notebook précédent comment créer un générateur en utilisant le mot clé `yield` dans une fonction. Par exemple:

```python
def dwarfs():
    for name in ['Grumpy', 'Happy', 'Sleepy', 'Bashful', 'Sneezy', 'Dopey', 'Doc']:
        yield name
```

Cette fonction, appelée *fonction génératrice*, a la particularité de pouvoir se mettre en pause entre chaque appel de `__next__` (ou entre deux itérations) sans bloquer le fil d'exécution en cours.

Allons maitenant un peu plus loin avec l'exemple suivant:

```python
def talk():
    print('Get ready for this')
    while True:
        data = yield
        print(data)
        if data == 'stop':
            break
```


```python
>> s = talk()
>> next(s)
Get ready for this
>> s.send('Surprise!')
Surprise!
```

La fonction `talk` peut recevoir des données grâce au mot-clé `yield` (depuis Python 3.3). Sous Python, les générateurs ne sont pas uniquement des *producteurs* de données mais peuvent également servir de *consommateurs*. Le mot clé `yield` permet de créer un ***point de suspension dans l'exécution d'une fonction, que l'on peut utiliser pour échanger des données*** [[zestedesavoir.com]](https://zestedesavoir.com/articles/152/la-puissance-cachee-des-coroutines/) et cela n'est pas sans rappeler le principe de fonctionnement des *threads*. C'est ce mécanisme qu'on appelle mécanisme de ***coroutine***.

In [2]:
def talk():
    print('Get ready for this')
    while True:
        data = yield
        print(data)
        if data == 'stop':
            break

Notez qu'une exception est levée si vous essayez d'envoyer quelque chose au générateur sans avoir itéré une fois dessus:
```python
>> gen = talk()
>> gen.send('Surprise!')
TypeError: can't send non-None value to a just-started generator
```
En fait lorsque le générateur est crée, la fonction n'est pas démarrée tout de suite. Il  est nécessaire d'itérer une fois dessus pour entrer dans la boucle. Pour faciliter un peu les choses, il est possible de créer un décorateur permettant d'automatiser cette tâche:

In [3]:
def coroutine(func):
    def wrapper(*args, **kwargs):
        generator = func(*args, **kwargs)
        next(generator)
        return generator
    return wrapper

@coroutine
def talk():
    print('Get ready for this')
    while True:
        data = yield
        print(data)
        if data == 'stop':
            break

In [4]:
s = talk()
s.send('Surprise!')

Get ready for this
Surprise!


### 1.2. Encore un peu plus loin avec `yield`

Depuis Python 3.3, il est possible d'utiliser la syntaxe `yield from` pour définir un générateur. L'exemple de la fonction `dwarfs` de la section 1.1 peut alors se réécrire bien plus simplement:
```python
def dwarfs():
    yield from ['Grumpy', 'Happy', 'Sleepy', 'Bashful', 'Sneezy', 'Dopey', 'Doc']
```
Extrait de [[zestedesavoir.com]](https://zestedesavoir.com/articles/152/la-puissance-cachee-des-coroutines/):
>*Même si cela a juste l'air d'un simple raccourcis, ce n'est absolument pas le cas : lorsque qu'une coroutine utilise yield from, les données que l'on envoie à cette coroutine seront automatiquement transmises à celle appelée par le yield from. Ça a l'air d'un détail à première vue, mais grâce à cela, on est en mesure de passer dynamiquement le relais à une autre coroutine en fonction des données reçues.*

En fait `yield from` est plus généralement utilisé avec une coroutine et permet de dispatcher plus facilement les tâches. En Python 3.4, une coroutine est définie avec `yield from` et le décorateur <code>@asyncio.coroutine</code> fourni par le module `asyncio`:
```python
@asyncio.coroutine
def coroA():
    yield from coroB()
```

C'est grâce à ces coroutines que le module `asyncio` propose des outils pour réaliser des opérations asynchrones. Ce module est en développement très actif depuis la parution de Python 3 et il est important de préciser que vous trouverez sur internet des exemples et tutoriels qui ne sont pas nécessairement à jour. Restez donc vigilants concernant la version de Python utilisée dans les exemples trouvés sur internet.

## 2. Le module `asyncio`

### 2.1. La boucle d'événements

Prenons l'exemple de tâches liées à la communication réseau. Un ensemble de requêtes est envoyé à différents sites internet. En programmation synchrone, il faudra attendre que la première requête ait abouti afin de passer à la requête suivante car chaque requête est bloquante. En programmation asynchrone, la première requête est lancée et attend une réponse. Pendant de temps, une autre requête peut prendre le relai. L'alternance des requêtes est gérée par ce qu'on appelle une **boucle d'événements** (***event loop***). Cette boucle communique avec toutes les requêtes afin de permettre à chacune de se relayer au moment optimal. Il s'agit ici d'optimiser les périodes pendant lesquelles les requêtes sont en attente pour permettre à d'autres de s'exécuter. 

### 2.2. Les coroutines

Python 3.5 introduit les instructions `async` et `await` pour déclarer nativement des coroutines destinées à des applications de programmation asynchrone. Considérons l'exemple suivant:

```python
async def coroA():
    # Wait here and come back to coroA() when coroB() is ready
    r = await coroB()
    return r
```

* L'instruction `async def` permet de déclarer une coroutine native (avec `await` et/ou `return`). Elle peut également permettre de déclarer un générateur asynchrone avec `yield` à la place de `return`/`await`. Notons que les expressions `async with` et `async for` sont également valides mais ne peuvent être utilisées que dans une coroutine native.

* L'instruction `await` **ne peut pas être utilisée ailleurs que dans le corps d'une coroutine native**. Cette instruction permet de renvoyer le contrôle de la fonction `coroA` à la **boucle d'événements** et suspend momentanément l'exécution de la coroutine `coroB`.


**Note 1:** *L'utilisation du décorateur <code>@asyncio.coroutine</code> présentée dans la section 1.2. est fortement découragée. Cette formulation sera supprimée dans la version 3.10 de Python.*

**Note 2:** *L'instruction `yield from` est interdite dans une coroutine native.*


Par exemple [[extrait de python.org](https://docs.python.org/3.8/library/asyncio-task.html#coroutines)]:

In [4]:
import asyncio

async def say_after(delay, what):       # async defines a coroutine
    await asyncio.sleep(delay)          # waiting for the results of the coroutine asyncio.sleep()
    print(what)
    
say_after(1, 'Hello')

<coroutine object say_after at 0x7f36480ce8c0>

Il n'est pas possible d'exécuter directement la *coroutine* `say_after`. Il faut en effet l'exécuter dans une **boucle d'événements**. Pour ce faire, trois méthodes sont proposées par `asyncio`.
    
* Utiliser la fonction `asyncio.run()` prenant en argument d'entrée la *coroutine* à exécuter et permettant de lancer cette coroutine en gérant la boucle d'événements:

In [5]:
asyncio.run(say_after(1, 'hello'))

hello


* Appeler une *coroutine* dans une autre *coroutine* avec l'instruction `await`:

In [6]:
import time 

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(2, 'hello')
    await say_after(1, 'world')

    print(f"finished at {time.strftime('%X')}")
    
asyncio.run(main())

started at 17:30:36
hello
world
finished at 17:30:39


* Depuis Python 3.7, utiliser la fonction `asyncio.create_task()` qui ***wrap*** la coroutine dans une **tâche** et planifie son exécution:

In [7]:
async def main():
    task1 = asyncio.create_task(
        say_after(2, 'hello'))

    task2 = asyncio.create_task(
        say_after(1, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

started at 17:30:39
world
hello
finished at 17:30:41


Il convient de noter que les tâches ne s'exécutent pas nécessairement dans l'ordre dans lequel elles ont été appelée. La sortie montre que cet exemple s'exécute 1 seconde plus rapidement que le précédent. Les deux tâches s'exécutent en effet de manière concurrente. `task2` s'exécute en attendant que `task1` soit prête.

### 2.3. Les ***awaitables***

Comme nous l'avons vu dans les exemples précédents, les coroutines (crées avec `async def` par exemple) et les tâches (crées avec `async.create_task`) peuvent être utilisées en tant qu'*éléments en attente* avec l'instruction `await`. On appelle communément ces éléments ***awaitables***. Les objets `Futures` peuvent également être utilisés en tant qu'***awaitables***. Comme nous l'avons vu précédemment, une `Future` est un objet qui représente un résultat éventuel d'une opération asynchrone. Quand un objet `Future` est attendu (avec l'instruction `await`), la coroutine attend que la `Future` soit exécutée. 

En résumé, il est possible d'utiliser `await` uniquement avec:
* une coroutine
* une tâche
* une `Future`

En pratique, il est possible de créer une `Future` à partir de n'importe quel objet à l'aide de la méthode `loop.run_in_executor()` héritée par les boucles d'événements. Il faut alors s'occuper soit même de la création d'une telle boucle avec `asyncio.get_event_loop()`. Ceci permet d'exécuter une portion de code de manière asynchrone:

In [8]:
import asyncio
import functools
import time

def say_after(delay, what):
    time.sleep(delay)
    return what

async def main():
    loop = asyncio.get_event_loop()    
    future = loop.run_in_executor(None, functools.partial(say_after, 1, 'Hello'))
    return await future

future = asyncio.run(main())
print(future)

Hello


Dans ce cas, l'implémentation de `say_after()` reste synchrone mais cette fonction est appelée de manière asynchrone. Notons l'utilisation de `functools.partial()` pour passer ses arguments à la fonction `say_after()`.

### 2.4. Exécuter des tâches concurrentes

La fonction `asyncio.gather(awaitables)` permet d'exécuter une séquence d'*éléments en attente* (***awaitables***) de manière concurrente. Si tous les *éléments en attente* sont exécutés avec succès, `asyncio.gather()` retourne une liste des valeurs retournées par tous ces éléments dans l'ordre dans lequel ils sont déclarés dans la séquence.

**Note:** *Si l'un des ***awaitables*** de la séquence est une coroutine, elle est automatiquement planifiée en tant que ***tâche***.*

Par exemple:

In [9]:
import random 

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return what

async def main():
    return await asyncio.gather(*(say_after(random.random(), what) for what in ['Hello', 'World']))

tasks = asyncio.run(main())
print('results:', tasks)

World
Hello
results: ['Hello', 'World']


### 2.4. `async for`

Depuis Python 3.6, il est possible de déclarer un générateur asynchrone en utilisant l'instruction `yield` dans une coroutine native:

In [10]:
import asyncio

async def dwarfs():
    for name in ['Grumpy', 'Happy', 'Sleepy', 'Bashful', 'Sneezy', 'Dopey', 'Doc']:
        yield name
    
dwarfs()

<async_generator object dwarfs at 0x7f36480794d0>

Pour itérer sur le générateur asynchrone, il faut alors utiliser `async for` comme illustré ci dessous:

In [None]:
async for dwarf in dwarfs():
    print(dwarf)

La syntaxe `async for` s'étend aussi aux compréhension:

In [11]:
async def main():
    all_dwarfs = [name async for name in dwarfs()]
    dwarfs_with_e = [name async for name in dwarfs() if 'e' in name]
    return all_dwarfs, dwarfs_with_e

all_dwarfs, dwarfs_with_e = asyncio.run(main())
print(all_dwarfs)
print(dwarfs_with_e)

['Grumpy', 'Happy', 'Sleepy', 'Bashful', 'Sneezy', 'Dopey', 'Doc']
['Sleepy', 'Sneezy', 'Dopey']


Il est important de noter que ni les générateurs asynchrones ni les compréhensions ne rendent l'itération simultanée. Ils sont simplement conçus pour permettre à la coroutine qui les entoure d'autoriser d'autres tâches à prendre leur tour.

**Note:** *Pour pouvoir itérer sur une compréhension ou un générateur asynchrone, celui ci doit implémenter les méthodes `__aiter__` et `__anext__`. Cette dernière doit lever l'exception `StopAsyncIteration` pour arrêter l'itération.*

## 3. Applications I/0 bounds

Il s'agit ici de reprendre l'exemple du téléchargement de fichiers présenté dans le notebook sur `concurrent.futures` en utilisant:

* le module `asyncio` seul,
* le module `asyncio` accompagné des modules tiers `aiohttp` et `aiofiles`.

### 3.1. En utilisant les built-ins

In [12]:
import asyncio
import time, random
import urllib.request
import functools

async def download(url, loop):
    filename = url.split('/')[-1]
    await loop.run_in_executor(None, functools.partial(urllib.request.urlretrieve, url, filename))
    print(f'{filename} downloaded and saved')

async def fetch(urls):
    loop = asyncio.get_running_loop()
    await asyncio.gather(*(download(url, loop) for url in urls))
    
def main():    
    with open('links', 'r') as file:
        urls = file.readlines()                      # make a list of all links
        urls = [url.rstrip('\n') for url in urls]    # remove \n at end of links
    
    ti = time.perf_counter()
    asyncio.run(fetch(urls))
    print('Elapsed : ', time.perf_counter() -ti)
    
main()

C02-Shared_References.pdf downloaded and saved
C03-Input-Output.pdf downloaded and saved
C07-1-Functions.pdf downloaded and saved
C04-Compound_Instructions.pdf downloaded and saved
C06-Script.pdf downloaded and saved
C05-Scientific.pdf downloaded and saved
C08-2-UI.pdf downloaded and saved
C01-Objects.pdf downloaded and saved
C07-2-Classes.pdf downloaded and saved
C08-1-Std_Library.pdf downloaded and saved
Elapsed :  12.861319185998582


### 3.2. Les modules `aiohttp` et `aiofiles`

Deux modules tiers permettent de gérer de manière asynchrone les requêtes *http* et les *E/S fichiers*. Il s'agit des modules `aiohttp` et `aiofiles`. 

A titre d'exemple, le programme précédent est réimplémenté en utilisant ces modules tiers (à installer si vous souhaitez tester l'exemple suivant):

In [13]:
import asyncio
import aiohttp        
import aiofiles
import time

async def download(url):
    filename = url.split('/')[-1]
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status == 200:
                f = await aiofiles.open(filename, mode='wb')
                print(f'{filename} downloaded')
                await f.write(await resp.read())
                await f.close()
                print(f'{filename} saved')

async def fetch(urls):
    await asyncio.gather(*(download(url) for url in urls))
    
def main():    
    with open('links', 'r') as file:
        urls = file.readlines()                      # make a list of all links
        urls = [url.rstrip('\n') for url in urls]    # remove \n at end of links
    
    ti = time.perf_counter()
    asyncio.run(fetch(urls))
    print('Elapsed : ', time.perf_counter() -ti)

main()

C01-Objects.pdf downloaded
C04-Compound_Instructions.pdf downloaded
C05-Scientific.pdf downloaded
C07-1-Functions.pdf downloaded
C02-Shared_References.pdf downloaded
C03-Input-Output.pdf downloaded
C06-Script.pdf downloaded
C08-2-UI.pdf downloaded
C08-1-Std_Library.pdf downloaded
C07-2-Classes.pdf downloaded
C04-Compound_Instructions.pdf saved
C07-1-Functions.pdf saved
C02-Shared_References.pdf saved
C06-Script.pdf saved
C05-Scientific.pdf saved
C03-Input-Output.pdf saved
C08-2-UI.pdf saved
C08-1-Std_Library.pdf saved
C07-2-Classes.pdf saved
C01-Objects.pdf saved
Elapsed :  12.613549990001047


Dans cet exemple, les requêtes *http* et l'écriture des fichiers sont réalisées de manière asynchrone. Vous remarquerez également l'utilisation du ***context manager asynchrone*** `async with` permettant de ne pas bloquer la coroutine. Il est tout à fait possible d'implémenter un *context manager asynchrone* en implémentant les méthodes spéciales `__aenter__` et `__aexit__` en utilisant bien l'instruction `async def`.

## Bibliographie


* [Python.org - asyncio task](https://docs.python.org/3.8/library/asyncio-task.html)
* [David Beazley - Coroutines](https://www.dabeaz.com/coroutines/Coroutines.pdf)
* [zestedesavoir - La puissance cachée des coroutines](https://zestedesavoir.com/articles/152/la-puissance-cachee-des-coroutines/)
* [marcarea.com - Coroutines via générateurs](https://marcarea.com/weblog/2016/04/06/coroutines-via-generateurs-ameliores-en-python)
* [Marcarea.com - Coroutines avec asyncio](https://marcarea.com/weblog/2016/04/11/coroutines-avec-asyncio-en-python)
* [Realpython.org - asyncio](https://realpython.com/async-io-python/)