ZIO : Programmation concurrente à la carte

On a déjà parlé plusieurs fois de ZIO sur le blog, (et on va continuer ! 😃).

Au-delà d'être la bibliothèque que l'on attendait pour faire du Functional Programming, ou une façon de faire du Scala sans rentrer dans le polymorphisme ad-hoc ou d'autres concepts fins néanmoins abstraits, ZIO ouvre un nouveau chemin de traverse dans la programmation concurrente.


Le programmation concurrente est l'idée de gérer plusieurs exécutions hétérogènes simultanées de manière collaborative, une façon de faire plus courante dans le monde moderne avec le multi-cœur et les nouvelles architectures. Pour le moment, la boîte à outil de référence pour faire de la programmation concurrente sur la JVM, c'est Akka.

Akka permet de pousser assez loin la programmation concurrente, mais va être parfois assez irritant. D'un côté, on code du Akka, de l'autre du Scala. Puis, quand on a appris d'autres modèles de programmation concurrente que le modèle d'acteur (avec Golang ou Clojure), cela devient légèrement usant !

object AkkaAppp extends App {
  val greeterMain: ActorSystem[GreeterMain.SayHello] = 
    ActorSystem(GreeterMain(), "AkkaHelloWorld")

  greeterMain ! SayHello("Jon")
  // bot : Hello Jon !

  greeterMain ! Bye
}

L'année dernière, on a eu le droit à un peu de magie avant Noël avec l'implémentation du modèle d'acteur au-dessus de ZIO : Basic ZIO actors.

Ainsi on pourrait faire à terme avec ZIO ce que l'on pouvait faire en utilisant Akka. Cela étant dit, ZIO va plus loin sur la base, en particulier on peut faire de la composition en programmation concurrente.

En termes de composition, vous avez quelques types de base : ZIO, Ref, Fiber... Et au-dessus, on va avoir de nombreux outils pour combiner ces différents types. Si vous voulez taper un peu dans la liste, voici le résumé approximatif :

 /**
   * @tparam R ce dont j'ai besoin
   * @tparam E quand cela va mal
   * @tparam A quand tout se passe bien
   */
  trait ZIO[-R, +E, +A] {
    private[zio] def unsafeRun: R => Either[E, A]

    def flatMap[R1 <: R, E1 >: E, B](f0: A => ZIO[R1, E1, B]): ZIO[R1, E1, B]
    def fork: URIO[R, Fiber[E, A]]
  }

  //besoin de rien
  type IO[+E, +A] = ZIO[Any, E, A]

  //tout va bien se passer
  type UIO[+A] = ZIO[Any, Nothing, A]

  //avec R, on peut faire un A
  type URIO[-R, A] = ZIO[R, Nothing, A]

  //pourquoi ?
  type Exit[+E, +A] = Either[Cause[E], A]

  sealed trait Cause[+E]
  case class Fail[+E](value: E)               extends Cause[E]
  case class Die(value: Throwable)            extends Cause[Nothing]
  case class Interrupt(fiberId: (Long, Long)) extends Cause[Nothing]

  trait Fiber[+E, +A] {
    def interrupt: UIO[Exit[E, A]]
    def join: IO[E, A]
  }

  trait Promise[E, A] {
    def await: IO[E, A]
    def complete(io: IO[E, A]): UIO[Boolean]
  }

  trait Queue[-R, +E, A] {
    def isShutdown: UIO[Boolean]
    def offer(a: A): ZIO[R, E, Boolean]
    def take: ZIO[R, E, A]
  }

  trait Ref[A] {
    def get: UIO[A]
    def modify[B](f: A => (B, A)): UIO[B]
  }

  case class Reservation[-R, +E, +A](
    acquire: ZIO[R, E, A], 
    release: Exit[Any, Any] => ZIO[R, Nothing, Any])

  trait ZManaged[-R, +E, +A] {
    def reservation: ZIO[R, E, Reservation[R, E, A]]

    def flatMap[R1 <: R, E1 >: E, B](f0: A => ZManaged[R1, E1, B]): ZManaged[R1, E1, B]
  }

  trait Stream[-R, +E, +A] {
    //1. on "acquire" la stream
    //2. on "pull" les éléments un par un (ZIO[R, Option[E], A])
    //3. si on a un None dans le Option[E], on arrête
    //4. on "release"
    def process: ZManaged[R, Nothing, ZIO[R, Option[E], A]]

    def flatMap[R1 <: R, E1 >: E, B](f0: A => Stream[R1, E1, B]): Stream[R1, E1, B]
  }

  trait Semaphore {
    def available: UIO[Long]
    def withPermits[R, E, A](n: Long)(task: ZIO[R, E, A]): ZIO[R, E, A]
  }

Au-dessus de ces types, on va pouvoir construire le reste des opérateurs pour manipuler les effets (même si on va plutôt "inliner" pour des histoires de performance).


Ces dernières semaines, on a eu quelques articles intéressants sur des cas d'utilisation ou des combinateurs au-dessus de ZIO.

Happy Eyeballs

Happy eyeballs algorithm using ZIO
While researching structured concurrency for the article on Project Loom & Fibers, one of the most valuable resource were blogs (1, 2) by Nathaniel J. Smith, covering the design of the Python library…

L'article part d'un code qui a été fait avec Trio (Python), pour voir comment on peut le faire en Scala.

L'idée est de pouvoir mettre dans la course n tâches (dans une List), pour récupérer le premier résultat, sans lancer plus de tâches que nécessaire, en optimisant le temps d'exécution. C'est interruptible, donc les tâches qui ne sont pas finies sont stoppées. Il n'y a pas de limite dans le panache ! 😃

"Suivez les types"

def happyEyeballs[R, E, T](tasks: List[ZIO[R, E, T]], delay: Duration): 
                           ZIO[R with Clock, Option[E], T]

Pour les types R, E et T:

  • L’algorithme prend une liste de tâches ZIO[R, E, T], donc des tâches qui dépendent d'un R, vont renvoyer plus tard un T si cela se passe bien ou un E si cela se passe mal.
  • Un délai
  • Pour renvoyer une seule tâche ZIO[R with Clock, Option[E], T], donc une tâche qui dépend de R et d'une horloge Clock (pour gérer le délai), qui va renvoyer le premier T si cela s'est passé au moins une fois ou Some[E] si cela se passe mal (Le None est pour le cas où la liste de tâche est vide).

L'article d'Adam est très précis et on peut porter quelques petites modifications à la solution proposée. Je suis parti sur une version :

  • Plus typée : avec un paramètre de type E en plus et en mettant cette histoire de liste vide dans un IO.fail(None). C'est un choix, je trouve que cela paye après (même dans les articles de blog).
  • Plus documentée : les variables pour les effets que l'on enchaîne ont à la fois un nom et un type (signalFailed, waitOrAlreadyFailed).
def happyEyeballs[R, E, A](tasks: List[ZIO[R, E, A]], delay: Duration): ZIO[R with Clock, Option[E], A] =
    tasks match {
      case Nil         => IO.fail(None)
      case task :: Nil => task.asSomeError
      case task :: otherTasks =>
        Promise.make[Nothing, Unit].flatMap { isFailed =>
          val signalFailed: UIO[Unit]                = isFailed.succeed(Unit).ignore
          val waitOrAlreadyFailed: URIO[Clock, Unit] = isFailed.await.timeout(delay).ignore
          
          task
            .tapError(_ => signalFailed)
            .asSomeError
            .race(waitOrAlreadyFailed *> happyEyeballs(otherTasks, delay))
        }

Ce qui est assez fascinant, c'est comment un algorithme relativement complexe compose avec si peu de lignes de ZIO.

CountDownLatch

Creating a dead simple CountDownLatch with ZIO
In this post I’ll explain what a CountDownLatch is, and how ZIO enables you to create concurrency primitives which are efficient, non-blocking and simple. I recently needed to write a test which…

L'article se présente dans le contexte des tests autour de ZIO, où l'on doit attendre plusieurs signaux pour débloquer la barrière, afin de passer à la suite du test. (🤔 Mais pourquoi il n'y a pas juste un ZIO.race ? Le cas d'utilisation doit être plus complexe dans la base de code.)

Voici un "ZIOLatch", pour le reste (timeout, ...), on peut passer par les combinateurs classiques de ZIO !

trait CountDownLatch {
  def countDown: UIO[Unit]
  def await: UIO[Unit]
}

Pour faire l’implémentation, on pourrait faire une coquille autour de l'API Java existante ou utiliser les briques de base de ZIO pour faire le CountDownLatch.

L'article propose cette implémentation :

object CountDownLatch {
  def make(count: Int): UIO[CountDownLatch] = for {
    ready <- Promise.make[Nothing, Unit]
    ref   <- Ref.make(count)
  } yield new CountDownLatch {
    override def countDown: UIO[Unit] =
      ref.updateAndGet(_ - 1).flatMap {
        case 0 => ready.succeed(()).unit
        case _ => ZIO.unit
      }

    override def await: UIO[Unit] = ready.await
  }
}

Je ne suis pas vraiment convaincu. Qu'est-ce qu'il arrive si l'appel à countDown est interrompu au mauvais moment ? C'est-à-dire dans le chat de l’aiguille entre le succès de l'appel à updateAndGet et la suite où pour 0, on va compléter la Promise ready.

On pourrait se dire que si un interrupt se glisse ici, c'est vraiment la faute à pas de chance, jusqu'à que dans d'autres cas :

  • La prod est bloquée dans un état de stase,
  • Des philosophes meurent de faim,
  • Que l'on perde plusieurs 100k€ ...
  • Ou la vie ? 🙀

Donc on va essayer de tester le truc que l'on vient de créer pour tester du code... #Meta

Testons les tests

Voici la spec du CountDownLatch en utilisant zio-tests

  • Le scénario basique
    def make(n: Int): UIO[CountDownLatch]
    
    testM("basic scenario")(for {
      l <- make(2)
      _ <- l.countDown
      _ <- l.countDown
      _ <- l.await
    } yield {
      assertCompletes
    })

    Donc à la base, on initialise un countDownLatch à 2, on fait 2 countDown. Normalement, on doit pouvoir passer tranquillement.

  • Le scénario dans le domaine
    testM("make 0 should be available")({
      for {
        countDownLatch <- make(0)
        r              <- completedOrError(countDownLatch.await, "should be completed")
      } yield r
    })

    Un peu différent, ici on teste que si l'on part de 0, alors on doit pouvoir passer la barrière directement, sinon c'est une erreur. Ce scénario n'est pas prévu pour l’implémentation actuelle, mais ce n'est pas complexe à rajouter.

  • On vérifie que cela bloque comme attendu
    testM("keep the gate closed")(for {
      l  <- make(2)
      _  <- l.countDown
      r1 <- runningOrError(l.await, "shoud be running")
      _  <- l.countDown
      r2 <- completedOrError(l.await, "should be completed")
    } yield {
      r1 && r2
    })
  • On perturbe les effets
    testM("basic scenario with disturb")(
      checkAllM(Gen.fromIterable(0 to 10))(
        steps =>
          for {
            cdl <- make(2)
            _   <- cdl.countDown
            _   <- Disturb.interruptAfterNSteps(cdl.countDown, steps).orElse(cdl.countDown)
            r   <- completedOrError(cdl.await, "should be completed")
          } yield {
            r
          }
      )
    )

    Ce test est beaucoup plus complexe. On perturbe la deuxième exécution de .countDown, et si cette exécution est en erreur, on relance un .countDown avant de vérifier si .await est disponible.

    Afin de définir ce test, il faut réussir à pouvoir perturber des effets. ZIO est basé sur de la composition de représentations différentes des effets, que l'on peut déstructurer en exécution pas à pas, dans du code (super-)utilisateur.

    Sans expliquer jusqu'au bout l'approche, on peut taper dans le code interne de ZIO pour faire une perturbation des effets, qui va arrêter l'exécution après N étapes.

    object Disturb {
      def interruptAfterNSteps[R, E, A](effect: ZIO[R, E, A], maxExecutionSteps: Int): ZIO[R, Option[E], A]
    }

    Et caler ça dans un test avec un peu de property-based-testing :

    testM("basic scenario with disturb")(
      checkAllM(Gen.fromIterable(0 to 10))(
        steps =>
          for {
            cdl <- make(2)
            _   <- cdl.countDown
            _   <- Disturb.maxSteps(cdl.countDown, steps).orElse(cdl.countDown)
            r   <- completedOrError(cdl.await, "should be completed")
          } yield {
            r
          }
      )
    )

La suite de tests permet de confirmer les 2 "erreurs" (on les a cherchées).

- finish
  + basic scenario
  + keep the gate closed
  - make 0 should be available
  - basic scenario with disturb

Fix de l’implémentation

Un moyen facile pour fixer l'implémentation, c'est de :

  • Faire une implémentation pour la valeur manquante dans le domaine (make(0)).
  • Passer en non-interruptible la section du code qui ne doit pas être coupé en deux.
object CountDownLatch {
  def make(count: Int): UIO[CountDownLatch] =
    if (count <= 0)
      UIO(new CountDownLatch {
        override def countDown: UIO[Unit] = ZIO.unit
        override def await: UIO[Unit]     = ZIO.unit
      })
    else
      for {
        ready <- Promise.make[Nothing, Unit]
        ref   <- Ref.make(count)
      } yield new CountDownLatch {
        final override def countDown: UIO[Unit] =
          (ref.updateAndGet(_ - 1) >>= {
            case 0 => ready.succeed(()).ignore
            case _ => ZIO.unit
          }).uninterruptible

        final override def await: UIO[Unit] = ready.await
      }
}

Néanmoins si on reprend le problème à la base, il y a probablement moyen de faire plus simple. La version Java utilise "juste" une AtomicRef pour garder le comptage et fait une "petite" boucle pour l'attente. On peut faire pareil en ZIO, ce qui donne un truc relativement propre.

object CountDownLatchJavaLike {
  def make(count: Int): UIO[CountDownLatch] =
    for {
      ref <- Ref.make(count)
    } yield new CountDownLatch {
      override def countDown: UIO[Unit] = ref.update(_ - 1)
      override def await: UIO[Unit]     = ref.get.doUntil(_ <= 0).unit
    }
}

Les perfs sur le await ne sont pas tops, c'est dommage de lancer un "Schedule" si le countDownLatch est déjà libéré. On pourrait faire un appel direct avant de faire un appel récurrent.

def await: UIO[Unit] = ref.get flatMap {
  x => if(x <= 0) ZIO.unit 
             else ref.get.doUntil(_ <= 0).unit
}
//ou faire un autre combinateur
@inline
def fastDoUntil[R,E,A](e: ZIO[R,E,A])(p: A => Boolean) = 
  e flatMap {
    x => if(p(x)) ZIO.succeed(x)
         else     e.doUntil(p)
  }

def await: UIO[Unit] = fastDoUntil(ref.get)(_ <= 0).unit

Voici les liens vers le code :

Conclusion

ZIO permet de faire beaucoup plus en programmation concurrente et s'il manque des éléments dans le menu, on peut toujours construire par-dessus à la carte.

⚠️ Néanmoins cela nécessite de faire des tests plus poussés en environnement concurrent.

<fiction>Pour le prochain article on regardera pour la conversion de TLA+ (PlusCal) vers ZIO. </fiction>

Quelques liens utiles :

La version depuis java.util.concurrent.CountDownLatch.

import zio.blocking._

object CountDownLatchWrapJava {
  def make(count: Int): UIO[CountDownLatch] =
    for {
      cdl <- WrapImpure.wrapTotal(new java.util.concurrent.CountDownLatch(count))
    } yield {
      new CountDownLatch {
        override def countDown: UIO[Unit] = cdl.execTotal(_.countDown())
        override def await: UIO[Unit]     = cdl.execTotal(_.getCount()).doUntil(_ <= 0L).unit
      }
    }
}

//Ou une version plus technique avec l'aide de Adam Fraser
object CountDownLatchWrapJavaBlock {
  def make(count: Int): URIO[Blocking, CountDownLatch] =
    for {
      blocking <- ZIO.environment[Blocking]
      cdl      <- WrapImpure.wrapTotal(new java.util.concurrent.CountDownLatch(count))
    } yield {
      new CountDownLatch {
        override def countDown: UIO[Unit] = cdl.execTotal(_.countDown())
        override def await: UIO[Unit]     = cdl.execBlockingInterrupt(_.await()).orDie.provide(blocking)
      }
    }
}

//regarde maman, sans Kleisli
final class WrapImpure[B](private val value: B) {
  def exec[C](f: B => C): Task[C]                           = ZIO.effect(f(value))
  def execTotal[C](f: B => C): UIO[C]                       = ZIO.effectTotal(f(value))
  def execBlockingInterrupt[C](f: B => C): RIO[Blocking, C] = effectBlockingInterrupt(f(value))
}

object WrapImpure {
  def wrap[B](b: => B): Task[WrapImpure[B]]     = ZIO.effect(new WrapImpure(b))
  def wrapTotal[B](b: => B): UIO[WrapImpure[B]] = ZIO.effectTotal(new WrapImpure(b))
}

Crédit photo : unsplash / Sunrise Photos