Historisation de données avec Parquet

Nous allons voir aujourd'hui comment il est possible de compresser fortement des données historisées, par exemple une même table pour des dates différentes, donc en principe n'ayant pas un changement conséquent de données au quotidien en utilisant simplement le format de fichier Parquet, assez connu aujourd'hui.

Introduisons tout d'abord le RLE pour Run-Length Encoding (codage par plages si l'OQLF est dans le coin), qui est un algorithme de compression de données sans pertes. Youhou !

Cet algorithme est principalement utilisé pour les documents noirs et blancs comme le fax 👴📠 et pour certains formats de stockage d'images comme le JPEG et BMP.

Run-Length Encoding

Prenons le cas d'une image qui représente une lettre de l'alphabet en noir sur un fond blanc. Nous savons que cette image est en fait composé de pixels, soit noirs, soit blancs. Dans cette image, nous avons beaucoup de pixels blanc successifs suivis d'un nombre plus ou moins important de pixels noirs puis à nouveau de pixels blancs.

Le principe du RLE consiste à dire qu'au lieu de stocker toute l'information qui indique la couleur du pixel, il suffit de stocker seulement le nombre de pixels puis la couleur du pixel.

Supposons que B = pixel blanc et N = pixel noir. Pour pouvoir stocker 10 pixels blancs sans compression, nous devons stocker BBBBBBBBBB. Mais avec RLE, nous pouvons dire 10B (avec "10" étant écrit au format binaire 0x000a), ce qui prend beaucoup moins de place.

Bit-packing

Parquet utilise aussi du bit-packing, une technique pour optimiser le rangement de données. En quelque mots : lorsqu'on déclare une variable, une taille en bits est réservée. Par exemple un int réserve 32 bits en général. Mais si l'entier est déclaré avec une valeur de 1 alors nous n'utilisons que 1 bit sur ces 32 bits et il est possible de ranger d'autres données dans l'espace restant.

What now?

Imaginez maintenant que vous pouvez utiliser ces deux techniques en même temps : cela ferait de vous un grand maître absolu de la compression. N'est-ce pas ?

Dans les faits, cette alliance de techniques a déjà été mis en place par le biais d'un format de fichier assez familier en data engineering : le Parquet. <EOD> (End Of Dream).

Alors que vos rêves de création d'un nouvel algorithme de compression s'évaporent, soudainement, une terrible idée se propage dans votre esprit.

Parquet utilise donc un mélange de RLE et de bit-packing. Nous avons vu que le RLE marche très bien dans le cas où la donnée est similaire sur un nombre assez long de bits. De plus, il faut savoir que Parquet stocke la donnée en colonne ce qui signifie que dans un bloc de données d'un disque dur, on essaie de stocker toute la donnée d'une colonne et non pas d'une ligne.

Imaginons maintenant que nous travaillons au sein d'une entreprise qui souhaite stocker les données d'une table chaque jour — ce qui arrive assez couramment. Et imaginons aussi que nous l'avons fait pendant une semaine pour commencer.

Pour simplifier, supposons que la table ressemble à ça pour le 5 avril 2018 :

iddatadump_date
idTigrou1some data20180405
idTigrou2some more data20180405
idTigrou2data20180405

Ici, nous avons id qui nous servira de clé primaire de la table Tigrou (si nous ne connaissons pas la clé primaire, c'est beaucoup plus compliqué mais cela ne sera pas traité ici). Nous avons également la date à laquelle nous avons stocké cette table et bien sûr de la donnée.

L'idée que nous avons eu précédemment se dessine de plus en plus clairement :

Pouvons-nous "ranger" la donnée de l'ensemble des tables Tigrou de manière à prendre avantage du format Parquet qui utilise un stockage colonne et des optimisations de compression comme le RLE et le bit-packing pour pouvoir ainsi fortement réduire la taille qu'occupe ces tables tout en pouvant requêter simplement la nouvelle table super compressée résultant de ce rangement ?

Et bien la réponse* est :

(* : Nous aurons besoin d'Impala pour requêter la donnée sans s'arracher les cheveux)

Voyons alors comment faire ça ! Et voyons aussi à la fin des résultats de ce rangement sur des données bien réelles 😇

Reprenons les données des tables Tigrou (seulement sur trois jours pour ne pas avoir à allonger l'exemple).

Tigrou pour le 20180405

+---+----+--------+
|id |data|date    |
+---+----+--------+
|1  |toto|20180405|
|2  |titi|20180405|
|3  |tata|20180405|
+---+----+--------+

Tigrou pour le 20180406


+---+------+--------+
|id |data  |date    |
+---+------+--------+
|1  |tototo|20180406|
|2  |titi  |20180406|
|3  |tatata|20180406|
+---+------+--------+

Tigrou pour le 20180407


+---+--------+--------+
|id |data    |date    |
+---+--------+--------+
|1  |tototo  |20180407|
|2  |titi    |20180407|
|3  |tatatata|20180407|
+---+--------+--------+

Notre but est d'arriver à une table super compressée qui contient la donnée des trois tables.

Nous pouvons commencer par faire l'union de ces trois tables (en utilisant la fonction unionByName expliqué dans cet article)

val dfs: Seq[(String, DataFrame)] = Seq(
      "20180405" -> Seq(("1", "toto"), ("2", "titi"), ("3", "tata")).toDF("id", "data"),
      "20190406" -> Seq(("1", "tototo"), ("2", "titi"), ("3", "tatata")).toDF("id", "data"),
      "20180407" -> Seq(("1", "tototo"), ("2", "titi"), ("3", "tatatata")).toDF("id", "data")
    )

val unionDF: DataFrame = dfs.map(df => df._2.withColumn("date", lit(df._1))).reduce(_ unionByName _)

unionDF.show(false)

Cela donne :

+---+--------+--------+
|id |data    |date    |
+---+--------+--------+
|1  |toto    |20180405|
|2  |titi    |20180405|
|3  |tata    |20180405|
|1  |tototo  |20180406|
|2  |titi    |20180406|
|3  |tatata  |20180406|
|1  |tototo  |20180407|
|2  |titi    |20180407|
|3  |tatatata|20180407|
+---+--------+--------+

Et ensuite nous pouvons grouper la donnée similaire par date :

val dateGroupDF: DataFrame = unionDF.groupBy(cols.map(expr): _*).agg(collect_list("date").as("date"))

dateGroupDF.show(false)

En résultat :

+---+--------+------------------------------+
|id |data    |date                          |
+---+--------+------------------------------+
|3  |tatata  |[20180406]                    |
|3  |tatatata|[20180407]                    |
|3  |tata    |[20180405]                    |
|2  |titi    |[20180405, 20180406, 20180407]|
|1  |tototo  |[20180406, 20180407]          |
|1  |toto    |[20180405]                    |
+---+--------+------------------------------+

Finalement on groupe par id :

val finalDF: DataFrame = dateGroupDF
.groupBy(groupBy.map(expr): _*)
.agg(collect_list(struct(c.columns.filter(x => !groupBy.contains(x)).map(expr): _*)).as("rest"))
.sort(sortBy.map(expr): _*)

finalDF.show(false)

Et on a enfin :

+---+------------------------------------------------------------------+
|id |rest                                                              |
+---+------------------------------------------------------------------+
|1  |[                                                                 |
|   |  [tototo, [20180406, 20180407]],                                 |
|   |  [toto,   [20180405]]                                            |
|   |]                                                                 |
|2  |[                                                                 |
|   |  [titi, [20180405, 20180406, 20180407]]                          |
|   |]                                                                 |                       
|3  |[                                                                 |
|   |  [tatata,   [20180406]],                                         |
|   |  [tatatata, [20180407]],                                         |
|   |  [tata,     [20180405]]                                          |
|   |]                                                                 |
+---+------------------------------------------------------------------+

Le RLE ici se fait entre tototo / toto et tatata, tatatata et tata !

Tout le code est disponible dans spark-tools/plumbus.

Nous verrons dans un prochain article comment créer des vues sur cette table qui permettent d'exploiter les données comme si nous avions les tables de départ.

J'aimerais pour finir vous partager quelques chiffres sur un test rapide fait sur des tables de production faisant ~350Mo en parquet pour un jour. En appliquant ce processus à ces tables pour cinq jours nous passons de 1.8Go à 629Mo ce qui représente un taux de compression de 65% 🥳

Les fichiers en entrée

Le fichier parquet en sortie

Avec du snappy en bonus