---

# S06E03 : Concurrency - Synchronization primitives

Cyril Desjouy

--- 

## 1. Introduction

Il existe plusieurs moyens de synchroniser des ***threads*** ou des ***processes***. On parle souvent de primitives de synchronisation. Plusieurs primitives sont fournies par les modules `threading` et `multiprocessing`. 

Notez que dans la suite de ce notebook, on parlera toujours de ***threads*** et tous les exemples concerneront les ***threads***, mais tout le contenu de ce notebook est **aussi valable pour les *processes*** ! Les différentes primitives présentées ci-après sont fournies à la fois par le module `threading` et le module `multiprocessing`:

* **`Lock` et `Rlock`:** Permet de bloquer l'accès à des données tant qu'un ***thread*** ou ***process*** travaille dessus.
* **`Barrier`:** Permet de mettre en place une barrière bloquant l'exécution tant qu'un certain nombre de ***threads*** ou ***processes*** ne l'a pas atteint.
* **`Semaphore` et `BoundedSemaphore`:** Permet de limiter le nombre de ***threads*** ou ***processes*** travaillant sur un même bloc de code.
* **`Event`:** Permet à plusieurs ***threads*** ou ***processes*** de communiquer.
* **`Condition`:** Permet de notifier les autres ***threads*** ou ***processes* d'un évènement. 


## 2. Les primitives `Lock`et `Rlock`

Extrait de [python.org](https://docs.python.org/3.8/library/threading.html#lock-objects):


>***About `Lock`:*** *Once a thread has acquired a lock, subsequent attempts to acquire it block, until it is released; any thread may release it.*

>***About `Rlock`:*** *A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it.*

En d'autre termes les verrous (***locks***) sont des primitives de synchronisation permettant d'interdire l'accès simultané à des ressources. C'est une primitive essentielle car elle permet d'éviter que plusieurs `threads` écrivent simultanément sur les même données. Les verrous ont deux états: **verrouillé** et **déverrouillé**. 

* Quand le verrou est **déverrouillé**: 
    
    * la méthode `acquire()` le verrouille et retourne (`None`) aussitôt.
    * la méthode `release()` retourne l'exception `RuntimeError`

* Quand le verrou est **verrouillé**: 

    * la méthode `aquire()` bloque l'exécution jusqu'à ce qu'un autre ***thread*** appelle `release()` pour le déverrouiller. `acquire()` le vérouille alors et retourne (`None`) aussitôt.
    * la méthode `release()` déverrouille le verrou et retourne (`None`) aussitôt.



Les verrous s'utilisent de la manière suivante:
```python
lock = threading.Lock()

lock.acquire()
... part of the program that cannot be executed by more than 1 threads simultaneously...
lock.release()
```
Comme toutes primitives héritant des méthodes `acquire()/release()`, les verrous supportent également le protocole utilisé par les ***context managers***:
```python
lock = threading.Lock()

with lock:
    ... part of the program that cannot be executed by more than 1 threads simultaneously...
```


<div class="alert alert-block alert-info">
Testez l'exemple suivant avec et sans le verrou. Observez attentivement les résultats.
</div>

In [1]:
import threading
import sys, time, random

def avenger(name, vilains, lock):
    
    current = threading.currentThread().getName()
    
    while True:
        
        time.sleep(random.random())

        with lock:
            if not vilains:
                sys.stdout.write(f'{current}: Thanos is dead. {name} has no more vilain to kill!\n')
                break
        

            idx = random.randint(0, len(vilains)-1)
            time.sleep(random.random())
            if vilains[idx] == 'Thanos':
                sys.stdout.write(f'{current}: {name} killed Thanos!\n')
                vilains.clear()
                break
            else:
                sys.stdout.write(f'{current}: {name} killed {vilains[idx]} ({len(vilains)-1} vilains left!).\n')
                vilains.pop(idx)

def main():
    
    threads = []
    lock = threading.Lock()
    vilains = ['Thanos'] + [f'Skrull {i}' for i in range(1, 11)]
    
    for name in ['Ironman', 'Captain']:
        threads.append(threading.Thread(target=avenger, args=(name, vilains, lock,)))
        threads[-1].start()
    
    for thread in threads:
        thread.join()
    
main()

Thread-4: Ironman killed Skrull 10 (10 vilains left!).
Thread-5: Captain killed Skrull 4 (9 vilains left!).
Thread-4: Ironman killed Thanos!
Thread-5: Thanos is dead. Captain has no more vilain to kill!


## 3. La primitive `Barrier`

Extrait de [python.org](https://docs.python.org/3.8/library/threading.html#barrier-objects):

>*This class provides a simple synchronization primitive for use by a fixed number of threads that need to wait for each other. Each of the threads tries to pass the barrier by calling the wait() method and will block until all of the threads have made their wait() calls. At this point, the threads are released simultaneously.*

La classe `Barrier` permet de créer une primitive de synchronisation de type *barrière* comme son nom l'indique. Sa syntaxe est la suivante:
```
Barrier(n, action=None, timeout=None)
```
où `n` est le nombre de ***thread*** à attendre avant d'ouvrir la barrière. Les objets de type `Barrier` héritent en particulier de la méthode `wait()` qui permet aux ***threads*** de signaler leur arrivée à ce point exécution du programme. 

Une barrière permet de bloquer l'exécution de ***threads*** à un point d'exécution précis du programme jusqu'à ce que `n` ***threads*** aient appelé la méthode `wait(timeout=None)` (ou que le timeout soit atteint si celui si est précisé). Les `n` ***threads*** sont alors relâchés de manière simultanée et la barrière se referme derrière leur passage. Elle attend alors à nouveau que `n` ***threads*** aient appelé la méthode `wait` pour se réouvrir. Les barrières sont généralement utilisées pour synchroniser l'accès de plusieurs ***thread*** à une partie du code.

<div class="alert alert-block alert-info">
Testez l'exemple suivant puis modifier la barrière avec <code>n</code> valant 3 par exemple. Observez attentivement les résultats.
</div>

In [7]:
import threading
import sys, time, random

def avenger(name, barrier):
    current = threading.currentThread().getName()  
    sys.stdout.write(f'{current}: {name} is waiting for the barrier to open!\n')   
    barrier.wait()                               # barrier wait for 5 calls to wait() method
    sys.stdout.write(f'{current}: {name} crosses the barrier!\n')

def main():
    threads = []
    barrier = threading.Barrier(5)               # barrier initialised for 5 threads
    
    for name in ['Ironman', 'Hulk', 'Thor', 'Ant-man', 'Wasp']:
        threads.append(threading.Thread(target=avenger, args=(name, barrier,)))
        threads[-1].start()
        
    for thread in threads:              
        thread.join()           
    
main()

Thread-59: Ironman is waiting for the barrier to open!
Thread-60: Hulk is waiting for the barrier to open!
Thread-61: Thor is waiting for the barrier to open!
Thread-62: Ant-man is waiting for the barrier to open!
Thread-63: Wasp is waiting for the barrier to open!
Thread-63: Wasp crosses the barrier!
Thread-60: Hulk crosses the barrier!
Thread-59: Ironman crosses the barrier!
Thread-62: Ant-man crosses the barrier!
Thread-61: Thor crosses the barrier!


### 4. Les primitives `Semaphore` et `BoundedSemaphore`

Extrait de [python.org](https://docs.python.org/3.8/library/threading.html#semaphore-objects):

>*A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().*

En d'autres termes, un sémaphore est une primitive de synchronisation (l'une des plus anciennes) permettant de limiter l'accès simultané à une partie du code. Les modules `threading` et `multiprocessing` fournissent les deux primitives de type sémaphore listées ci-dessous.

* Le `Semaphore(value=1)`: l'argument optionnel `value` précise la valeur initiale du compteur interne. La valeur de ce compteur peut être supérieure à cette valeur initiale.
* Le `BoundedSemaphore(value=1)`: l'argument optionnel `value` précise la valeur initiale du compteur interne. La valeur de ce compteur ne peut pas être supérieure à cette valeur initiale.

Ces deux primitives héritent des méthodes `acquire()` et `release()` qui permettent respectivement d'incrémenter et de décrémenter leur compteur interne. Lorsque ce compteur interne est égale à 0, la méthode `acquire()` devient bloquante jusqu'à ce que le compteur interne redevienne supérieur à 0, c'est à dire lorsqu'un autre ***thread*** appelle la méthode `release()`. Un `Semaphore` peut être libéré (méthode `release()`) plus de fois qu'il n'a été acquis (méthode `acquire()`). Ce n'est pas le cas des `BoundedSemaphore`. Les sémaphore s'utilisent de la manière suivante:
```python
semaphore = threading.Semaphore(N)

semaphore.acquire()
... part of the program that cannot be executed by more than N threads simultaneously...
semaphore.release()
```
Comme toutes primitives héritant des méthodes `acquire()/release()`, les sémaphores supportent également le protocole utilisé par les ***context managers***:
```python
semaphore = threading.Semaphore(N)

with semaphore:
    ... part of the program that cannot be executed by more than N threads simultaneously...
```

**Note 1:** *`Semaphore(1)` est équivalent à `Lock()` puisqu'il n'autorise qu'un seul thread à accéder à une ressource en même temps.*

**Note 2:** *La méthode `acquire()` peut prendre les arguments optionnels `blocking=True` et `timeout=None` permettant de préciser si `acquire()` est bloquant et si c'est le cas, qu'il sera bloquant au plus pour `timeout` secondes.*

<div class="alert alert-block alert-info">
Testez l'exemple suivant puis modifier le sémaphore avec <code>value</code> valant 1 par exemple. Observez attentivement les résultats.
</div>

In [4]:
import threading
import sys, time, random

def avenger(name, semaphore, barrier):
    
    current = threading.currentThread().getName()  
    sys.stdout.write(f'{current}: {name} is waiting at the barrier to enter semaphore!\n')
    barrier.wait()                     # Wait for all 5 threads to open the barrier
    
    with semaphore:                        # The following block of code is executed by 2 thread at a time
        sys.stdout.write(f'{current}: {name} aquire semaphore!\n')
        time.sleep(random.random())
        sys.stdout.write(f'{current}: {name} release semaphore!\n')

def main():

    threads = []
    semaphore = threading.Semaphore(2)    
    barrier = threading.Barrier(5)    
    
    for name in ['Ironman', 'Hulk', 'Thor', 'Ant-man', 'Wasp']:
        threads.append(threading.Thread(target=avenger, args=(name, semaphore, barrier,)))
        threads[-1].start()
        
    for thread in threads:              
        thread.join()           
    
main()

Thread-19: Ironman is waiting at the barrier to enter semaphore!
Thread-20: Hulk is waiting at the barrier to enter semaphore!
Thread-21: Thor is waiting at the barrier to enter semaphore!
Thread-22: Ant-man is waiting at the barrier to enter semaphore!
Thread-23: Wasp is waiting at the barrier to enter semaphore!
Thread-23: Wasp aquire semaphore!
Thread-19: Ironman aquire semaphore!
Thread-19: Ironman release semaphore!
Thread-21: Thor aquire semaphore!
Thread-23: Wasp release semaphore!
Thread-21: Thor release semaphore!
Thread-20: Hulk aquire semaphore!
Thread-22: Ant-man aquire semaphore!
Thread-20: Hulk release semaphore!
Thread-22: Ant-man release semaphore!


## 5. La primitive `Event`

Extrait de [python.org](https://docs.python.org/3.8/library/threading.html#event-objects):

>*An event object manages an internal flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true.*

En d'autres termes, un `Event` est un mécanisme de communication basique entre plusieurs ***threads***. Chaque objet de type `Event` hérite de la méthode `wait(timeout=None)` bloquant les threads l'ayant appelé jusqu'à ce que l'événement soit activé (ou que le *timeout* soit atteint). Un événement est activé en utilisant la méthode `set()` et désactivé en utilisant la méthode `clear()`. La méthode `is_set()` retourne le status d'un événement.

**Note:** *L'instanciation de `Event` n'accepte aucun argument.*

<div class="alert alert-block alert-info">
Testez l'exemple suivant. Observez attentivement les résultats.
</div>

In [2]:
import threading
import sys, time, random

def avenger(name, event):
    current = threading.currentThread().getName()     
    event.wait()
    sys.stdout.write(f'{current}: {name} fights!\n')

def main():

    threads = []
    event = threading.Event()
    
    for name in ['Ironman', 'Hulk', 'Thor', 'Ant-man', 'Wasp']:
        threads.append(threading.Thread(target=avenger, args=(name, event )))
        threads[-1].start()
    
    sys.stdout.write('3, 2, 1, ... fight\n')
    event.set()
    
    for thread in threads:
        thread.join()
    
main()

3, 2, 1, ... fight
Thread-10: Hulk fights!
Thread-11: Thor fights!
Thread-9: Ironman fights!
Thread-12: Ant-man fights!
Thread-13: Wasp fights!


## 6. La primitive `Condition`


Extrait de [python.org](https://docs.python.org/3.8/library/threading.html):


>*This class implements condition variable objects. A condition variable allows one or more threads to wait until they are notified by another thread.*

En d'autres termes, une `Condition` est une version améliorée d'un `Event`. Une `Condition` fournit un mécanisme de communication entre plusieurs ***threads***. Elle peut notifier des changements aux autres ***threads*** grâce à la méthode `notify(n=1)` où `n` est le nombre de ***threads*** à notifier (1 par défaut). Les autres ***threads*** attendent cette notification (ou qu'un *timeout* soit atteint) grâce à la méthode `wait(timeout=None)`.

Tout comme dans le cas des objets `Lock` ou `Sempahore`, le ***thread*** notifiant les changements doit utiliser les méthodes `acquire()`/`release()` pour encadrer `notify()`. Les ***threads*** attendant la notification doivent également utiliser ces méthodes pour encadrer la méthode `wait()`. La stratégie mis en place lors de l'utilisation d'une `Condition` repose sur une relation *producteur/consommateur*, le *producteur* étant le ***thread*** notifiant et le *consommateur* le ***thread*** attendant la notification. Une `Condition` s'utilisent de la manière suivante:
```python
condition = threading.Condition()

# In the producer thread:
condition.acquire()
... produces the notification...
condition.notify()
condition.release()

# In the consumer thread
condition.acquire()
condition.wait()
... something to do with this notification...
condition.release()
```
Comme toutes primitives héritant des méthodes `acquire()/release()`, une `Condition` supporte également le protocole utilisé par les ***context managers***:
```python
condition = threading.Condition()

# In the producer thread
with condition:
    ... produces the notification...
     condition.notify()
        
# In the consumer thread
with condition:
    condition.wait()
    ... something to do with this notification...
    
```

**Note:** *La méthode `acquire()` peut prendre les arguments optionnels `blocking=True` et `timeout=None` permettant de préciser si `acquire()` est bloquant et si c'est le cas, qu'il sera bloquant au plus pour `timeout` secondes.*

<div class="alert alert-block alert-info">
Testez l'exemple suivant. Observez attentivement les résultats.
</div>

In [8]:
import threading
import sys, time, random

def avenger(name, gems, condition):
    
    current = threading.currentThread().getName()     
    sys.stdout.write(f'{current}: {name} searching for gems...\n')

    while True:
        with condition:
            condition.wait(5)                 # Blocks until an item is available (5sec max)
            sys.stdout.write(f'{current}: {name} found {gems[-1]}!\n')
        if len(gems) == 6:
            break
            
def search_gems(ngems, gems, condition):
    for i in range(1, ngems+1):
        with condition:
            gems.append(f'Infinity gem {i}')
            condition.notify()                # Notifies about the availability.
        time.sleep(random.randrange(1, 3))

def main():

    threads, gems = [], []
    condition = threading.Condition()
    
    for name in ['Ironman', 'Hulk', 'Thor', 'Ant-man', 'Wasp']:
        threads.append(threading.Thread(target=avenger, args=(name, gems, condition, )))
        threads[-1].start()
    
    threads.append(threading.Thread(target=search_gems, args=(6, gems, condition, )))
    threads[-1].start()
    
    for thread in threads:
        thread.join()
    
main()

Thread-42: Ironman searching for gems...
Thread-43: Hulk searching for gems...
Thread-44: Thor searching for gems...
Thread-45: Ant-man searching for gems...
Thread-46: Wasp searching for gems...
Thread-42: Ironman found Infinity gem 1!
Thread-43: Hulk found Infinity gem 2!
Thread-44: Thor found Infinity gem 3!
Thread-45: Ant-man found Infinity gem 4!
Thread-46: Wasp found Infinity gem 5!
Thread-42: Ironman found Infinity gem 6!
Thread-43: Hulk found Infinity gem 6!
Thread-44: Thor found Infinity gem 6!
Thread-45: Ant-man found Infinity gem 6!
Thread-46: Wasp found Infinity gem 6!
