Benchmark - Concurrence en Clojure/Scala/Java

Comparatif Java/Clojure/Scala

Avant propos

Lorsque l'on souhaite avoir plusieurs processus faisant des modifications sur des données (concurrence), il nous faut donner un attention toute particulière au code qui permet ces modifications.

Les données modifiables par plusieurs thread sont appelés Ressources critiques

Le bout de code dans lequel les modifications des ressources critiques sont faites s'appelle une Section critique.

Pour assurer un maintient cohérent des ressources critiques, il va nous falloir trouver des mécanisme qui permettent d'y accéder de manière Synchronisée

Plusieurs modèles et formalismes existent pour assurer cette synchronicité, nous allons en survoler 4:

  • L'exclusion mutuelle
  • Le modèle CSP
  • Le modèle STM
  • Le modèle acteur

Dans cet article nous allons voir comment utiliser ces modèles en Java Scala et Clojure.

Concepts

Model 1: Mutex (Exclusion mutuelle)

Explication du concept

Le terme de mutex ou exclusion mutuelle permet de désigner un mécanisme de base en concurrence: faire en sorte qu'un seul thread puisse lire ou écrire une ressource critique.

Il existe deux design patterns complémentaires pour écrire des mutex:

Monitor

Le principe du monitor est certainement la manière la plus simple d'implanter une classe thread-safe.

L'idée derrière le monitor est que chaque Objet possède son propre jeton. Lorsqu'un thread attrape le jeton, aucun autre thread ne peux accéder à cet objet.

Cet objet sert donc de verrou pour la synchronisation de l'accès à la ressource critique. On l'appel lock par convention.

Sémaphores

Le sémaphore est un mutex avec une petite subtilité. Un sémaphore peut permettre non pas a un mais plusieurs thread de faire des exécution simultanées.

Imaginez un boite avec un cadenas où il peut y avoir n clés, permettant a n threads de modifier l'état de la boite simultanément.

Model 2: Communicating Sequential Processes (CSP)

Explication du concept

Le model CSP a été introduit par Sir Tony Hoare en 1978 http://www.usingcsp.com/cspbook.pdf sous forme d'un formalisme mathématique décrivant un algèbre de processus.

A l'inverse du model d'exclusion mutuelle, le model CSP permet la synchronisation par principe de messagerie

Do not communicate by sharing memory; instead, share memory by communicating. 🤯

L'idée phare est de permettre à des processus sequentiels (c'est à dire qu'ils executent leur actions une à une comme un thread) indépendants de communiquer au travers de couduits, qui permettent de convenir d'un rendez-vous.

En effet, pour qu'une communication se fasse entre un processus et une ressource critique, il faut que le processus soit en mesure d'écrire et la ressource critique en mesure de recevoir.

Ce principe se base sur un formalisme anterieur Dijkstra’s guarded command.

Ce mutex "civilisé" présente l'avantage de pouvoir implémenter un formalisme de haut niveau très clair et simple. La programmation concurrente à la réputation d'être compliquée notemment a cause des mécanisme de bas niveau (mutex, barrières en mémoire..)

Model 3: Software transactional memory (STM)

Explication du concept

A l'inverse du mutex, le model STM à une approche optimiste.

Dis Jamy, c'est quoi une approche optimiste ?

En gros, c'est quand un thread fait son opération dans son coin sans faire attention aux autres.

Par exemple, imaginez qu'un instituteur demande a ses élèves (optimistes ou insouciant) de lui écrire une description d'un animal et de venir ensuite l'écrire au tableau (ressource critique).

Chaque élève s'execute sans penser qu'un autre élève puisse avoir choisi le même animal et se plonge dans sa feuille.

Mateo écrit une description du Lion, puis s'en va l'écrire au tableau (commit).

Quentin écrit la description de l'Autruche, puis s'en va l'écrire au tableau (commit).

Antoine écrit la description du Lion, puis regarde le tableau et s'apperçoit qu'il y a déjà cette description, autrement dit qu'il y a un conflit. Antoine vert de rage, déchire sa feuille (avorte) et recommence avec la description de l'élan.

Ici bien qu'un Thread élève ait du avorter, on eu très rapidement deux resultats au tableau.

La complexité ici est O(2) et au pire dès cas la complexité aurait été O(3)

Imaginons maintenant la même situation basé sur le principe d'exclusion mutuelle. La ressource critique ici ne peux se faire que par un élève à la fois.

On peut aisément imaginer que le fait d'écrire 3 description au tableau aura une complexité en temps O(n) (avec n = 3)

Je finirais en citant wikipédia:

Le bénéfice d'une approche optimiste est la concurrence accrue : aucun thread ne doit attendre pour accéder à une ressource, et différents threads peuvent modifier de manière sûre et simultanée des parties disjointes des structures de données qui seraient normalement protégées sous le même verrou. Malgré le surcoût de réessayer des transactions qui échouent, dans la plupart des programmes réalistes, les conflits sont suffisamment rares pour qu'il y ait un gain de performance immense par rapport aux protocoles à base de verrous sur un grand nombre de processeurs.

Model 4: Actor model

Explication du concept

Le modèle acteur à l'instar du model CSP est une abstraction à assez haut niveu, mais qui diffère du CSP par le fait que le système de messagerie est asynchrone.

Chaque acteur à une identité, zero ou plusieurs "boites aux lettres" (pour la messagerie) et peuvent s'auto-envoyer des messages.

Mon collègue François Sarradin l'a expliqué mieux que moi:

The actor model is a model of concurrent computation, based on components, named actors, that communicate by asynchronously sending messages to each other.An actor is a component that can represent any software components of an application (from the local thread to a distant process). When it receives a message, it stores it in a local queue before processing it.
A message is composed of a reference to the sending actor (sender) and a payload. It may be sent using memory sharing (if actors are in the same memory space), by direct network transmission, or by using a messaging platform (eg. Kafka).Actor model is a concurrency model that fits well applications made of distributed services.
Actor abstraction allows to address distributed computing challenges with a higher degree of expressivity and conciseness. Nevertheless, we will see that other concurrent tools will be necessary to complete this abstraction.
It is closed to object-oriented model: each actor has a state and a behavior and actors communicate by sending messages to each other.

Les avantages ici sont les mêmes les mêmes qu'avec le CSP. A la nuance près qu'ici les la communication est asynchrone.

Les concepts par technologie

Java

Mutex

Synchronized blocks

Prenons un exemple où plusieurs thread tentent de mettre à jour une valeur numérique.

Trois Threads tentent de lire le chiffre, d'ajouter 1 et de mettre une nouveau chiffre dans la boite.

Rappel: Si les actions ne sont pas synchronisées le résultat a des chances d'être erroné. Par exemple si deux thread lisent le chiffre (admettons 0) présent dans la boite en même temps puis écrivent chacun leur tour chiffre + 1, le résultat dans la boite sera 1.. D'où l'utilité d'avoir un accès synchronisé à la ressource critique.

En java pour définir une section critique à l'aide d'un monitor, on utilise le mot clé synchronized.

package fr.univalence.concurrence;

public class ConcurrentBox {
	private Integer numericValue = 0;
	private final Object lock = new Object();
	
	/**
	 * Adds one to our value
	 * @return
	 */
	public int addOne() {
		synchronized (lock) {
			this.numericValue += 1;
			return this.numericValue;
		}
	}
	
	/**
	 * Spawn 10 000 threads trying to add one to the current value
	 * @param args
	 */
	public static void main(String[] args) {
		var box = new ConcurrentBox();
		Runnable runnable = () -> {
			var currentValue = box.addOne();
			System.out.println(currentValue);
		};
		for (int i = 0; i < 10000; i++) {
			new Thread(runnable).start();
		}
	}
}

Vous pouvez essayer de retirer le bloc synchronized pour vérifier que des valeurs sont bien perdues en route.

Les objet contiennent aussi des méthodes pour gérer les signaux. En effet, parfois les threads souhaitent s'échanger des données ou attendre un certain état.

Les objets java sont tous dotés des méthodes wait() notify() et notifyAll().

package fr.univalence.concurrence;

public class Holder {
	private int value; 
	private boolean done; 
	private final Object lock = new Object();

	/**
	 * Initialize numeric value then notifies another thread
	 * @throws InterruptedException
	 */
	private void init() throws InterruptedException {
		synchronized (lock) {
			Thread.sleep(1000);
			value = 42;
			done = true;
			lock.notify();
		}
	}

	/**
	 * Waits for the value to be received then prints it
	 * @throws InterruptedException
	 */
	private void display() throws InterruptedException {
		synchronized (lock) {
			while (!done) { 
				lock.wait();
			}
			System.out.println(value); 
		}
	}

	public static void main(String[] args) throws InterruptedException {
		var holder = new Holder();
		new Thread(() -> {
			try {
				holder.init();
			} catch (InterruptedException e) {
				// Deal with interruption
			}
		}).start();
		holder.display();
	}
}

Implantation (rapide) d'un semaphore en Java

package fr.univalence.concurrence;

/**
 * 
 * @author bastienguihard
 *
 */
public class Semaphore {
	private int availablePermits;
	private final Object lock = new Object();
	private int waitingThreads = 0;

	public Semaphore(int initialPermitsAmount) {
		this.availablePermits = initialPermitsAmount;
	}

	public int waitingForPermits() {
		synchronized (lock) {
			return this.waitingThreads;
		}
	}

	public void release() {
		synchronized (lock) {
			this.availablePermits += 1;
			lock.notifyAll();
		}
	}

	public boolean tryAcquire() {
		synchronized (lock) {
			if (this.availablePermits == 0) {
				return false;
			}
			this.availablePermits -= 1;
			return true;
		}
	}

	public void acquire() throws InterruptedException {
		synchronized (lock) {
			this.waitingThreads += 1;
			while (this.availablePermits == 0) {
				lock.wait();
			}
			this.waitingThreads -= 1;
			this.availablePermits -= 1;
		}
	}

	public static void main(String[] args) {
		var semaphore = new Semaphore(5);
		for (int i = 0; i < 10; i++) {
			new Thread(() -> {
				try {
					semaphore.acquire();
					System.out.println(Thread.currentThread().getName() + "took permit");
					Thread.sleep(1000);
					semaphore.release();
					System.out.println(Thread.currentThread().getName() + "released permit");
				} catch (InterruptedException e) {
					throw new AssertionError(e);
				}
			}).start();
		}
	}
}

Lock - ReentrantLocks

Il existe un autre mécanisme permettant de définir des sections critiques introduit en Java 5 :

Il sont a privilégier en cas de forte contention (beaucoup de threads).

Quels sont les avantages par rapport a synchronized ?

Les ré-entrant ont une version Fair, c'est a dire que cela garantit que si plusieurs threads sont en attente sur un verrou c'est celui qui attend depuis le plus longtemps qui le prend (sinon c'est un thread au hasard)

l'objet possède une méthode trylock qui permet d’acquérir un verrou si possible mais d'être non bloquant.

Aussi, les reentrantlocks peuvent attendre sur plusieurs conditions.

public class Holder {
	private int value;
	private boolean done;
	private final ReentrantLock lock = new ReentrantLock();
	private final Condition condition = lock.newCondition();

	private void init() {
		lock.lock();
		try {
			value = 12;
			done = true;
			condition.signal();
		} finally {
			lock.unlock();
		}
	}

	private void display() throws InterruptedException {
		lock.lock();
		try {
			while (!done) {
				condition.await();
			}
			System.out.println(value);
		} finally {
			lock.unlock();
		}
	}
}

Sémaphore

 class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo

   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }

CSP

TODO

STM

TODO

Actor

TODO

Clojure

Pour aborder ce sujet, je vous propose de commencer par lire le précédent sujet: La concurrence avec Clojure - Introduction

Mutex

Clojure permet un mutex "lock-based" identique au block synchronized de java grâce au mot clé locking

(def lock (Object.))
(future (locking lock 
          (Thread/sleep 5000) 
          (println "done1")))
(Thread/sleep 1000)
(locking lock 
  (Thread/sleep 1000)
  (println "done2"))

STM

En clojure, vous utiliserez le modele STM sans même vous en rendre compte.

En effet, si vous souhaitez faire varier une valeur numérique par plusieurs threads vous pourrez avoir un code ressemblant à ça:

;; spawning 10 threads without using core/async
(let [my-atom (atom 0)]
  (loop [x 10]
    (when (> x 0)
      (future (Thread/sleep 100)
              (swap! my-atom + 1))
      (recur (- x 1)))))
Cet exemple est similaire à l'exemple du monitor en java (voir partie précédente)

Sous le capot, swap! utilise une methode de type compare and set pour venir modifier la ressource critique.

Dans le cadre d'une transaction, une alternative a locking vu précédemment est le bloc dosync.

En effet dosync assure la synchronisation coordonnée de ressources critiques lors de transactions.

(def me (ref {:coins 2 :sushi 0}))
(def sushi-master (ref {:coins 0 :sushi 2}))

(defn trade [client server]
  (dosync
    (when (< 0 (:coins @me))
      (alter me update :coins dec)
      (alter me update :sushi inc)
      (alter sushi-master update :coins inc)
      (alter sushi-master update :sushi dec))))

(trade me sushi-master)
(println @me)
(println @sushi-master)

Ici alter est aussi une fonction de type CAS (compare and set)

L’exécution dosync permet une atomicité du processus et est nécessaire dans le cadre d'une transaction.

CSP

Rich hickey a collaboré sur une librairie implémentant un ensemble de mécanismes de concurrence "CSP-like".

Introducing core/async

Core/async est un fondamental de la concurrence en clojure.

Je vous invite à aller lire la section dédiée dans l'article précédent: La concurrence avec Clojure - Introduction

Ainsi ue l'implantation du problème des dinning philosophers dans la section ci dessous : Comparons des implantations

Actor

TODO

Scala

TODO

Comparons des implantations

l'idée ici n'est pas forcément de faire des implantations exactement équivalente dans toutes les technologies, mais plutôt de voir pour une solution similaire à un même problème comment on l'implanterai naturellement..

Ici pas question de performance mais plutôt de syntaxe.

Clojure a une interropérabilité avec Java, en soit tous les problèmes écrit en clojure peuvent être résolus en Java.

Dining philosophers

Comparons une implantation naive et "au plus simple" du problème des dining philosophers.

Java

Avec un mutex monitor

package fr.univalence.concurrence;

import java.util.Arrays;
import java.util.stream.IntStream;

public class PhilosopherDinner {
	private final Object[] forks;

	public PhilosopherDinner(int forkCount) {
		Object[] forks = new Object[forkCount];
		Arrays.setAll(forks, i -> new Object());
		this.forks = forks;
	}

	public void eat(int index) {
		var index2 = (index + 1) % forks.length;
		Object fork1 = index2 > index ? forks[index] : forks[index2];
		Object fork2 = index2 > index ? forks[index2] : forks[index];
		synchronized (fork1) {
			synchronized (fork2) {
				System.out.println("philosopher " + index + " eat");
			}
		}
	}

	public static void main(String[] args) {
		var dinner = new PhilosopherDinner(5);
		IntStream.range(0, 5).forEach(i -> {
			new Thread(() -> {
				for (;;) {
					dinner.eat(i);
				}
			}).start();
		});
	}
}

Clojure

Avec core/async

L'équivalent de cette implantation en Java reviendrait à utiliser des ArrayBlockingQueue de taille 1

(let [forks [(chan 1) (chan 1) (chan 1) (chan 1) (chan 1)]
      assigned-forks [[0 1] [1 2] [2 3] [3 4] [4 0]]]

  (defn eat [philosopher [fork-1 fork-2]]
    (<!! (get forks fork-1))
    (<!! (get forks fork-2))
    (println (str philosopher " starts eating"))
    (println (str philosopher " is done eating"))
    (>!! (get forks fork-2) :fork)
    (>!! (get forks fork-1) :fork)
    (eat philosopher [fork-1 fork-2]))

  ;; put forks on table
  (mapv (fn [fork-chan] (>!! fork-chan :fork)) forks)
  (dotimes [n 5]
    (thread (eat n (get assigned-forks n)))))

Sacla

TODO

Le mot de la fin…

Pour résumer une bonne partie de cet article à l'aide d'une image :

Ressources