Spark 2.4 : #TheWaitIsOver Tour d'horizon de la nouvelle version

La version 2.4 d'Apache Spark est sortie fin octobre 2018. Cette version intègre de nouvelles fonctionnalités et son lot de correctifs. Ce qui inclut notamment la mise à jour des dépendances (Scala 2.12, Kafka 2.0), un support natif d'Avro, une amélioration du support de Kubernetes, de nouvelles opérations pour gérer les collections et autres types complexes en Spark SQL, une amélioration de l'API data source v2, ainsi que d'autres nouveautés.

Nous allons dans cet article passer en revue les nouveautés de la version 2.4. Nous verrons notamment que ces nouveautés apportent non seulement de meilleures performances, mais qu'elles offrent plus de flexibilités en termes de traitement des données.

Support de Scala 2.12

Spark 2.4 vient déjà avec le support très attendu de Scala 2.12. Cette version de Scala sortie en version stable il y a 2 ans a été mise à jour régulièrement : nous en sommes à la 2.12.7 depuis fin septembre (avec un bug fix spécifique pour Spark 2.4). Pour information, il est prévu que Scala 2.13 sorte prochainement en version stable.

Scala 2.12 inclut une meilleure intégration avec les nouveautés apportées avec Java 8 (correspondance directe entre trait côté Scala et interface côté Java, utilisation des types SAM, utilisation d'invokedynamic pour les lambda expressions), de nouvelles optimisations (inlining, allocations des closures, détection de code mort, suppression plus efficace des (un)boxing...) et pour ceux qui ont fait du typechecking leur chemin de croix cheval de bataille, la correction du bug SI-2712.

Vous trouverez plus de détails sur ces fonctionnalités sur le blog d'Ippon Technologies : partie 1, partie 2 et partie 3.

Spark sur Kubernetes

Depuis la version 2.3, les utilisateurs de Spark ont la possibilité d'utiliser Kubernetes comme orchestrateur, en alternative par rapport à Yarn, Mesos ou en mode standalone.

Le support de Kubernetes dans Spark permet aux développeurs de livrer des images Docker, à la place du traditionnel JAR déployé avec spark-submit. De plus, Kubernetes offre un grand nombre de facilités dans la gestion, l'orchestration et le monitoring d'un cluster, en le comparant avec Yarn. Il permet notamment la mise en place de quota sur les ressources du cluster, la gestion de namespace (pour avoir du multi-tenant sur le même cluster, par exemple), etc.

La version 2.4 de Spark ajoute d'autres possibilités. Tout d'abord, un support de PySpark (pour Python 2.x et 3.x) et un support de SparkR. Des templates de Dockerfile sont mis à disposition pour ces deux langages.

Un mode client est aussi proposé. Il s'agit de faciliter l'exécution d'outils interactifs, comme spark-shell ou les notebooks, depuis un pod géré par un cluster Kubernetes ou depuis une machine cliente hors du cluster Spark.

Support intégré d'Avro

Avro est un format binaire de sérialisation de données très utilisé dans le monde Big Data, avec Hadoop et Spark, mais en particulier avec Kafka pour les données temps réel.

Jusqu'à présent l'intégration d'Avro se faisait en passant par un addon mis à disposition par Databricks. Depuis la version 2.4 de Spark, Avro a été ajouté aux formats supportés par défaut (JSON, Parquet, JDBC, CSV, texte), en améliorant au passage les performances.

En plus de la lecture et l'écriture des données en Avro, deux fonctions sont mises à disposition pour Avro : from_avro et to_avro. Ces fonctions sont les équivalentes de from_jon et to_json. Elles permettent de respectivement lire (désencapsulation) et écrire des données Avro (encapsulation) dans les champs d'un dataframe.

Spark 2.4 supporte une version plus moderne d'Avro, la version 1.8.2. Cette version majeure d'Avro qui est apparue 2 ans après la version 1.7 (début 2016) a ajouté le support des types logiques(*), avec par défaut les décimales, les dates et les timestamps.

(* Il s'agit d'une mécanique similaire à d'autres formats de sérialisation comme EDN avec les tagged elements.)

Support de Kafka 2.0.0

La version du client Kafka est mis à jour et passe de la version 0.10.0.1 à la version 2.0.0.

Le support des nouvelles versions de Kafka permet la gestion des transactions de Kafka dans Spark Streaming [SPARK-25005]. Ce support permet de choisir de lire :

  • soit les messages non transactionnels et les messages dans les transactions commitées,
  • soit de lire tous les messages disponibles, même ceux qui sont dans des transactions encore ouvertes ou interrompues.

Cette fonctionnalité est activée dans Spark par la propriété kafka.isolation.level. Les valeurs possibles sont read_committed pour prendre uniquement en compte les données commitées et read_uncommited, qui est la valeur par défaut.

Voici un exemple de code permettant de lire un flux sur un topic Kafka. Nous ne considérons que les offset commités. Pour chaque message récupéré, nous tentons de convertir la valeur en Int.

Il reste néanmoins le support des headers Kafka à intégrer dans Spark. Cette fonctionnalité est en cours de développement [SPARK-23539].

spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", brokerAddress)
 .option("kafka.isolation.level", "read_committed")
 // from the beginning of the topic
 .option("startingOffsets", "earliest")
 .option("subscribe", topic)
 .load()
 .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 .as[(String, String)]
 .flatMap({case (k,v) => Try(v.toInt).toOption})

Opération sur les collections

Une bonne partie des données à traiter à travers Spark transparaissent sous forme de structures imbriquées, incluant des collections (array) et autres types complexes (struct). c'est le cas en particulier dès lors que nous devons traiter des documents JSON, XML, certaines données remontées par Cassandra...

Jusque-là, Spark n'offrait pas de fonctions permettant de traiter le cas des collections. Il était alors nécessaire de passer par des UDF créés à la main pour transformer des colonnes dès lors qu'elles contiennent des listes ou des tables de propriétés, par exemple.

Depuis Spark 2.4, cela n'est plus une nécessité. Cette version fournit tout un ensemble de fonctions au niveau SQL et DataFrame pour gérer les types collection et les types complexes. En plus de faciliter la gestion des collections, ces fonctions sont aussi très performantes.

Enfin, cette fonctionnalité devrait faciliter d'autant plus l'adoption de formats contenant des structures imbriquées (disponibles à travers JSON, Avro, Cassandra...) et réduire très fortement le besoin de passer par des structures en Scala, Java ou Python pour traiter les données dîtes complexes.

Fonction d'ordre supérieur

Parmi ces nouvelles fonctions, nous avons la possibilité de pouvoir utiliser des fonctions dîtes d'ordre supérieur dans les requêtes SQL. Pour faire simple, une fonction d'ordre supérieur est une fonction qui prend en entrée une autre fonction. Ceci permet d'avoir des opérations génériques qui laissent au développeur le soin d'y intégrer un traitement spécifique par composition.

Par exemple, supposons que nous cherchons à incrémenter les entiers du dataset nested_data ci-dessous :

{ key: "a", values: [2,3,4] }
{ key: "b", values: [5,6,7] }

Avec Spark 2.4, nous pouvons écrire notre transformation directement dans la requête SQL en utilisant la fonction d'ordre supérieur transform :

SELECT  key,
        values,
        TRANSFORM(values, value -> value + 1) AS values_plus_one
FROM    nested_data

transform prend en entrée deux paramètres : values qui correspond à la collection à transformer et value → value + 1 qui représente le traitement spécifique à appliquer à chaque élément de la collection.

Avec l'exemple ci-dessous, nous obtenons donc :

{ key: "a", values: [2,3,4], values_plus_one: [3,4,5] }
{ key: "b", values: [5,6,7], values_plus_one: [6,7,8] }

Pour faire la même chose auparavant, il fallait passer par une UDF :

val aPlusOne: Seq[Int] => Seq[Int] =
  values => values.map(value => value + 1)

sparkSession.udf.register("aPlusOne",aPlusOne)

Et pour la requête SQL :

SELECT  key,
        values,
        aPlusOne(values) AS values_plus_one
FROM    nested_data

D'autres opérations d'ordre supérieur sont mises à dispositions :

  • filter : pour ne conserver que les éléments satisfaisant un prédicat.
  • aggregate : pour agréger les éléments d'une collection selon une fonction d'agrégation (à la manière de foldLeft en Scala).
  • zip_with : associe les éléments de deux collections de même indice et applique à ces éléments la fonction passée en paramètre pour les "fusionner".

Un peu plus pour les tableaux et les maps !

Cette version de Spark intègre aussi de nouvelles fonctions au niveau SQL et Dataframe pour la manipulation de tableaux et des maps.

array_union([1,2,3], [4,5,6])          = [1,2,3,4,5,6]
array_except([1,2,3], [4,3,2])         = [1]
flatten([[1,2,3], [4,5], [6]])         = [1,2,3,4,5,6]
array_repeat(2, 3)                     = [2,2,2]
arrays_zip([1,2] as i, ['a','b'] as j) = [{i:1, j:'a'}, {i:2, j:'b'}]
map_entries(map([1, 2], ['x', 'y']))   = [ROW(1, 'x'), ROW(2, 'y')]

La liste complète des fonctions ajoutées dans Spark est dans le ticket JIRA [SPARK-23899].

Data Source API v2

L'API Data Source est tout ce qui permet de lire ou écrire des données à travers Spark sur des systèmes de persistance (base de données, HDFS, Elasticsearch, Kafka...). La version précédente est simple et fonctionne dans la plupart des cas d'utilisation. Mais, elle est aussi fortement couplée avec l'API Spark de niveau supérieur (SparkContext, RDD, DataFrame), ce qui réduit la flexibilité de l'API Data Source en termes d’évolution. Elle ne permet pas non plus de bénéficier d'optimisations au niveau Spark, sachant qu'elle ne remonte pas d'informations du stockage physique comme le partitionnement ou l'organisation des données. Enfin, elle ne permet pas d'ajouter simplement de nouvelles opérations empilables (eg. filtrage, limitation de la quantité de données, échantillonnage...), elle n'offre pas de lecture en colonne et ne supporte le streaming.

Dans sa version actuelle Data source API v2 est avant tout un refactoring de la v1 qui permettra au fur et à mesure de faire évoluer plus rapidement l'API elle-même. Ainsi, la nouvelle API ne se base plus sur des trait Scala, mais sur des interface Java, pour des raisons d'interopérabilité. La nouvelle API s'affranchit de dépendances de l'API Spark et introduit la notion de InternalRow, pour des manipulations propres à l'API Data Source avant de passer au type Row.

Les évolutions de l'API sont à suivre dans le ticket Jira suivant : [SPARK-22386].

Et aussi…

Spark 2.4 inclut en plus :

  • Des améliorations dans PySpark, notamment en intégrant d'autant plus des UDFs Panda, déjà introduit dans la version 2.3, et aussi à travers la mise en place d'un modèle d'évaluation gloutonne (eager) pour les dataframe pour une meilleure expérience utilisateur dans le cadre par exemple de Jupyter (en utilisant la propriété spark.sql.repl.eagerEval.enabled).
  • Structured Streaming : les données de chaque micro-batch sont encodées dans un DataFrame et plus seulement dans un RDD.
  • Image Source : une fonction de lecture des images a été ajoutée dans Spark 2.3. En 2.4, il est possible de charger une image depuis spark.read.
  • Améliorations dans SQL : optimisations des drivers et amélioration des performances.
  • Parquet : passage à la version 1.10.0.
  • Lecteur ORC intégré et passage à la version 1.5.2.
  • Le Barrier Scheduling (qui fera à lui seul l'objet d'un autre article de blog, expliquant l'intérêt de cette fonctionnalité et son fonctionnement).

Conclusion

En mettant beaucoup de composants à jour (Avro, Kafka, Scala), les gains en performance et l'amélioration du support des types complexes. Cette version est définitivement à déployer très rapidement sur les clusters.

Certaines de ces fonctionnalités changent vraiment la donne et peuvent grandement améliorer les projets sur Spark.

Néanmoins, cette version est encore un peu jeune. Il convient de ne déployer que certains projets sur cette version en mettant à jour progressivement les autres projets dès que la 2.4.1 sera sortie.