Introduction
Intégrer une librairie existante dans ZIO est souvent assez direct. Pour simplifier, dans les (très) grandes lignes, il suffit de wrapper toutes les fonctions qui causent des effets dans ZIO. Ces fonctions auront ainsi pleins de nouveaux superpouvoirs tels que la parallélisation, la gestion des erreurs intégrée, le scheduling et j’en passe.
Avec ceci en tête, nous avons donc décidé de faire zio-spark. Nous utilisions déjà ZIO et Spark dans nos pipelines et le but était ainsi d’avoir une meilleure syntaxe. Nous sommes aujourd’hui fiers de cette librairie, mais ceci ne s'est pas passé sans son lot de difficultés.
Je vous propose de ce fait de retracer son histoire afin de voir les différentes problématiques qui se sont posées et comment nous les avons traités.
La méthode naïve
Nous voulions une API qui se rapproche le plus possible de celle de Spark. Le but étant que l’utilisateur puisse aisément transformer du code Spark and du code zio-spark.
Ainsi si nous prenons le code suivant :
import org.apache.spark.sql._
val df: DataFrame = ???
val res: Int = df.filter("").count
Le but est d’avoir une API similaire pour zio-spark :
import zio._
import zio.spark.sql._
val df: DataFrame = ???
val res: Task[Int] = df.filter("").count
Pour ce faire, nous avons créé notre propre objet Dataset[T]
de la manière suivante, sachant qu’un DataFrame
n’est qu’un Dataset[Row]
:
package zio.spark.sql
import org.apache.spark.sql.{Dataset => UnderlyingDataset}
import zio._
case class Dataset[T](underlying: UnderlyingDataset[T]) {
def filter(conditionExpr: String): Dataset[T] =
Dataset(underlying.filter(conditionExpr))
def count(implicit trace: Trace): Task[Long] =
ZIO.attempt(underlying.count)
}
Cela fonctionne très bien et répondait à notre problématique puisque l’utilisateur pouvait à partir de maintenant manipuler des DataFrame
dans lesquels toutes actions étaient transformées en effets grâce à ZIO nous permettant d’ores et déjà de scheduler nos jobs par exemple.
Toutes actions ? Pas vraiment… Dans l’état actuel des choses, notre nouveau Dataset
n’en a qu’une seule, la fonction count
. Le reste, que ce soit les transformations ou les actions, il va falloir les implémenter nous même et autant vous dire que l’API de Spark ne rigole pas. Pour être précis, il y a, pour la version Spark 3.3.1 en Scala 2.13, 216 fonctions dans la class Dataset
(On ne prend pas en compte ses déclinaisons telles que le RelationalGroupedDataset
par exemple).
Nous sommes vite arrivés à la conclusion qu’il fallait qu’on réfléchisse autrement.
La génération de code
Nous sommes partis d’un postulat simple : un Dataset
possède majoritairement deux types d’opérations, les actions et les transformations.
- Les transformations sont pures (ou presque) puisqu’elles servent juste à mettre à jour le
LogicalPlan
qui n’est rien d’autre qu’un ADT représentant un programme Spark. - Les actions sont impures parce qu’elles appliquent le
LogicalPlan
et doivent donc être “wrappées” dans ZIO.
C’est une façon de penser assez binaire et même un robot pourrait réécrire notre version d’un Dataset
. S’il pourrait, pourquoi ne le ferait-il pas ?
Il s’avère que pendant cette période, je suis tombé sur un article de Vigoo, un développeur Scala qui travaillait actuellement pour Ziverge, la boite en charge de ZIO. Ce dernier expliquait comment il avait fait pour générer zio-aws, une librairie qui créé un client Scala pour tous les services AWS basés sur ZIO. Cela nous a servi de base pour zio-spark !
Le plugin SBT
Nous n’avons pas réinventé la roue pour générer nos fichiers .scala
. En effet, nous avons suivi à la lettre l’article de blog de Vigoo à ce niveau-là. Nous avons créé un plugin SBT local nommé zio-spark-codegen chargé de générer ces fichiers.
Pour les générer, nous devions nous baser sur le code source Spark. Notre programme devait ainsi, à l’instar de zio-aws qui se basait sur des jsons, créer un programme capable de lire du code Scala pour en générer à son tour.
Scalameta à la rescousse
Nous avons tout d’abord testé la réflexion pour lire le code Spark. Malheureusement, ceci s'est très vite avéré inefficace. Le Type Erasure nous empêchait de récupérer des informations primordiales sur les différents types, rendant la tâche très difficile voir impossible. Certaines choses nous semblaient aussi impossible avec cette technique. On ne voyait pas, par exemple, comment on aurait pu récupérer les docstrings des différentes fonctions sans une étape manuelle.
Après avoir essayé et raté, nous nous sommes donc tournés vers Scalameta. Scalameta n’utilise pas du tout la réflexion, cette librairie crée depuis un ADT représentant un programme depuis du code Scala (un String). Il permet de valider qu’un programme est correct, que deux programmes soient synthétiquement équivalents (même si la forme n’est pas la même), de créer des règles de syntaxe (scalafmt et scalafix se basent sur scalameta) et dans notre cas d’interpréter l’ADT pour regénérer du code Scala depuis ce dernier.
Jusqu’à présent, voici comment le plugin fonctionne dans les grandes lignes :
- On récupère le classpath
- On récupère le code source
Dataset.scala
- On récupère son ADT grâce à Scalameta
- On traverse cet ADT pour générer notre
Dataset.scala
sous la forme d’un simple String - On écrit ce String dans l’endroit de notre choix
Du code custom dans du code généré
Félicitations, nous avons notre Dataset
généré de A à Z en suivant la règle binaire érigé au-dessus (on wrap toutes les actions dans des effets et on retranscrit comme tels les transformations). Malheureusement pour nous, le monde n’est pas si manichéen. Ce modèle automatique est devenu assez restrictif lorsque nous avons eu besoin de customiser certaines fonctions de Dataset
.
En effet, au fur et à mesure que nous avancions, nous avons eu besoin d’avoir, au sein de nos Dataset
, du code custom. Nous avions besoin plus précisément de cette feature pour deux cas particulier.
Créer nos propres fonctions
Créer nos fonctions se comprend assez facilement, nous voulions notamment rajouter une fonction headOption
afin de profiter du type system de Scala.
Éditer manuellement des fonctions existantes
Mais pourquoi voudrions-nous éditer manuellement les fonctions de Spark ? Car ZIO ne se résume pas qu’à wrapper les effets dans ZIO. Prenons l’exemple de la fonction show
. Cette dernière affiche par défaut les 20 premières lignes d’un Dataset
dans la console. C'est une action, ainsi notre plugin va générer la fonction suivante :
def show(): Task[Unit] = ZIO.attempt(underlying.show())
Cette fonction est ennuyante à tester pour l’utilisateur, il va forcément devoir passer par un mock puisque show
se base sur la fonction println
de Scala. Un utilisateur de ZIO n’a pas l’habitude de mocker print
parce qu’il utilise Console.printLine
à la place ! C’est chose faite dans zio-spark, la fonction show
a été réécrite pour se baser sur Console.printLine
. Ainsi, vous pouvez écrire le test suivant sans encombre :
test("Dataset should implement show correctly") {
val result =
"""+---------+---+
>| name|age|
>+---------+---+
>| Maria| 93|
>| John| 24|
>| Peter| 19|
>|Cassandra| 46|
>+---------+---+
>
>""".stripMargin('>')
for {
df <- read
_ <- df.show
output <- TestConsole.output
representation = output.mkString("\n")
} yield assertTrue(representation == result)
} @@ silent
Ce comportement est très spécifique à la fonction show, la génération automatique de code n’est pas faite pour cela, alors comment l’intégrer ?
Insérer du code custom dans la génération
Ce code, il va falloir l’écrire nous même pardi ! Rassurez vous, si c'était aussi simple, je n’aurais pas pris la peine d’écrire cette partie.
La façon la plus naïve de le faire est de tagger cette fonction afin qu’elle ne soit pas automatiquement générée puis de la rajouter à la main en l’écrivant sous forme de string pour l’insérer dans le string final. Avec cette méthode, nous manipulons un string ainsi, il n’y a pas d’autocomplétion, pas d’erreurs au compile time, pas de coloration syntaxique. Pour résumer, ce n’est pas plaisant.
Il nous fallait un moyen d’écrire du code Scala qui serve uniquement pour la génération de code. Ce dernier doit se baser sur l’ancienne classe déjà générée d’un Dataset
afin de pouvoir réutiliser des fonctions déjà générées auparavant.
Voici la marche à suivre que nous avons adoptée :
- Nous nous sommes servis du dossier
it
pour entreposer le code custom de nos générations. Ce dossier est censé être destiné aux tests d’intégrations comme l’indique la documentation de SBT. Nous l’avons un peu détourné de son but premier. Il a l’avantage d’être reconnu aisément par l’IDE et de pouvoir utiliser notre librairie zio-spark. - Dans ce dossier, nous avons créé des fichiers “overlay”. Ces fichiers contiennent les fonctions qui seront insérées lors de la prochaine génération de code. Il existe deux types de fichiers d’overlay, dans le cas du
Dataset
,DatasetOverlay.scala
etSpecificDatasetOverlay.scala
. Le premier contient le code à insérer dans la génération, qu’importe la version de Scala. Le deuxième quant à lui est associé à une version. - Un fichier “overlay” n’est pas qu’une série de fonctions. Comme je vous l’ai dit, ce code doit compiler et donc être du code valide. Pour ce faire, nous avons opté pour la structure suivante :
import zio._ import zio.spark.sql._ class DatasetOverlay[T](self: Dataset[T]) { import self._ // template:on // template:off }
Le code à l’intérieur de la class
DatasetOverlay
, entre les balises template sera pris en compte lors de la génération. Il faut voir cette classe comme une extension d’unDataset
, ce faisant, il peut utiliser toutes ses fonctions. - Maintenant que nous avons ces fichiers, le plugin va les prendre en compte lors de la génération. Tout comme pour les fichiers Spark, nous avons utilisé Scalameta pour extraire les fonctions entre les balises template. Nous les rajoutons ensuite dans le fichier final généré.
Conclusion
Voilà dans les grandes lignes comment nous sommes arrivés à générer du code zio-spark à partir du code Spark. Je me suis concentré ici sur la class Dataset
, mais cette méthode a ensuite été généralisée pour d’autres classes de Spark telle que la classe SparkContext
. Je l’ai rapidement mentionné avec les fichiers “overlay” mais cette méthode fonctionne pour différentes versions de Scala et de Spark au sein de notre système.
La génération nous a fait gagner énormément de temps puisqu’elle nous a évité d’écrire une énorme partie de notre codebase manuellement. Elle s’est aussi avérée très utile lorsque nous avons eu besoin de faire des gros refactoring. Ça a notamment été le cas lorsqu’il a fallu rajouter les traces avec ZIO 2 ou alors lorsque nous avons décidé de supporter Scala 3. Là où de telles actions auraient dû être coûteuses à implémenter, elles n’ont pris que quelques minutes.
Si jamais vous voulez aller plus loin, je vous conseille de regarder le plugin directement : https://github.com/univalence/zio-spark/tree/master/zio-spark-codegen.