---

# S06E04 : Concurrency - The `concurrent.futures` module

Cyril Desjouy

--- 

## 1. Introduction

Le module `concurrent.futures` disponible dans la bibliothèque standard fournit une interface uniformisée de haut niveau pour l'exécution concurrente. Il est construit autour des modules `threading` et `multiprocessing` et apporte une simplification de leur utilisation. Notons que cette simplification d'utilisation supprime  une grande partie de la flexibilité de `threading` et de `multiprocessing`. Selon les applications, il peut donc s'avérer préférable d'utiliser ces deux modules de base.

Notons que le terme `future` a une signification particulière en informatique. Ci-dessous la définition extraite de [wikipedia](https://en.wikipedia.org/wiki/Futures_and_promises):

>*In computer science, **future**, **promise**, **delay**, and **deferred** refer to constructs used for synchronizing program execution in some concurrent programming languages. They describe an object that acts as a proxy for a result that is initially unknown, usually because the computation of its value is not yet complete.*

En d'autres termes, une ***future*** est un moyen de décrire le résultat d'un bloc de code avant qu'il ne soit connu.

## 2. `Executor` et `Future`

### 2.1. Les objets `Executor`

Extrait de [python.org](https://docs.python.org/3.8/library/concurrent.futures.html):

>*The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.*


Le module `concurrent.futures` fournit une classe abstraite (cf. notebook sur le module `abc`) appelée ` Executor`. Cette classe ne peut pas être utilisée directement. Il est nécessaire de passer par l'une de ces deux sous-classes :

* `ThreadPoolExecutor` qui fournit un ***pool*** dans lequel mettre des ***threads***,
* `ProcessPoolExecutor` qui fournit un ***pool*** dans lequel mettre des ***processes***.

Les **arguments optionnels** communs à ces deux sous-classes sont:
* `max_workers`: le nombre de ***threads***/***processes*** désiré, `None` par défaut,
* `initializer`: un ***callable*** qui sera appelé au démarrage de chaque ***worker***, `None` par défaut,
* `initarg`: tuple réunissant les arguments passés à `initializer`.

Les objets de type `Executor` héritent également des **3 méthodes** décrites ci-dessous.
* `submit(func, *args, **kwargs)`: planifie l'appel de `func(*args, **kwargs)` et retourne un objet `Future` représentant son exécution.
* `map(func, *iterables, timeout=None, chunksize=1)`: Similaire à la fonction *built-in* `map` mais les appels à `func` peuvent être fait de manière concurrente. Retourne un itérateur contenant les valeurs retournées par les appels à `func`.
* `shutdown(wait=True)`: Indique à l'`Executor` de libérer toutes les ressources qu'il utilise lorsque l'exécution des `Futures` en cours est terminée. Cette méthode n'a pas besoin d'être explicitement appelée si l'`Executor` est appelé avec le mot clé `with`.

### 2.2. Les objets `Futures`

Extrait de [python.org](https://docs.python.org/3.8/library/concurrent.futures.html#future-objects):

>*The Future class encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit().*


Les objets `Future` héritent de plusieurs méthodes. Parmi elles, citons en particulier:

* `cancel()`: Tente d'annuler l'appel. Retourne `True` si il est annulé, `False` si il est déjà en exécution.
* `result(timeout=None)`: Retourne la valeur retournée par l'appel. Si l'exécution n'est pas finie, attend `timeout` secondes et retourne la valeur retournée par l'appel ou TimeoutError si l'exécution n'est pas finie dans ce laps de temps.
* `exception(timeout=None)`: Idem que `result`, mais retourne l'exception au lieu de la valeur si l'appel en a levée une. Sinon retourne `None`.
* `cancelled()`, `running()`, `done()`: Retournent `True` si l'appel est annulé, en cours d'exécution, ou terminé respectivement.

### 2.3. Exemples

#### ---> Utilisation de `submit`

In [6]:
import concurrent.futures
import sys, time, random, threading

def avenger(name):
    current = threading.currentThread().getName()
    time.sleep(random.random())
    sys.stdout.write(f'{current}: {name} comes to save the world!\n')
    time.sleep(random.random())
    sys.stdout.write(f'{current}: {name} finished his work.\n')

with concurrent.futures.ThreadPoolExecutor() as executor:
    for name in ['Ironman', 'Hulk', 'Thor', 'Ant-man', 'Wasp']:
        executor.submit(avenger, name)

ThreadPoolExecutor-5_0: Ironman comes to save the world!
ThreadPoolExecutor-5_1: Hulk comes to save the world!
ThreadPoolExecutor-5_3: Ant-man comes to save the world!
ThreadPoolExecutor-5_1: Hulk finished his work.
ThreadPoolExecutor-5_3: Ant-man finished his work.
ThreadPoolExecutor-5_0: Ironman finished his work.
ThreadPoolExecutor-5_4: Wasp comes to save the world!
ThreadPoolExecutor-5_2: Thor comes to save the world!
ThreadPoolExecutor-5_2: Thor finished his work.
ThreadPoolExecutor-5_4: Wasp finished his work.


#### ---> Utilisation de `map`

In [9]:
with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(avenger, ['Ironman', 'Hulk', 'Thor', 'Ant-man', 'Wasp'])

ThreadPoolExecutor-7_1: Hulk comes to save the world!
ThreadPoolExecutor-7_3: Ant-man comes to save the world!
ThreadPoolExecutor-7_0: Ironman comes to save the world!
ThreadPoolExecutor-7_2: Thor comes to save the world!
ThreadPoolExecutor-7_4: Wasp comes to save the world!
ThreadPoolExecutor-7_2: Thor finished his work.
ThreadPoolExecutor-7_1: Hulk finished his work.
ThreadPoolExecutor-7_3: Ant-man finished his work.
ThreadPoolExecutor-7_4: Wasp finished his work.
ThreadPoolExecutor-7_0: Ironman finished his work.


## 3. Threading vs. multiprocessing

L'objectif de cette partie est de montrer que :

* le `threading` est plutôt adapté aux tâches liées aux entrées/sorties,
* le `multiprocessing` est plutôt adapté aux tâches liées au processeur.

### 3.1. I/O-bound tasks

In [5]:
import concurrent.futures
import urllib.request

def download(url):
    """ I/O Bound task. """
    filename = url.split('/')[-1]
    urllib.request.urlretrieve(url, filename)

def synchronous(urls):
    _ = [download(url) for url in urls]
        
def thread_pool(urls):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(download, urls)
    
def process_pool(urls):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(download, urls)

with open('links', mode='r') as file:
    urls = file.readlines()                  # make a list of all links


urls = [url.rstrip('\n') for url in urls]    # remove \n at end of links

In [6]:
%%time
synchronous(urls)

CPU times: user 594 ms, sys: 431 ms, total: 1.03 s
Wall time: 30.1 s


In [7]:
%%time
thread_pool(urls)

CPU times: user 769 ms, sys: 579 ms, total: 1.35 s
Wall time: 28.9 s


In [8]:
%%time
process_pool(urls)

CPU times: user 16.8 ms, sys: 15.7 ms, total: 32.5 ms
Wall time: 25.8 s


### 3.2. Cpu-bound tasks

In [14]:
import concurrent.futures

def cpu_bound(number):
    """ CPU bound task"""
    return sum(i**2 for i in range(number))

def synchronous(numbers):
    tmp = []
    for number in numbers:
        tmp.append(cpu_bound(number))
    return tmp 

def thread_pool(numbers):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        tmp = executor.map(cpu_bound, numbers)
    return list(tmp)

def process_pool(numbers):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        tmp = executor.map(cpu_bound, numbers)
    return list(tmp)

numbers = [x + 1_000_000 for x in range(20)]

In [15]:
%timeit synchronous(numbers)

6.03 s ± 61.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [19]:
%timeit thread_pool(numbers)

6.23 s ± 98.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [17]:
%timeit process_pool(numbers)

3.08 s ± 54.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
