Spark & ZIO : rencontre du 3e type

Zed Aïe Oh

Avec l'arrivée imminente de ZIO 1.0, une grande nouveauté apparaît dans cette bibliothèque : à savoir l'adoption d'un troisième type paramétré pour l'IO. Le type ZIO[R, E, A] fut ainsi créé.

Auparavant, le type utilisé par ZIO pour encapsuler votre code avec effet était tout simplement un IO[E, A] avec :

  • E → Le type utilisé en cas d'échec, souvent un Throwable ou des erreurs métiers. Mais il peut aussi être Nothing, si votre fonction n'est pas amené à échouer.
  • A → Le type retourné en cas de succès, avec Nothing indiquant que la fonction ne se finit jamais ou un Unit si la fonction à des effets et ne retourne pas de valeur.

Maintenant nous avons accès à un troisième type paramétré noté R par convention. R représente tout simplement l'environnement / le contexte nécessaire pour pouvoir faire fonctionner le code à effet dans vos fonctions, comme par exemple la connexion à une base de donnée ou la session Web. Cet environnement peut être composé de plusieurs modules, d'une simple configuration ou bien si vous n'avez besoin de rien, dans ce cas vous utiliserez le type Any.

Et Spark dans tout ça... ?

Que se passe-t-il si vous mettez ZIO et Spark dans un grand bol de céréales ? Paf, ça ne fait pas des ChocapicsTM, mais plutôt un ZIO[SparkEnv, E, A] que nous pouvons habilement renommer TaskS[A] pour les fans de Monix 😉.

Mais vous pouvez aussi prendre des Chocapics™️ !
type TaskS[X] = TaskR[SparkEnv, X]
// Avec TaskR[SparkEnv, X] = ZIO[SparkEnv, Throwable, X]

Maintenant que vous détenez le pouvoir de Spark et ZIO dans vos mains, vous pouvez maintenant vous amuser à écrire des for comprehension partout.

L'avantage des for comprehension dans Scala est la facilité de manipulation de tout votre code englobé dans des Task.

Un exemple de programme utilisant ZIO ayant pour environnement Spark :

//Mise en place d'une session Spark
val ss: SparkSession   =
  SparkSession.builder.master("local[*]").getOrCreate()
//SparkZIO est une classe qui étend le trait SparkEnv et qui embarque une session Spark
val sparkEnv: SparkZIO = new SparkZIO(ss)

//Votre programme
val prg: TaskS[(DataFrame, DataFrame)] =
  for {
    //Supposons que totofile contienne de la donnée texte
    df  <- sparkEnv.read.textFile("totofile")
    _   <- Task(df.createTempView("totoview"))
    df2 <- SparkEnv.sql(s"""SELECT * FROM totoview""")
  } yield (df, df2)

//Ici nous donnons à notre programme l'environnement Spark
val liveProgram: IO[Throwable, (DataFrame, DataFrame)] = prg.provide(sparkEnv)

//Un runtime par défaut est fourni par ZIO qui sera utilisé pour exécuter les effets
val runtime: DefaultRuntime = new DefaultRuntime {}
//L'exécution des effets a effectivement lieu ci-dessous. On récupère aussi les retours de ces effets
val resRun: (DataFrame, DataFrame) = runtime.unsafeRun(liveProgram)

//resRun._1 et resRun._2 ont le même contenu

SparkEnv.sql() est un helper qui masque l'accès à l'environnement Spark fourni à ZIO :

def sql(queryString: String): TaskS[DataFrame] =
    ZIO.accessM(_.query.sql(queryString))

ZIO.accessM est LA fonction qui vous permet d'accéder à l'environnement. L'environnement SparkEnv est représenté sous la forme d'un trait Scala. L'inspiration venant principalement du blog post sur cette nouvelle fonctionnalité décrit par John de Goes.

Si vous voulez en savoir plus sur le mariage entre ZIO et Spark, je vous invite à aller voir notre repo git spark-tools qui contient divers outils utiles pour le data engineering. Spark-zioest l'un des projets qui se trouve dans spark-tools et contient notre implémentation. Spark-zio est aussi disponible sous Maven Central (nous utilisons sbt-dynver pour déterminer le numéro de version).

Nous avons choisi de wrapper la méthode sql(sqlText: String): DataFrame de SparkSession, car nous n'avons aucun contrôle sur la requête faite. Elle peut en effet faire des drop tables ou autres actions à effet dans Spark.

Nous avons aussi wrappé toutes les méthodes à effet du type load et write.

En se limitant à cela et en ne se préoccupant pas des méthodes du type .select(), .where() que nous pouvons considérer comme raisonnablement pur, nous obtenons ainsi une bibliothèque beaucoup plus maintenable, sachant qu'il aurait été très coûteux de devoir wrapper toutes les APIs Spark.