Traiter des Flux de Données avec Kafka¶
Apprenez à transformer, agréger et enrichir des données en temps réel avec Kafka Streams et Apache Flink.
Ce que Vous Allez Apprendre¶
À la fin de ce guide, vous serez capable de :
- ✅ Comprendre les concepts et cas d'usage du traitement de flux
- ✅ Créer des transformations sans état et avec état
- ✅ Implémenter du fenêtrage et des agrégations temporelles
- ✅ Joindre des flux et des tables en temps réel
- ✅ Utiliser Kafka Streams pour du traitement léger
- ✅ Utiliser Apache Flink pour de l'analytique avancée
- ✅ Déployer des applications de traitement de flux en production
Prérequis :
- Docker et Docker Compose installés
- Compréhension des producteurs et consommateurs
- Familiarité avec Java ou Python (pour les exemples de code)
Temps Estimé : 45 minutes
Démarrage Rapide : Exécuter Kafka avec Docker Compose¶
Utilisez cette configuration Docker Compose pour exécuter un cluster Kafka à nœud unique avec KRaft (pas besoin de Zookeeper) :
- Image Confluent Platform Kafka (inclut le mode KRaft)
- Port du broker Kafka exposé à l'hôte
- ID de nœud unique dans le cluster KRaft
- Ce nœud agit à la fois comme broker et controller (mode combiné)
- Listeners : PLAINTEXT pour les clients, CONTROLLER pour la communication interne du cluster
- Listener annoncé pour les clients externes
- Votants du quorum pour le consensus KRaft (format :
id@host:port) - ID de cluster unique - générer avec
docker run confluentinc/cp-kafka kafka-storage random-uuid - Répertoire de logs pour le stockage des données Kafka
- Désactiver la collecte de métriques Confluent Support pour une configuration locale minimale
- Nombre de partitions par défaut pour les nouveaux topics (bon pour le traitement de flux parallèle)
Démarrer Kafka et Créer les Topics¶
# Démarrer Kafka
docker-compose up -d
# Attendre que Kafka soit prêt
docker-compose ps
# Créer les topics d'entrée pour le traitement de flux
docker exec kafka-kraft kafka-topics \
--bootstrap-server localhost:9092 \
--create --topic orders --partitions 3 --replication-factor 1
docker exec kafka-kraft kafka-topics \
--bootstrap-server localhost:9092 \
--create --topic customers --partitions 3 --replication-factor 1 \
--config cleanup.policy=compact # (1)!
# Produire des données d'exemple
docker exec -i kafka-kraft kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic orders \
--property "parse.key=true" \
--property "key.separator=:" << EOF
ORD-001:{"order_id":"ORD-001","customer_id":"CUST-123","amount":99.99,"region":"US-WEST"}
ORD-002:{"order_id":"ORD-002","customer_id":"CUST-456","amount":149.50,"region":"US-EAST"}
ORD-003:{"order_id":"ORD-003","customer_id":"CUST-123","amount":75.25,"region":"US-WEST"}
EOF
# Arrêter Kafka
docker-compose down
- Topic compacté pour les données client (garde uniquement la dernière valeur par clé)
Vérifier la Configuration du Traitement de Flux
```bash # Lister les topics docker exec kafka-kraft kafka-topics \ --bootstrap-server localhost:9092 --list
# Consommer depuis le topic orders
docker exec kafka-kraft kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic orders --from-beginning --max-messages 3
```
Exécution des Applications Kafka Streams
Les applications Kafka Streams s'exécutent comme des processus Java séparés, pas à l'intérieur de Kafka :
```bash
# Construire votre application Kafka Streams
mvn clean package
# Exécuter localement (se connecte à Docker Kafka)
java -jar target/stream-processor-1.0.jar
```
Pour Flink, vous aurez besoin d'un cluster Flink séparé (voir la section [Stratégies de Déploiement](#stratégies-de-déploiement)).
Qu'est-ce que le Traitement de Flux ?¶
Le Traitement de Flux est le traitement en temps réel de flux de données continus. Contrairement au traitement par lot (qui traite des données historiques), le traitement de flux analyse les données dès leur arrivée—permettant des insights et actions instantanés.
Traitement de Flux vs Traitement par Lot
Traitement par Lot : Traiter les données en gros morceaux chaque heure/jour (ex: jobs ETL nocturnes)
Traitement de Flux : Traiter chaque événement immédiatement à son arrivée (ex: détection de fraude, tableaux de bord temps réel)
graph LR
A[Topic Commandes] -->|Flux| B[Processeur de Flux]
B -->|Filtrer| C[Commandes Valides]
B -->|Agréger| D[Revenu par Région]
B -->|Enrichir| E[Commandes + Info Client]
C --> F[Système Fulfillment]
D --> G[Tableau de Bord Analytics]
E --> H[Service Client]
!!! tip "Quand Utiliser le Traitement de Flux" - Analytique temps réel - Tableaux de bord live, métriques, surveillance - Actions pilotées par événements - Détection de fraude, alertes, notifications - Enrichissement de données - Joindre des événements avec des données de référence en temps réel - ETL continu - Transformer et router les données entre systèmes - Traitement d'événements complexes - Détecter des patterns à travers plusieurs événements
Frameworks de Traitement de Flux¶
L'écosystème Kafka offre deux frameworks principaux :
- Kafka Streams
Idéal Pour : Traitement léger et embarqué
Avantages : - Bibliothèque Java - pas de cluster séparé nécessaire - Sémantique exactly-once intégrée - Auto-scaling avec groupes de consommateurs - Déploiement simple (juste un fichier JAR)
Inconvénients : - Java/Scala uniquement (pas de Python) - Limité à Kafka comme source/sink - Fonctionnalités analytiques avancées limitées
- Apache Flink
Idéal Pour : Analytique avancée, CEP complexe
Avantages : - Interface SQL puissante - Fenêtrage et jointures avancés - Support Python (PyFlink) - Sources/sinks multiples (Kafka, bases de données, fichiers)
Inconvénients : - Nécessite un cluster séparé (JobManager + TaskManagers) - Complexité opérationnelle plus élevée - Empreinte mémoire plus lourde
!!! tip "Quel Framework Choisir ?" - Commencez avec Kafka Streams si vous avez besoin de transformations simples et voulez un minimum de surcharge opérationnelle - Utilisez Apache Flink si vous avez besoin de SQL, support Python, ou fenêtrage/jointures avancés
Étape 1 : Transformations Sans État (Filter, Map)¶
Les opérations sans état traitent chaque événement indépendamment sans mémoriser les événements précédents.
Exemple Kafka Streams : Filtrer et Transformer les Commandes¶
- ID d'application - utilisé pour le groupe de consommateurs et le nommage des state stores
- Lire le flux depuis le topic
orders- crée un KStream - Opération de filtre - garde uniquement les commandes > $100 (sans état)
- Opération de map - transforme chaque commande en ajoutant le flag VIP
- Écrire le flux filtré/transformé vers le topic
vip-orders
- Créer un DataStream Flink depuis la source Kafka
- Prédicat de filtre - retourner True pour conserver l'événement
- Opération de filtre - supprime les commandes de faible valeur
- Transformation map - ajoute le flag VIP et retourne une chaîne JSON
Cas d'Usage pour les Transformations Sans État¶
- Filtrage : Supprimer les événements invalides, router par type, filtrage de conformité
- Mapping : Conversion de format, extraction de champs, enrichissement avec des constantes
- FlatMap : Parser des événements complexes en plusieurs événements simples
Étape 2 : Agrégations Avec État (Count, Sum)¶
Les opérations avec état maintiennent un état à travers plusieurs événements—permettant des agrégations comme des comptages, sommes et moyennes.
Exemple Kafka Streams : Compter les Commandes par Région¶
- Re-keyer le flux par région - requis pour le groupement
- KTable représente l'état agrégé (région → comptage)
- Grouper par clé (région) - prépare pour l'agrégation
- Opération de comptage - maintient l'état dans le store RocksDB nommé "order-count-store"
- Convertir KTable en KStream et écrire vers le topic de sortie
- Transformer en tuple (région, 1) - 1 représente une commande
- Keyer par région (premier élément du tuple)
- Sommer le second élément (comptage) - Flink maintient l'état automatiquement
Gestion de l'État
Les opérations avec état nécessitent des state stores persistants : - Kafka Streams : Utilise RocksDB (base de données clé-valeur embarquée) + topic changelog pour la tolérance aux pannes - Flink : Utilise des snapshots de checkpoint vers du stockage distribué (HDFS, S3)
Étape 3 : Fenêtrage et Agrégations Temporelles¶
Le fenêtrage groupe les événements en buckets temporels—permettant des opérations comme "commandes par minute" ou "revenu par heure".
Types de Fenêtres¶
- Fenêtres Tumbling
Fenêtres de taille fixe, non chevauchantes
Cas d'Usage : "Compter les commandes toutes les 5 minutes"
- Fenêtres Hopping
Fenêtres de taille fixe, chevauchantes
Cas d'Usage : "Revenu par 10 minutes, mis à jour toutes les 5 minutes"
- Fenêtres Sliding
Fenêtre par événement (continue)
Cas d'Usage : "Moyenne des 100 derniers événements"
- Fenêtres Session
Fenêtres basées sur les écarts (pilotées par l'activité)
Cas d'Usage : "Sessions utilisateur avec timeout d'inactivité de 30min"
Exemple Kafka Streams : Fenêtre Tumbling (Revenu 5 Minutes)¶
- Transformer le flux en paires (région, montant)
- Définir une fenêtre tumbling - buckets fixes de 5 minutes
ofSizeWithNoGrace- pas de gestion des événements tardifs (les événements doivent arriver à l'heure)- Appliquer le fenêtrage au flux groupé
- Sommer les montants dans chaque fenêtre
- La clé fenêtrée contient à la fois la région et les métadonnées de fenêtre (heures début/fin)
- Colonne event time - timestamp extrait du payload du message
- Stratégie de watermark - permet 5 secondes d'événements tardifs
- Heure de début de fenêtre - début de la fenêtre tumbling de 5 minutes
- Fonction d'agrégation - sommer le revenu dans la fenêtre
- Définition de fenêtre tumbling - groupe les événements en buckets de 5 minutes
!!! tip "Event Time vs Processing Time" - Event Time : Timestamp provenant de l'événement lui-même (ex: heure de création de commande) - utilisez ceci pour une analytique précise - Processing Time : Timestamp quand l'événement est traité par le processeur de flux - plus simple mais moins précis
Étape 4 : Jointure de Flux et Tables¶
Les jointures combinent des données de plusieurs flux ou tables en temps réel—permettant l'enrichissement et la corrélation.
Types de Jointures¶
| Type de Jointure | Description | Cas d'Usage |
|---|---|---|
| Stream-Stream | Joindre deux flux d'événements | Corréler des événements liés (ex: commande + paiement) |
| Stream-Table | Enrichir un flux avec des données de référence | Ajouter les infos client aux commandes |
| Table-Table | Joindre deux tables (KTables) | Combiner des vues agrégées |
Exemple Kafka Streams : Enrichir les Commandes avec les Données Client¶
- KTable depuis un topic compacté - représente l'état actuel des clients
- Vue matérialisée stockée dans RocksDB pour des recherches rapides
- Re-keyer le flux de commandes pour correspondre à la clé de la table client (customer_id)
- Left join - conserve les commandes même si le client n'est pas trouvé (customerJson = null)
- Jointure avec la table client - la recherche se fait dans le state store (rapide)
- Value joiner - combine le JSON de commande et client en un événement enrichi
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = StreamTableEnvironment.create(env_settings)
# Définir le flux de commandes
table_env.execute_sql("""
CREATE TABLE orders (
order_id STRING,
customer_id STRING,
amount DOUBLE,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# Définir la table clients (topic compacté)
table_env.execute_sql("""
CREATE TABLE customers (
customer_id STRING,
name STRING,
email STRING,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'customers',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset', -- (1)!
'format' = 'json'
)
""")
# Joindre les commandes avec les clients
result = table_env.sql_query("""
SELECT
o.order_id,
o.customer_id,
o.amount,
c.name AS customer_name, -- (2)!
c.email AS customer_email
FROM orders o
LEFT JOIN customers c -- (3)!
ON o.customer_id = c.customer_id
""")
result.execute().print()
- Lire toutes les données client depuis le début pour construire l'état
- Enrichir la commande avec le nom et l'email du client
- Left join - conserve les commandes même si le client n'est pas trouvé
!!! warning "Considérations de Performance des Jointures" - Les jointures Stream-Table sont rapides (recherche en mémoire) mais nécessitent que la table tienne en état - Les jointures Stream-Stream nécessitent du fenêtrage (joindre les événements dans une fenêtre temporelle) - Utilisez des topics compactés pour les tables afin de garder l'état petit (uniquement la dernière valeur par clé)
Étape 5 : Patterns Avancés¶
Pattern 1 : Sessionisation (Fenêtres de Session)¶
Suivre les sessions utilisateur avec timeout d'inactivité :
// Fenêtre de session : grouper les événements avec un écart <30 minutes dans la même session
SessionWindows sessionWindow = SessionWindows.ofInactivityGapWithNoGrace(
Duration.ofMinutes(30) // (1)!
);
KTable<Windowed<String>, Long> sessionCounts = clickstream
.groupByKey()
.windowedBy(sessionWindow) // (2)!
.count();
- Si aucun événement pendant 30 minutes, la session se termine
- Fenêtres dynamiques - chaque utilisateur peut avoir une durée de session différente
Pattern 2 : Déduplication¶
Supprimer les événements dupliqués en utilisant l'état :
KStream<String, String> deduplicated = orders
.groupByKey()
.reduce( // (1)!
(oldValue, newValue) -> newValue, // Garder le plus récent
Materialized.as("dedup-store")
)
.toStream();
- Reduce garde une seule valeur par clé - déduplique effectivement
Pattern 3 : Flux Changelog¶
Convertir KTable (état) en flux changelog :
KTable<String, Long> counts = orders.groupByKey().count();
KStream<String, Long> changelog = counts.toStream(); // (1)!
// Émet : (key=US-WEST, value=1), (key=US-WEST, value=2), ...
- Changelog émet chaque mise à jour d'état - utile pour les systèmes downstream
Étape 6 : Sémantique Exactly-Once¶
Kafka Streams supporte le traitement exactly-once (EOS) pour éviter les résultats dupliqués.
Activer Exactly-Once dans Kafka Streams¶
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2); // (1)!
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
- Exactly-once v2 - utilise des écritures transactionnelles et des producteurs idempotents
!!! danger "Exigences Exactly-Once" - Kafka 2.5+ requis pour EOS v2 - Facteur de réplication ≥ 3 recommandé - min.insync.replicas ≥ 2 - Légère surcharge de performance (~5-10%) vs at-least-once
Bonnes Pratiques¶
- À FAIRE
- Utiliser event time (pas processing time) pour le fenêtrage
- Activer la sémantique exactly-once pour les applications critiques
- Utiliser des topics compactés pour les KTables (rétention changelog)
- Surveiller le lag consommateur et la taille du state store
-
Tester avec des événements hors ordre et tardifs
-
À ÉVITER
- Stocker un état non borné (utiliser le fenêtrage ou TTL)
- Utiliser de grandes fenêtres sans ajuster la taille du state store
- Supposer que les événements arrivent dans l'ordre (gérer les événements tardifs)
- Ignorer le rééquilibrage et le temps de migration d'état
- Traiter des données sensibles sans chiffrement
Liste de Vérification Production¶
- Sémantique exactly-once activée pour les apps critiques
- State stores configurés avec rétention appropriée
- Surveillance : lag consommateur, latence traitement, taille état
- Tolérance aux pannes : topics changelog répliqués (RF ≥ 3)
- Scaling : instances multiples pour traitement parallèle
- Gestion d'erreurs : dead letter queue pour messages empoisonnés
- Tests : tests unitaires avec TopologyTestDriver (Kafka Streams)
Dépannage¶
Erreur : StreamsException: State store not found
Cause : State store non matérialisé ou requête avant prêt
**Solution :**
1. S'assurer que le state store est matérialisé :
```java
.count(Materialized.as("my-store"))
```
2. Attendre la restauration de l'état après rééquilibrage :
```java
streams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING) {
// État restauré, prêt à requêter
}
});
```
Erreur : Late events dropped by window
Cause : Événements arrivés après fermeture de la fenêtre (au-delà de la latence autorisée)
**Solution :**
1. Augmenter la période de grâce pour les événements tardifs :
```java
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5), // Taille fenêtre
Duration.ofMinutes(1) // Permettre 1 minute de latence
)
```
2. Utiliser des watermarks dans Flink pour gérer les événements tardifs
3. Surveiller les métriques d'événements tardifs
Erreur : RocksDB out of memory
Cause : State store devenu trop grand pour la mémoire disponible
**Solution :**
1. Ajouter du fenêtrage pour borner la taille de l'état
2. Augmenter le cache de bloc RocksDB :
```java
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
CustomRocksDBConfig.class);
```
3. Scaler horizontalement (plus d'instances = état partitionné)
4. Utiliser TTL pour les entrées d'état
Exemple Complet : Détection de Fraude en Temps Réel¶
Stratégies de Déploiement¶
Déploiement Kafka Streams¶
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-processor
spec:
replicas: 3 # (1)!
selector:
matchLabels:
app: order-processor
template:
metadata:
labels:
app: order-processor
spec:
containers:
- name: processor
image: order-processor:1.0
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka:9092"
resources: # (2)!
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
- 3 réplicas pour traitement parallèle - auto-scaling possible
- Limites de ressources - le state store RocksDB nécessite de la mémoire
Déploiement Flink¶
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: flink-stream-processor
spec:
image: flink:1.18
flinkVersion: v1_18
jobManager: # (1)!
replicas: 1
resource:
memory: "2Gi"
cpu: 1
taskManager: # (2)!
replicas: 3
resource:
memory: "4Gi"
cpu: 2
job:
jarURI: s3://my-bucket/stream-processor.jar
parallelism: 6 # (3)!
upgradeMode: stateful # (4)!
- JobManager - orchestre l'exécution du job
- TaskManagers - exécutent les tâches parallèles
- Niveau de parallélisme - distribuer le travail à travers les slots de tâche
- Mise à jour avec état - préserver l'état lors du redéploiement
Prochaines Étapes¶
Maintenant que vous comprenez le traitement de flux, explorez :
- Kafka Connect - Intégrer le traitement de flux avec des systèmes externes
- Schema Registry - Utiliser Avro pour une sérialisation de flux efficace
- Démarrer avec Kafka - Revoir les fondamentaux de Kafka
Ressources Additionnelles¶
Documentation Officielle¶
- Documentation Apache Kafka Streams
- Javadoc API Kafka Streams
- Documentation Apache Flink
- Connecteur Kafka pour Flink
Tutoriels & Cours¶
- Cours Kafka Streams 101 - Cours gratuit Confluent
- Formation Pratique Apache Flink
- Fondamentaux du Traitement de Flux
- Tutoriel Construction d'Applications Streaming
Livres & Guides¶
- Designing Event-Driven Systems - Livre gratuit O'Reilly par Ben Stopford
- Stream Processing with Apache Flink - Fabian Hueske & Vasiliki Kalavri
- Kafka Streams in Action - Bill Bejeck
- Mastering Kafka Streams - Mitch Seymour
Sujets Avancés¶
- State Stores Kafka Streams
- Gestion d'État Flink
- Sémantique Exactly-Once dans Kafka Streams
- Comparaison des Stratégies de Fenêtrage
Attribution du Cours
Ce guide est basé sur le contenu de la documentation Apache Kafka Streams, la documentation Apache Flink, les tutoriels Confluent, et les meilleures pratiques de l'industrie.