Scala, Avro Serde et Schema registry

L'un des formats binaires les mieux reconnus et les plus efficaces avec Kafka, c'est le format Avro. Le format binaire présente l'avantage de ne prendre que peu de place par rapport à la plupart des formats lisibles (XML, Json, Yaml...), ce qui permet de gagner en débit. La serde est a priori plus rapide, car elle n'implique de parsing ou de mapping complexes. Avro est un format binaire qui est géré par la plateforme Confluent depuis quelques temps, en particulier pour ce qui concerne la partie schema registry pour gérer les schémas Avro (depuis la version 2.5.0, le format Protobuf est maintenant géré par la plateforme). L'outillage Java sur le sujet Kafka et Avro est assez simple à trouver et à utiliser, mais qu'en est-il de l'outillage Scala en respectant l'approche proposée par le langage ?

Avro4S pour la Serde

Pour la serde Avro, dans le cadre de Scala, il vous faudra utiliser la bibliothèque avro4s. Cette bibliothèque permet à la fois de générer des schémas Avro depuis des case classes et les serde dédiées sur différents formats.

avro4s se base sur Magnolia, que nous avons déjà découvert sur ce blog et qui permet ici de calculer le schéma et le code de Serde à la compilation.

Schema

avro4s se base donc sur Magnolia. Cette bibliothèque se base des macros Scala pour générer du code à la compilation autour de case class, ce qui permet de ne pas utiliser la réflexion pour avoir un schéma.

package io.univalence.data

case class User(id: String, name: String, age: Option[Int], friendNames: List[String])

Pour récupérer le schéma Avro d'une case class, il suffit d'utiliser AvroSchema.

AvroSchema[User]

Ce qui donne

{
  "type": "record",
  "name": "User",
  "namespace": "io.univalence.data",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "friendNames",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

AvroSchema est défini comme suit

object AvroSchema {
  def apply[T](implicit schemaFor: SchemaFor[T],
               fieldMapper: FieldMapper = DefaultFieldMapper): Schema =
    schemaFor.schema(fieldMapper)
}

C'est l'instance de SchemaFor sur User qui est générée automatiquement par Magnolia à la compilation. Nous pouvons voir que la conversion en schéma Avro dépend de fieldMapper qui permet de modifier la conversion des noms de champ dans la case class vers les noms de champs dans le schéma. Par défaut, il n'y a pas de transformation. Mais il est possible de convertir en snake case (utilisation de '_' comme délimiteur), en Lisp case ('-'), en Pascal case (camel case avec la première lettre en majuscule) ou de définir son fieldMapper.

Serde

avro4s permet de générer des serde pour des case class. En utilisant avro4s-kafka, il est possible de générer des serde compatibles avec Kafka.

avro4s propose trois formats de sérialisation :

  • Binary : format binaire
  • Data : format binaire incluant le schéma
  • Json : les données sont converties en Json (sans schéma)

La création d'une serde pour un type particulier passe par la création d'une instance de GenericSerde pour ce type en précisant le format.

import com.sksamuel.avro4s.kafka.GenericSerde
import com.sksamuel.avro4s._

val serde = new GenericSerde[User](JsonFormat)

Prenons cet exemple

val users = List(
  User("be-ji", "John", Some(32), List("Marc", "Sophie")),
  User("sh-gh", "Mary", None, Nil)
)

Voici comment sérialiser puis une désérialiser le contenue de la liste

// serialize
val data: List[Array[Byte]] =
  users.map(user => serde.serialize("user-stream", user))
data.foreach(d => println(new String(d)))

// deserialize
val results =
  data.map(d => serde.deserialize("user-stream", d))
results.foreach(println)

Le premier paramètre de serialize et de deserialize correpond au nom du topic Kafka où la serde est appliquée. Ce paramètre fait parti du contrat imposé par Kafka pour les implémentation de serde. Dans le cadre d'avro4s, ce paramètre n'est utilisé.

{"id":"be-ji","name":"John","age":{"int":32},"friendNames":["Marc","Sophie"]}
{"id":"sh-gh","name":"Mary","age":null,"friendNames":[]}

User(be-ji,John,Some(32),List(Marc, Sophie))
User(sh-gh,Mary,None,List())

val serde = new GenericSerde[User](BinaryFormat)

Obj   avro.schema�{"type":"record","name":"User","namespace":"io.univalence.data","fields":[{"name":"id","type":"string"},{"name":"name","type":"string"},{"name":"age","type":["null","int"]},{"name":"friendNames","type":{"type":"array","items":"string"}}]} avro.codenull���ɌeϿ���� �UW� 6
be-jJohn @Marc Sophie��ɌeϿ���� �UW�
Obj   avro.schema�{"type":"record","name":"User","namespace":"io.univalence.data","fields":[{"name":"id","type":"string"},{"name":"name","type":"string"},{"name":"age","type":["null","int"]},{"name":"friendNames","type":{"type":"array","items":"string"}}]} avro.codenull��D�E�NQ�s�� �q  
sh-gMary�D�E�NQ�s�� �q 

User(be-ji,John,Some(32),List(Marc, Sophie))
User(sh-gh,Mary,None,List())

val serde = new GenericSerde[User](DataFormat)

�be-jJohn@MarcSophie�
�sh-gMary��

User(be-ji,John,Some(32),List(Marc, Sophie))
User(sh-gh,Mary,None,List())

Avrohugger et les AVDL

Pour générer des case class à partir de fichier AVDL (Avro IDL), vous pouvez utiliser sbt-avrohugger. sbt-avrohugger se base sur avro-hugger. Ce dernier est une bibliothèque qui génère du code en écrivant directement le contenu, sans passer par les macros Scala.

Il y a aussi un autre plugin : sbt-avro4s. Cependant, l'auteur explique que ce plugin n'est plus maintenu et qu'il convient d'utiliser sbt-avrohugger.

@namespace("io.univalence.data")

protocol Domain {

  enum ContactType {
    ADDRESS, EMAIL, NONE
  }

  record User {
    string id;
    string name;
    int age;
    ContactType contactType;
    union { null, string } contact;
  }

}

sbt-avrohugger propose plusieurs façon de générer du code à partir de fichiers AVDL.

Dans le fichier Domain.scala, pour un paramétrage simple, nous obtenons le code suivant

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package io.univalence.data

object ContactType extends Enumeration {
  type ContactType = Value
  val ADDRESS, EMAIL, NONE = Value
}

final case class User(id: String, name: String, age: Int, contactType: ContactType.Value, contact: Option[String])

final case class User(var id: String, var name: String, var age: Int, var contactType: ContactType, var contact: Option[String]) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this("", "", 0, null, None)
  def get(field$: Int): AnyRef = // ...
  def put(field$: Int, value: Any): Unit = // ...
  def getSchema: org.apache.avro.Schema = User.SCHEMA$
}

object User {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(/* AVRO schema as JSON string */)
}

package io.univalence.data;
@org.apache.avro.specific.AvroGenerated
public enum ContactType implements org.apache.avro.generic.GenericEnumSymbol<ContactType> {
  ADDRESS, EMAIL, NONE  ;
  // ...
}

enum = JavaEnum

Pas de résultat avec avrohugger.types.ScalaCaseObjectEnum et avrohugger.types.ScalaEnumeration.

enum = avrohugger.types.EnumAsScalaString

final case class User(var id: String, var name: String, var age: Int, var contactType: String, var contact: Option[String])

Confluent schema registry

Confluent propose une bibliothèque pour lié votre service Kafka Streams au schema registry en utilisant le format Avro. Par contre, ce lien n'existe pas au niveau d'avro4s. Il reste donc à le créer.

Il est possible via Google de faire une recherche autour d'avro4s et schema registry. Vous allez trouver des Gist.

Nous proposons ici une variation qui permet de bénéficier d'un mode debug en plus de mode mock proposé par Confluent.

Il faut d'abord importer le client schema registry de Confluent. Celui-ci importe son propre client Kafka, qu'il faut exclure pour s'éviter des erreurs de compilation.

resolvers           += "confluent" at "https://packages.confluent.io/maven/"
libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % "5.5.0" exclude ("org.apache.kafka", "kafka-clients")

Notre code propose trois types d'URL :

  • none : ce mode n'utilise pas de client schema registry. Il met en place une serde basic en JSON fournit par avro4s et correspondant au code vu précédemment. Il est à utiliser pour du debug ou des démos en local.
  • mock://... : ce mode propose de l'Avro binaire (fournit par avro4s) et un client schema registry mocké (fournit par Confluent).
  • http(s?)://... : ce mode propose de l'Avro binaire (fournit par avro4s) et utilise un client schema registry (fournit par Confluent).

import com.sksamuel.avro4s._
import com.sksamuel.avro4s.kafka.GenericSerde
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, MockSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.Serdes

trait SerdeGenerator {
  def serdeFor[A >: Null: Encoder: Decoder: SchemaFor]: Serde[A]
}

object SerdeGenerator {

  def fromUrl(schemaRegistryUrl: String): SerdeGenerator =
    if (schemaRegistryUrl.toLowerCase() == "none") {
      new BasicSerdeGenerator(JsonFormat)
    } else {
      lazy val serdeConfig: Map[String, _] = Map(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl
      )

      def schemaRegistry: SchemaRegistryClient =
        if (schemaRegistryUrl.startsWith("mock://"))
          new MockSchemaRegistryClient()
        else
          new CachedSchemaRegistryClient(schemaRegistryUrl, Int.MaxValue)

      new SchemaRegistryBasedSerdeGenerator(schemaRegistry, serdeConfig)
    }

}

class BasicSerdeGenerator(format: AvroFormat) extends SerdeGenerator {
  override def serdeFor[A >: Null: Encoder: Decoder: SchemaFor]: Serde[A] =
    new GenericSerde[A](format)
}

/** Generate serde based on Confluent schema registry.
  *
  * @param schemaRegistry schema registry client
  * @param serdeConfig configuration for the serde
  * @param isSerdeForRecordKeys
  */
class SchemaRegistryBasedSerdeGenerator(
    schemaRegistry: => SchemaRegistryClient,
    serdeConfig: Map[String, _],
    isSerdeForRecordKeys: Boolean = false
) extends SerdeGenerator {
  import scala.jdk.CollectionConverters._

  def serde: GenericAvroSerde = {
    val s = new GenericAvroSerde(schemaRegistry)
    s.configure(serdeConfig.asJava, isSerdeForRecordKeys)

    s
  }

  override def serdeFor[A >: Null: Encoder: Decoder: SchemaFor]: Serde[A] = {
    val rfA: RecordFormat[A] = RecordFormat[A]

    Serdes.fromFn(
      (topic, data) => serde.serializer().serialize(topic, rfA.to(data)),
      (topic, bytes) => Option(rfA.from(serde.deserializer().deserialize(topic, bytes)))
    )
  }
}

val serdeGenerator = SerdeGenerator.fromUrl(schemaRegistryUrl)
val userSerde      = serdeGenerator.serdeFor[User]

Référence

Stéphane Derosiaux. Serializing data efficiently with Apache Avro and dealing with a Schema Registry. 2017