Introduction a zio-spark

Introduction

Il y a deux semaines, nous avons décidé d’annoncer zio-spark. C’est un projet qui nous tient particulièrement à cœur puisqu’il résulte de la combinaison de deux outils que nous affectionnons particulièrement, à savoir ZIO et Spark. Son objectif premier est de faciliter l’intégration de Spark dans l’écosystème ZIO afin de profiter des différents avantages de ce dernier.

Le but de cet article est de vous montrer comment la combinaison de ces deux librairies peut vous faciliter la vie au quotidien.

Installation

Cet article se base sur la version 0.10.0 de zio-spark. Les dépendances suivantes ont été utilisées :

val dependencies = Seq(
	"org.apache.spark" %% "spark-core" % "3.2.2",
	"org.apache.spark" %% "spark-sql" % "3.2.2",
	"io.univalence"    %% "zio-spark"  % "0.10.0"
)

Avantages liés à ZIO

Les caractéristiques de ZIO sont surtout utiles lorsque nous devons manipuler nos jobs. En effet, ici, on opère au niveau du driver, il n’a pas d’effet sur les workers et donc sur les traitements des jobs Spark.

L’objectif premier est d’optimiser l’allocation des ressources en permettant à l’utilisateur de facilement lancer plusieurs jobs en parallèle et de pouvoir programmer ses différents jobs comme il le sent.

Voyons voir ces différentes facettes.

Planification des jobs

Il y a un moment où il faut se demander comment faire pour planifier nos jobs. On veut, par exemple, pouvoir lancer un job en particulier chaque début de semaine, le lundi à 9h du matin.

Généralement, les ingénieurs choisissent d’utiliser Airflow pour répondre à cette problématique.

C’est un outil qui y répond parfaitement, mais qui peut paraitre un peu overkill pour certains projets. En effet, pour ce simple besoin, il va falloir :

  • Gérer une instance Airflow ou payer pour un service serverless équivalent
  • Former les ingénieurs à la notion de DAG et autres spécificités liées à l’outil

ZIO donne une alternative beaucoup moins coûteuse en ressources et en temps grâce aux Schedules. Il permet de répondre à cette problématique directement depuis notre code.

Voici un exemple qui répond à la problématique ci-dessus :

import zio._
import zio.spark.sql._

val job: ZIO[SparkSession, Throwable, Int] = ???
val eachMondayAt9AM = Schedule.dayOfWeek(1) && Schedule.hourOfDay(9)

val app = job schedule eachMondayAt9AM

Gestions des erreurs

On peut aussi simplement gérer nos erreurs de pipeline et réagir en conséquence. Dans l’exemple ci-dessous, on lance un autre job si le premier crash :

import zio._
import zio.spark.sql._

val job: ZIO[SparkSession, Throwable, Int] = ???
val fallbackJob: ZIO[SparkSession, Throwable, Int] = ???

val app = job orElse fallbackJob

Nous ne sommes pas obligés de régler un problème de job Spark en lançant un autre job Spark, on aurait aussi très bien pu envoyer un mail à un service de support ou autre à la place.

Parallélisation des jobs

Comme dit au-dessus, l’avantage de zio-spark est de pouvoir facilement lancer en parallèle nos différents jobs et ainsi permettre à nos workers de toujours être occupés lorsqu’il y a du travail.

Voici un exemple pour lancer deux jobs simultanément :

import zio._
import zio.spark.sql._

val job: ZIO[SparkSession, Throwable, Int] = ???
val otherJob: ZIO[SparkSession, Throwable, Int] = ???

val app = job zipPar otherJob

Bien évidemment, toutes ces caractéristiques ne sont pas propres à zio-spark mais à ZIO, j’ai notamment fait un article qui va un peu plus loin sur le sujet.

Qu’est-ce que zio-spark apporte en plus ?

Avant que zio-spark existe, de nombreux ingénieurs ont déjà utilisé Spark et ZIO ensemble pour jouir des caractéristiques citées ci-dessus. Nous étions aussi dans ce cas et c’est pour cela que nous avons eu envie d’aller un peu plus loin afin d’avoir une intégration plus intelligente et plus transparente.

Une API plus plaisante et safe

Le principal objectif de zio-spark est de wrapper l’api spark pour l’adapter à ZIO. Pour comprendre la différence, le mieux est de regarder du code avec zio-spark et sans zio-spark compatible avec Zio.

Sans Zio-spark compatible avec Zio :

import zio._
import org.apache.spark.sql._

val sparksession = SparkSession.builder().master("local[1]").getOrCreate()
val dataframe    = ZIO.attempt(sparksession.read.csv(""))
val result       = dataframe.flatMap(df => ZIO.attempt(df.count()))

val app = result

Avec zio-spark :

import zio.spark.sql._

val sparksession = SparkSession.builder.master("local[1]").asLayer
val dataframe    = SparkSession.read.csv("")
val result       = dataframe.flatMap(_.count)

val app = result.provide(sparksession)

Avec cet exemple, on peut noter plusieurs changements :

  • zio-spark utilise le système de Layer de ZIO pour passer la spark session. Cela a plusieurs avantages. Le premier étant que l’utilisateur n’a pas besoin de passer la spark session manuellement tout le long de son implication, il peut juste assumer qu’à la fin du monde une spark session sera donné et ne plus y penser. Cela permet aussi de scoper la spark session. Ainsi, en cas de crash ou autre, la spark session se ferme automatiquement.
  • zio-spark fait en sorte que toutes les actions soient automatiquement des effets. Ainsi, vous n’avez pas besoin de penser à ce qu’est ou non une action. Il suffit de regarder le type de retour pour s’en rendre compte. Cela rend aussi le code beaucoup plus lisible.

Une intégration pensée pour ZIO

zio-spark essaye d’intégrer au maximum les caractéristiques de ZIO dans Spark. Nous avons déjà vu l’utilisation de layer pour la spark session, mais ça ne s’arrête pas là.

À titre d’exemple, la fonction printSchema utilise la Console de ZIO et non print de Scala. Cela permet d’utiliser l’output dans nos tests. Voici un exemple de test rendu possible grâce à ce mécanisme (cela aurait aussi été possible avec Spark mais il aurait fallu faire un mock pour y arriver) :

import zio._
import zio.test._
import zio.test.TestAspect._
import zio.spark.sql._

test("Dataset should have the correct schema") {
  val result =
    """root
      > |-- name: string (nullable = true)
      > |-- age: integer (nullable = true)""".stripMargin('>')

  for {
    df     <- SparkSession.read.csv("")
    _      <- df.printSchema
    output <- TestConsole.output
    representation = output.mkString("\n")
  } yield assertTrue(representation.contains(result))
} @@ silent

Nous avons aussi essayé de rendre les jobs Spark annulable. En effet, ZIO joue beaucoup sur la concurrence et nous voulions que ceci s’applique aussi aux jobs Spark. Malheureusement, ceci ne fonctionne pas par le simple fait de wrapper notre action Spark dans ZIO. Le cas échéant, la fiber contenant le job sera supprimée, mais le job au niveau des workers continuera d’être traité. Nous fournissons un moyen pour répondre à cette problématique et je vous invite à regarder la documentation pour plus d’informations.

Conclusion

J’espère, après lecture de cette introduction, que vous vous sentez plus à l’aise avec cette librairie, pourquoi nous l’avons créé et ce qu’elle peut vous apporter dans votre quotidien.

Vous vous demandez peut-être comment tout cela fonctionne sous le capot, pour cela, il faudra attendre le prochain article où nous expliquerons comment faire une librairie zio (zio-spark, zio-notion, zio-aws, …) et où nous parlerons de code generation !