---

# S06E01 : Concurrency - the `multiprocessing` module

Cyril Desjouy

--- 

## 1. Introduction

Le module `multiprocessing` fournit des outils permettant l'instanciation de *processus* avec une API similaire à celle de `threading` que nous verrons dans le notebook suivant. Le module `multiprocessing` permet de contourner le ***GIL*** en utilisant des *processus* plutôt que des *threads*. 

Ce module fournit en particulier:

* La classe `Process` (analogue à la classe `Thread` de `threading`) permettant d'instancier des processus.
* Les objets `Queue` et `Pipe` permettant d'assurer la communication entre les processus.
* Toutes les primitives de synchronisation de processus (analogues à celle founies par `threading`) que nous étudierons dans un prochain notebook.
* Des outils pour partager des données entre processus (`Value`, `Array`, `Manager`, ...).
* La classe `Pool` représentant un pool de processus.


**Note:** *Il existe de grandes similitudes entre ce notebook et le suivant portant sur le module `threading`. Les classes `multiprocessing.Process` et `threading.Thread` sont effectivement très similaires comme vous pourrez le voir.*

## 2. La classe `Process`

Le coeur du module `multiprocessing` est la classe `Process`. Cette classe permet  de créer des *processus* de deux manières différentes:

* en passant un objet ***callable*** en tant qu'argument d'entrée au constructeur,
* en remplaçant la méhode `run()` dans une sous-classe de `Thread` (et uniquement cette méthode).

### 2.1. Avec un ***callable***

Pour démarrer un ***thread***, il suffit de créer une instance de `Thread` initialisée avec un objet ***callable*** puis d'utiliser la méthode `start()` héritée par l'instance. La syntaxe est la suivante:
```python
Thread(target=None, name=None, args=(), kwargs={}, *, daemon=None)
```
où:
* `target`: ***callable***,
* `args` et `kwargs`: arguments à passer au ***callable***,
* `name`: le nom du ***process***,
* `daemon`: le ***process*** doit il être lancé en tant que ***daemon***.

***Note:*** *Un programme ne s'arrête que lorsque tous les ***process*** lancés par ce programme sont terminés. Un **daemon** est un **process** qui sera tué automatiquement, peu importe son état, lorsque le programme s'arrêtera.*

Dans l'exemple suivant, l'objet ***callable*** est ici la fonction `avenger`. 

<div class="alert alert-block alert-info">
Exécuter cet exemple plusieurs fois et observez attentivement la sortie standard. Essayez à nouveau avec la ligne 6 commentée (le premier <code>sleep</code>).
</div>



In [8]:
import multiprocessing
import sys, time, random

def avenger(name):
    current = multiprocessing.current_process().name
    time.sleep(random.random())
    sys.stdout.write(f'{current}: {name} comes to save the world!\n')
    time.sleep(random.random())
    sys.stdout.write(f'{current}: {name} finished his work.\n')

if __name__ == '__main__':
    
    current = multiprocessing.current_process().name
    process1 = multiprocessing.Process(target=avenger, args=('Ironman',))
    process2 = multiprocessing.Process(target=avenger, args=('Captain',))

    sys.stdout.write(f'{current}: Calling Ironman.\n')
    process1.start()                     # Start thead1
    sys.stdout.write(f'{current}: Calling Captain.\n')
    process2.start()                     # Start thead2
    
    sys.stdout.write(f'{current}: Waiting Ironman & Captain for saving the world!\n')
    process1.join()                      # wait for thread1 to finish
    process2.join()                      # wait for thread2 to finish
    sys.stdout.write(f'{current}: Oh yeah! They saved us!\n')

MainProcess: Calling Ironman.
MainProcess: Calling Captain.
Process-26: Captain comes to save the world!
MainProcess: Waiting Ironman & Captain for saving the world!
Process-25: Ironman comes to save the world!
Process-25: Ironman finished his work.
Process-26: Captain finished his work.
MainProcess: Oh yeah! They saved us!


Les instances de `Process` sont créées lignes 14-15 avec l'objet ***callable*** `avenger` prenant pour arguments `args`. Les deux instances sont ensuite démarrées à l'aide de la méthode `start()`. La méthode `join()` signifie à l'interpréteur qu'il doit attendre que `process1` et `process2` soient terminés avant de passer aux instructions suivantes du programme principal. Cette fonction est bloquante. Elle ne retourne (`None`) que lorsque le thread a effectivement fini son exécution. 

<div class="alert alert-block alert-info">
Sur l'exemple précédent, essayez de commenter les lignes 23-24 (les deux <code>join()</code>) et observez la différence.
</div>

Lorsqu'il est nécesaire de lancer de nombreux *processus*, il peut être plus pratique d'automatiser ceci en utilisant des boucles comme le montre l'exemple suivant:

In [14]:
import multiprocessing
import sys, time, random

def avenger(name):
    current = multiprocessing.current_process().name
    time.sleep(random.random())
    sys.stdout.write(f'{current}: {name} comes to save the world!\n')
    time.sleep(random.random())
    sys.stdout.write(f'{current}: {name} finished his work.\n')

if __name__ == '__main__':
    
    processes = []
    
    for name in ['Ironman', 'Hulk', 'Thor', 'Ant-man', 'Wasp']:
        processes.append(multiprocessing.Process(target=avenger, args=(name,)))
        processes[-1].start()
        
    for process in processes:
        process.join()

Process-12: Hulk comes to save the world!
Process-15: Wasp comes to save the world!
Process-12: Hulk finished his work.
Process-11: Ironman comes to save the world!
Process-13: Thor comes to save the world!
Process-14: Ant-man comes to save the world!
Process-15: Wasp finished his work.
Process-13: Thor finished his work.
Process-11: Ironman finished his work.
Process-14: Ant-man finished his work.


### 2.2. En subclassant `Process`

Il est également possible de créer un *process* en subclassant `Process` et en remplaçant la méthode `run()`:

In [15]:
import multiprocessing
import sys, time, random

class Avenger(multiprocessing.Process):
    
    def __init__(self, name):
        super().__init__()           # super is mandatory!
        self.name = name
        
    def run(self):
        current = multiprocessing.current_process().name
        time.sleep(random.random())
        sys.stdout.write(f'{current}: {self.name} comes to save the world!\n')
        time.sleep(random.random())
        sys.stdout.write(f'{current}: {self.name} finished his work.\n')

if __name__ == '__main__':
    
    ironman = Avenger('Ironman')
    captain = Avenger('Captain')
    
    ironman.start()
    captain.start()
    
    ironman.join()
    captain.join()

Captain: Captain comes to save the world!
Ironman: Ironman comes to save the world!
Captain: Captain finished his work.
Ironman: Ironman finished his work.


La création et le lancement des *processus* suit la même logique que dans les exemples précédents. La seule subtilité ici réside dans le remplacement de la méthode `__init__` de la superclasse `Process`. Afin de préserver l'initialisation de notre *processus*, il faudra impérativement appeler la méthode `__init__` de la superclasse à l'aide de `super`.

## 3. Communication entre les processus

Afin d'utiliser efficacement les processus, il est généralement indispensable d'établir des canaux de communication entre eux pour que le travail puisse être divisé et les résultats aggrégés. Deux types de communication sont supportées par les processus sous Python:

* Les `Queue` (*file*): peut avoir plusieurs producteurs et consommateurs.
* Les `Pipe` (*tube*): ne peut avoir que 2 *endpoints* (extrémités).

### 3.1. L'objet `Queue`

La `Queue` est un moyen simple d'assurer la communication entre les processus. Elle permet de passer les données dans les deux sens, mais il faut cependant que ces données soient sérialisables. Les objets de type `Queue` héritent entre autres des méthodes `put()` et `get()` pour mettre un objet dans la file et le récupérer respectivement.

**Note 1:** *La classe `multiprocessing.Queue` est très similaire à la classe `queue.Queue`.*

**Note 2:** *Le module `multithreading` founit également les objets `SimpleQueue` et `JoinableQueue` qui sont des variantes de `Queue`.*

In [3]:
import multiprocessing
import sys, time, random

def avenger(name, queue):
    current = multiprocessing.current_process().name
    msg = f'{current}: {name} says something secret!\n'
    queue.put(msg)

def listen_to_avengers(queue):
    while not queue.empty():
        sys.stdout.write(queue.get())   
    print("Avengers have nothing more to say!") 
    
if __name__ == '__main__':
    
    queue = multiprocessing.Queue()
    processes = []
    
    for name in ['Ironman', 'Hulk', 'Thor', 'Ant-man', 'Wasp']:
        processes.append(multiprocessing.Process(target=avenger, args=(name, queue,)))
        processes[-1].start()
    
    processes.append(multiprocessing.Process(target=listen_to_avengers, args=(queue,)))
    processes[-1].start()
    
    for process in processes:
        process.join()

Process-13: Ironman says something secret!
Process-14: Hulk says something secret!
Process-15: Thor says something secret!
Process-16: Ant-man says something secret!
Process-17: Wasp says something secret!
Avengers have nothing more to say!


### 3.2. L'objet `Pipe`

Le `Pipe` est préféré à la `Queue` lorsqu'il s'agit d'une communication entre deux processus. Il retourne une paire d'objets connectés qui représentent les deux extrémités du tube. Chacun de ces deux objets hérite entre autres des méthodes `send()` et `recv`() pour envoyer et recevoir des informations.

**Note 1:** *Les données dans un `Pipe` peuvent être corrompues si deux processus tentent de lire ou d'écrire à la même extrémité du `Pipe` en même temps.* 

**Note 2:** *Les `Pipe` sont en moyenne 3 fois plus rapides que les `Queue`.* 

In [8]:
import multiprocessing
import sys, time, random

def avenger(conn):
    current = multiprocessing.current_process().name
    life = 10
    while life > 0:
        conn.send(random.randint(1, 2))
        msg = conn.recv()
        if isinstance(msg, int):
            life -= msg
            sys.stdout.write(f'{current} has now {life} life.\n')
        else:
            sys.stdout.write(f'{msg}\n')
            break
    conn.send(f'{current} is dead!')
    
def thanos(conn):
    current = multiprocessing.current_process().name
    life = 20
    while life > 0:
        msg = conn.recv()
        if isinstance(msg, int):
            life -= msg
            sys.stdout.write(f'{current} has now {life} life.\n')
        else:
            sys.stdout.write(f'{msg}\n')
            break
        conn.send(random.randint(1, 5))
    conn.send(f'{current} is dead!')

    
if __name__ == '__main__':
    
    conn1, conn2 = multiprocessing.Pipe()

    process1 = multiprocessing.Process(target=avenger, name='Ironman', args=(conn1,))
    process2 = multiprocessing.Process(target=thanos, name='Thanos', args=(conn2,))
    
    process1.start()
    process2.start()
    
    process1.join()
    process2.join()

Thanos has now 18 life.
Ironman has now 9 life.
Thanos has now 16 life.
Ironman has now 4 life.
Thanos has now 14 life.
Ironman has now 2 life.
Thanos has now 13 life.
Ironman has now -1 life.
Ironman is dead!


## 4. Partager un état entre processus

Afin de ne pas nuire à la performance, il est préférable d'éviter d'utiliser des états partagés en programmation concurrente. Si cela est nécessaire, le module `multiprocessing` propose deux méthode pour le faire:

* **La mémoire partagée:** stockage de données dans une mémoire partagé en utilisant `Value`, `Array` ou un objet personnalisé crée avec `multiprocessing.sharedctypes`.
* **Le processus serveur:** un gestionnaire contrôle un processus serveur qui détient les objets Python et autorise les autres processus à les manipuler à l'aide de mandataires.


### 4.1. Mémoire partagé : `Value`, `Array`, ...

Les objets `Value` et `Array` permettent de créer des objets partageable entre processus.

* `Array`: a ctypes array allocated from shared memory.
* `Value`: a ctypes object allocated from shared memory.

Par exemple:

In [1]:
from multiprocessing import Process, Value, Array
import numpy as np

def f(num, arr, lst):
    num.value = 3.1415927
    for i in range(len(arr)):
        arr[i] = -arr[i]
    lst[0] = 10

if __name__ == '__main__':
    num = Value('d', 0.0)
    lst = Array('i', range(10))
    arr = Array('d', np.arange(5))

    p = Process(target=f, args=(num, arr, lst))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])
    print(lst[:])

3.1415927
[-0.0, -1.0, -2.0, -3.0, -4.0]
[10, 1, 2, 3, 4, 5, 6, 7, 8, 9]


### 4.2. Processus serveur : `Manager`

Un processus serveur peut héberger des objets et permet à d'autres processus de les manipuler en utilisant des proxies. Les processus serveurs sont bien plus flexible que les objets à mémoire partagée car ils peuvent contenir des objets de nombreux types d'objets (`dict`, `list`, `Queue`, ...). Ils sont cependant plus lent que les objets à mémoire partagée. 

Pour créer un processus serveur, il suffit d'utiliser `Manager()` et d'y ajouter les objets partagées en utilisant les méthodes adéquates. Par exemple:

In [2]:
from multiprocessing import Process, Manager
import numpy as np

def f(d, l, a):
    a[0] = 1
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

with Manager() as manager:
    a = manager.Array('d', np.zeros(5))        # 'i'(nteger) or 'd'(ecimal) with 1d array only
    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(d, l, a))
    p.start()
    p.join()

    print(d)
    print(l)
    print(a)

{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
array('d', [1.0, 0.0, 0.0, 0.0, 0.0])


### 4.3. `multiprocessing.shared_memory` (Python > 3.8)

***Soon***

## 5. Les bassins d'exécution avec la classe `Pool`

Le module `multiprocessing` fournit également la classe `Pool` qui permet de créer un bassin de processus. Les différentes tâches à exécuter sont automatiquement distribuées parmi les processus du bassin en utilisant (entre autres) les méthodes `map()` ou `starmap()` équivalente à celle fournies par le module `itertools`. 

**Note:** *Nous verrons dans un prochain notebook que cette classe est très proche de la classe `concurrent.futures.Executor`.*

Les `Pool` sont généralement utilisés avec un *context manager*. Par exemple:

In [3]:
import multiprocessing

def multiply(x, y=2):
    return x*y

with multiprocessing.Pool() as pool:
    res1 = pool.map(multiply, [i for i in range(10)])

with multiprocessing.Pool() as pool:
    res2 = pool.starmap(multiply, [(1, 2), (3, 4), (4, 5)])
    
print('map    :', res1)
print('starmap:', res2)

map    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
starmap: [2, 12, 20]


## 6. Les fonctions utilitaires

Le module `multiprocessing` défini un certain nombre de fonctions utilitaires pour la gestions des *processus*. Les plus utiles sont listées ci-dessous.

* `cpu_count()`: Retourne le nombre de CPU sur le système
* `current_process()`: Retourne l'objet `Process` courrant.
* ...

## References

* [kth.se - multiprocessing part 1](https://www.kth.se/blogs/pdc/2019/02/parallel-programming-in-python-multiprocessing-part-1/)
* [kth.se - multiprocessing part 2](https://www.kth.se/blogs/pdc/2019/03/parallel-programming-in-python-multiprocessing-part-2/)

## Application

<div class="alert alert-block alert-info">
Soon.
</div>