Le stream processing est l'outil de base d'un monde constamment en modification. Kafka Streams fait partie de ces outils en facilitant la création de tel traitement sur la base d'architecture microservice. Et comme souvent dans ce type d'architecture, vous allez vous retrouver avec des services stateful, ce qui veut dire que vous allez y stocker un état qu'il faudra mettre à jour. Kafka Streams facilite aussi la mise en place d'une fonctionnalité stateful.

Cependant, si stocker des états est une nécessité, vous ne pourrez pas non plus stocker à l'infinie vos données. Il faudra alors pouvoir libérer de la place en suivant des critères. Kafka Streams propose quelques stratégies pour libérer de l'espace de stockage. Ces stratégies sont utilisables si vos données partagent des caractéristiques communes. Dans le cas contraire, il vous faudra fouiller plus profondément la boîte à outils qu'est Kafka Streams afin de bénéficier d'une stratégie plus adaptée.

Dans cet article, nous allons voir ces deux types de stratégie.

Supprimer des données avec des caractéristiques communes

Supprimer des données sur la base de caractéristiques individuelles

La suppression de données sur la base de caractéristiques individuelles ne fait pas partie du DSL haut niveau proposé par Kafka Streams.

Prenons le cas, d'une information avec une durée de validité. Cette date de validité peut varier de quelques jours à plusieurs mois, voire bien plus. Par exemple, j'ai des messages qui m'indiquent si des produits vendus en magasin sont aussi vendus sur le

import java.time.{Duration, Instant}
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType, Punctuator}
import org.apache.kafka.streams.state.KeyValueStore

class StoreExpirationPurger[K, V](
    storeName: String,
    checkInterval: Duration,
    expiryOf: V => Instant)
    extends Transformer[K, V, KeyValue[K, V]] {
  import scala.jdk.CollectionConverters._

  private var stateStore: KeyValueStore[K, V] = _
  private var context: ProcessorContext       = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
    this.stateStore = context.getStateStore(storeName)
    context.schedule(
      checkInterval,
      PunctuationType.STREAM_TIME,
      new Punctuator {
        override def punctuate(ts: Long): Unit = {
          val timestamp = Instant.ofEpochMilli(ts)

          stateStore
            .all()
            .asScala
            .foreach { kv =>
              if (kv.value != null && expiryOf(kv.value).isBefore(timestamp)) {
                context.forward(kv.key, null)
              }
            }
        }
      }
    )
  }

  override def transform(key: K, value: V): KeyValue[K, V] = {
    if (value == null) {
      stateStore.delete(key)
    } else {
      stateStore.put(key, value)
    }

    KeyValue.pair(key, value)
  }

  override def close(): Unit = ()
}