Amélioration du lead time des chaînes en Spark avec un peu de Monix

Monix

Monix est un framework Scala permettant la gestion des effets grâce à l'abstraction Task, qui peut englober tout ce qui relève du code à effet, comme print ou la lecture de fichier.

Nous utilisons ici la version 3.0.0 (qui est toujours en RC), car c'est la première version de Monix à implémenter les fibers. Tout comme les coroutines, les fibers (ou fibres) sont des petites unités de traitement coopératifs, pouvant s'intégrer au sein d'un Thread.

Expérience sur Monix avec Spark

En lançant deux jobs Spark en parallèle avec Monix dans une même application Spark, nous voulions montrer (dans le cadre d'un audit Univalence) qu'il était possible de réduire significativement la durée totale d'un traitement qui lance plusieurs jobs.

Deux dataframes sont créées et sont mises dans des Task de Monix. En utilisant une for-comprehension, ces deux Task peuvent être utilisées afin de pouvoir montrer qu'il est possible grâce à Monix d'avoir des gains de temps sur les jobs Spark.

Dans l'application, la jointure est mise en mémoire cache afin de pouvoir ensuite lancer deux jobs en parallèle depuis ce cache. Ces deux jobs sont lancés grâce à la méthode start() de Monix, qui permet de lancer Task avec des Fibers.

En ce qui concerne le slow, cette fonction sert juste à ralentir le count car il serait sinon trop rapide.

for{
      df1 <- loadDF1.memoize
      df2 <- loadDF2.memoize

      join = df1.join(df2).repartition(16).cache()

      job1 <- Task(join.slow.count).start
      job2 <- Task(join.slow.count).start

      end <- Task.parZip2(job1.join,job2.join)
} yield end

Voici le DAG des différentes étapes du code donné ci-dessus.

Sur l'image ci-dessous, nous voyons que nous avons bien réussi à lancer deux jobs en parallèle au sein d'une seule instance de SparkContext.

Sur l'image ci-dessous, nous remarquons que même si le premier count n'est pas terminé, le deuxième est déjà lancé.

Au final : le deuxième count est lancé en même temps, avec un temps de mise en œuvre total de 7 minutes, au lieu de 2 * 6.1 minutes (le lancement en concurrence dans ce cas à fait gagner 85% du temps d’exécution sans demander de ressource supplémentaires).

Conclusion

Monix est une librairie vraiment intéressante à combiner avec Spark cependant nous avons aussi trouvé qu'il était impossible d'annuler un job Spark se trouvant dans une fiber que ce soit avec Monix ou ZIO. Quand bien même l'use case d'annuler des jobs Spark en cours n'est pas très commun, il aurait été plus sympathique de pouvoir le faire car c'est normalement une des fonctionnalités majeures lorsque nous utilisons des fibers que ce soit avec Monix ou tout autre librairie similaire.