Cet article est en cours de rédaction, son contenu peut évoluer sans préavis et les informations qu'il contient peuvent manquer de précisions.
Introduction
Intégrer une librairie existante dans ZIO est souvent assez direct. Pour simplifier, dans les (très) grandes lignes, il suffit de wrapper toutes les fonctions qui causent des effets dans ZIO. Ces fonctions auront ainsi pleins de nouveaux superpouvoirs tels que la parallélisation, la gestion des erreurs intégrée, le scheduling et j’en passe.
Avec ceci en tête, nous avons donc décidé de faire zio-spark. Nous utilisions déjà ZIO et Spark dans nos pipelines et le but était ainsi d’avoir une meilleure syntaxe. Nous sommes aujourd’hui fiers de cette librairie, mais ceci ne s'est pas passé sans son lot de difficultés.
Je vous propose de ce fait de retracer son histoire afin de voir les différentes problématiques qui se sont posées et comment nous les avons traités.
La méthode naïve
Nous voulions une API qui se rapproche le plus possible de celle de Spark. Le but étant que l’utilisateur puisse aisément transformer du code Spark and du code zio-spark.
Ainsi si nous prenons le code suivant :
import org.apache.spark.sql._
val df: DataFrame = ???
val res: Int = df.filter("").count
Le but est d’avoir une API similaire pour zio-spark :
import zio._
import zio.spark.sql._
val df: DataFrame = ???
val res: Task[Int] = df.filter("").count
Pour ce faire, nous avons créé notre propre objet Dataset[T]
de la manière suivante, sachant qu’un DataFrame
n’est qu’un Dataset[Row]
:
package zio.spark.sql
import org.apache.spark.sql.{Dataset => UnderlyingDataset}
import zio._
case class Dataset[T](underlying: UnderlyingDataset[T]) {
def filter(conditionExpr: String): Dataset[T] =
Dataset(underlying.filter(conditionExpr))
def count(implicit trace: Trace): Task[Long] =
ZIO.attempt(underlying.count)
}
Cela fonctionne très bien et répondait à notre problématique puisque l’utilisateur pouvait à partir de maintenant manipuler des DataFrames
dans lesquels toutes actions étaient transformés en effet grâce à ZIO nous permettant
d’ores et déjà de scheduler nos jobs par exemple.
Toutes actions ? Pas vraiment… Dans l’état actuel des choses, notre nouveau Dataset
n’en a qu’une seule, la fonction count
. Le reste, que ce soit les transformations ou les actions, il va falloir les implémenter
nous même et autant vous dire que l’API de Spark ne rigole pas. Pour être précis, il y a, pour la version Spark 3.3.1 en Scala 2.13, 216 fonctions dans la class Dataset
(On ne prend pas en compte ses déclinaisons
telles que le RelationalGroupedDataset
par exemple).
Nous sommes vite arrivés à la conclusion qu’il fallait qu’on réfléchisse autrement.
La génération de code
Nous sommes partis d’un postulat simple : un Dataset
possède majoritairement deux types d’opérations, les actions et les transformations.
- Les transformations sont pures (ou presque) puisqu’elles servent juste à mettre à jour le
LogicalPlan
qui n’est rien d’autre qu’un ADT représentant un programme Spark. - Les actions sont impures parce qu’elles appliquent le
LogicalPlan
et doivent donc être wrapper dans ZIO.
C’est une façon de penser assez binaire et même un robot pourrait réécrire notre version d’un Dataset
. S’il pourrait, pourquoi ne le ferait-il pas ?
Il s’avère que pendant cette période, je suis tombé sur un article de Vigoo, un développeur Scala qui travailler actuellement pour Ziverge, la boite en charge de ZIO. Ce dernier expliquait comment il avait fait pour générer zio-aws, une librairie qui créer un client Scala pour les tous les services AWS basé sur ZIO. Cela nous a servi de base pour zio-spark !
Le plugin SBT
Nous n’avons pas réinventé la roue pour générer nos fichiers .scala
. En effet, nous avons suivi à la lettre l’article de blog de Vigoo à ce niveau-là. Nous avons créé un plugin SBT local nommé zio-spark-codegen charger
de générer ces fichiers.
Pour les générer, nous devions nous baser sur le code source Spark. Notre programme devait ainsi, à l’instar de zio-aws qui se basait sur des jsons, créer un programme capable de lire du code Scala pour en générer à son tour.
Scalameta à la rescousse
Nous avons tout d’abord testé la réflexion pour lire le code Spark. Malheureusement, ceci, c'est très vite avéré inefficace. Le Type Erasure nous empêchait de récupérer des informations primordiales sur les différents types, rendant la tâche très difficile voir impossible. Certaine chose nous semblait aussi impossible avec cette technique. On ne voyait par exemple pas comment on pourrait récupérer les docstrings des différentes fonctions sans une étape manuelle.
Après avoir essayé et raté, nous nous sommes donc tournés vers Scalameta. Scalameta n’utilise pas du tout la réflexion, cette librairie crée depuis un ADT représentant un programme depuis du code Scala (un String). Il permet de valider qu’un programme est correcte, que deux programmes soient synthétiquement équivalents (même si la forme n’est pas la même), de créer des règles de syntaxe (scalafmt et scalafix se base sur scalameta) et dans notre cas d’interpréter l’ADT pour régénérer du code Scala depuis ce dernier.
Jusqu’à présent, voici comme le plugin fonctionne dans les grandes lignes :
- On récupère le classpath
- On récupère le code source
Dataset.scala
- On récupère son ADT grâce à Scalameta
- On traverse cet ADT pour générer notre
Dataset.scala
sous la forme d’un simple String - On écrit ce String dans l’endroit de notre choix
Du code custom dans du code généré
Félicitation, nous avons notre Dataset
généré de A à Z en suivant la règle binaire érigé au-dessus (on wrap toutes les actions dans des effets et on retranscrit comme tel les transformations). Malheureusement pour nous,
le monde n’est pas si manichéen. Ce modèle automatique est devenu assez restrictif lorsque nous avons eu besoin de customiser certaine fonction de Dataset
.
En effet, au fur et à mesure que nous avancions, nous avons eu besoin d’avoir au sein de nos Dataset
, du code custom. Nous avions besoin plus précisément de cette feature pour deux cas particulier.
Créer nos propres fonctions
Créer nos fonctions se comprend assez facilement, nous voulions notamment rajouter une fonction headOption
afin de profiter du type system de Scala.
Éditer manuellement des fonctions existantes
Mais pourquoi voudrions-nous éditer manuellement les fonctions de Spark ? Car ZIO ne se résume pas qu’à wrapper les effets dans ZIO. Prenons l’exemple de la fonction show
. Cette dernière affiche par défaut les 20 premières
lignes d’un Dataset
dans la console. C'est une action, ainsi notre plugin va générer la fonction suivante :
def show(): Task[Unit] = ZIO.attempt(underlying.show())
Cette fonction est ennuyante à tester pour l’utilisateur, il va forcément devoir passer par un mock puisque show
se base sur la fonction println
de Scala. Un utilisateur de ZIO n’a pas l’habitude de mocker print
parce
qu’il utilise Console.printLine
à la place ! C’est chose faite dans zio-spark, la fonction show
a été réécrite pour se baser sur Console.printLine
. Ainsi, vous pouvez écrire le test suivant
sans encombre :
test("Dataset should implement show correctly") {
val result =
"""+---------+---+
>| name|age|
>+---------+---+
>| Maria| 93|
>| John| 24|
>| Peter| 19|
>|Cassandra| 46|
>+---------+---+
>
>""".stripMargin('>')
for {
df <- read
_ <- df.show
output <- TestConsole.output
representation = output.mkString("\n")
} yield assertTrue(representation == result)
} @@ silent
Ce comportement est très spécifique à la fonction show, la génération automatique de code n’est pas faite pour alors comment l’intégrer ?
Insérer du code custom dans la génération
Ce code, il va falloir l’écrire nous même pardi ! Rassurer vous, si c'était aussi simple, je n’aurais pas pris la peine d’écrire cette partie.
La façon la plus naïve de le faire est de tagger cette fonction afin qu’elle ne soit pas automatiquement générer puis de la rajouter à la main en l’écrivant sous forme de string pour l’insérer dans le string final. Cela nous enlève tous ce qui nous plaisait dans Scala, nous manipulons un string donc il n’y a pas d’autocomplétion, pas d’erreurs au compile time, pas de coloration syntaxique. Pour résumer, ce n’était pas plaisant.
Il nous fallait un moyen d’écrire du code Scala qui serve uniquement pour la génération de code, qui se base sur l’ancienne classe déjà générée d’un Dataset
pour pouvoir par-dessus générer des fonctions afin de générer
le prochain version de Dataset
. Désolé pour le nœud au cerveau, mais c’est exactement ce que nous avons fait et je vais vous expliquer pas à pas comment !