Consommer des Messages depuis Apache Kafka¶
Vue d'ensemble¶
Apprenez à construire des applications consumer Kafka qui lisent et traitent des flux d'événements depuis les topics Kafka. Ce guide couvre la configuration des consumers, la gestion des offsets, les consumer groups pour le traitement parallèle, et les meilleures pratiques pour construire des applications de streaming fiables.
!!! info "Ce que vous allez apprendre" - Configurer et créer des consumers Kafka - S'abonner à des topics et interroger pour des messages - Gérer les offsets des consumers pour la tolérance aux pannes - Mettre à l'échelle avec les consumer groups - Implémenter la gestion d'erreurs et la récupération
Prérequis¶
- Docker et Docker Compose installés
- Compréhension de base des topics et partitions Kafka
- Environnement de développement avec les bibliothèques client Kafka
Démarrage Rapide : Exécuter Kafka avec Docker Compose¶
Utilisez cette configuration Docker Compose pour exécuter un cluster Kafka à nœud unique avec KRaft (pas besoin de Zookeeper) :
- Image Confluent Platform Kafka (inclut le mode KRaft)
- Port du broker Kafka exposé à l'hôte
- ID de nœud unique dans le cluster KRaft
- Ce nœud agit à la fois comme broker et controller (mode combiné)
- Listeners : PLAINTEXT pour les clients, CONTROLLER pour la communication interne du cluster
- Listener annoncé pour les clients externes
- Votants du quorum pour le consensus KRaft (format :
id@host:port) - ID de cluster unique - générer avec
docker run confluentinc/cp-kafka kafka-storage random-uuid - Répertoire de logs pour le stockage des données Kafka
- Désactiver la collecte de métriques Confluent Support pour une configuration locale minimale
Démarrer Kafka¶
# Démarrer Kafka
docker-compose up -d
# Attendre que Kafka soit prêt
docker-compose ps
# Créer un topic avec quelques données de test
docker exec kafka-kraft kafka-topics \
--bootstrap-server localhost:9092 \
--create --topic orders --partitions 3 --replication-factor 1
# Produire des messages de test
docker exec -i kafka-kraft kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic orders << EOF
{"order_id": "ORD-001", "amount": 99.99}
{"order_id": "ORD-002", "amount": 149.50}
{"order_id": "ORD-003", "amount": 75.25}
EOF
# Arrêter Kafka
docker-compose down
Vérifier les Messages
bash # Consommer les messages depuis le début docker exec kafka-kraft kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic orders --from-beginning --max-messages 3
Fondamentaux des Consumers¶
Qu'est-ce qu'un Consumer Kafka ?¶
Définition du Consumer
Un consumer est une application client qui lit des données depuis les topics Kafka. Les consumers :
- Récupèrent les messages d'un ou plusieurs topics
- Traitent les événements en temps réel ou quasi-temps réel
- Suivent leur position de lecture avec les offsets
- Peuvent évoluer horizontalement avec les consumer groups
graph LR
T[Topic: orders<br/>3 Partitions] --> C1[Consumer 1<br/>Lit Partition 0]
T --> C2[Consumer 2<br/>Lit Partition 1]
T --> C3[Consumer 3<br/>Lit Partition 2]
C1 --> P[Logique de<br/>Traitement]
C2 --> P
C3 --> P
Étape 1 : Configurer Votre Consumer¶
Configuration Requise¶
Tous les consumers ont besoin d'une configuration de base pour se connecter au cluster Kafka :
- Adresses des brokers Kafka - spécifier plusieurs pour la redondance
- ID du consumer group - les consumers avec le même ID partagent la charge des partitions
- Désérialiseur pour les clés des messages - convertit les bytes en objets
- Désérialiseur pour les valeurs des messages - doit correspondre au sérialiseur du producer
- Où commencer la lecture si aucun offset n'existe :
earliestoulatest
| consumer_config.py | |
|---|---|
- Adresses des brokers Kafka - peut être une liste pour plusieurs brokers
- ID du consumer group - permet l'équilibrage de charge entre consumers
- Désérialiser les clés de bytes vers string
- Désérialiser les valeurs de bytes vers string
- Lire depuis le début si pas d'offset précédent
- Tableau d'adresses de brokers pour la connexion au cluster
- ID du consumer group pour la consommation coordonnée
- KafkaJS gère automatiquement la désérialisation pour les strings
- Adresses des brokers du cluster Kafka
- Identifiant du consumer group
- Stratégie de réinitialisation d'offset pour nouveaux consumers
Configuration Optionnelle¶
Optimisation des Performances
Considérez ces paramètres optionnels pour la production :
| Paramètre | Description | Défaut | Recommandation |
|-----------|-------------|---------|----------------|
| `fetch.min.bytes` | Données minimales par requête fetch | 1 | Augmenter pour le débit |
| `fetch.max.wait.ms` | Temps d'attente max si min bytes non atteint | 500ms | Équilibrer latence vs débit |
| `max.poll.records` | Records max par poll | 500 | Ajuster selon temps de traitement |
| `enable.auto.commit` | Auto-commit des offsets | `true` | `false` pour contrôle manuel |
| `session.timeout.ms` | Timeout heartbeat du consumer | 10s | Augmenter pour traitement lent |
Étape 2 : S'abonner aux Topics¶
Abonnement à un Topic Unique¶
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// S'abonner à un topic unique
consumer.subscribe(Collections.singletonList("orders")); // (1)!
- Passer une liste même pour un seul topic - l'API attend une Collection
- Notation liste même pour un seul topic
Topics Multiples¶
Abonnement Basé sur Pattern¶
Fonctionnalité Avancée
S'abonner aux topics correspondant à un pattern regex - utile pour la création dynamique de topics mais nécessite une gestion attentive.
Étape 3 : Interroger pour des Messages¶
La Boucle de Poll¶
Polling Infini
Les applications de streaming s'exécutent indéfiniment, interrogeant continuellement pour de nouveaux messages. C'est le comportement normal pour les systèmes événementiels.
- Boucle infinie - normal pour applications de streaming
- Poll avec timeout - retourne immédiatement si messages disponibles
- Itérer sur le batch de records retournés
- De quel topic provient ce message
- Numéro de partition (indexé à 0)
- Position offset dans la partition
- Clé du message (peut être null)
- Valeur du message - vos données réelles
- Arrêt propre à la sortie de l'application
| consumer_loop.py | |
|---|---|
- Pattern itérateur Python - plus propre que while loop
- Votre logique métier ici
- Arrêt gracieux
- Consommation async avec batching automatique
- Callback par message
- Chaînage optionnel - key peut être null
- Convertir Buffer en string
- Traitement async supporté
- Boucle infinie pour polling continu
- Lecture bloquante avec timeout -1 (attendre indéfiniment)
- Votre logique de traitement
Étape 4 : Gérer les Offsets des Consumers¶
Comprendre les Offsets¶
Suivi des Offsets
Kafka suit l'offset (position) de chaque message qu'un consumer a traité. Cela permet :
- **Tolérance aux pannes** : Reprendre depuis la dernière position après un crash
- **Sémantique exactly-once** : Éviter de retraiter les messages
- **Rejouabilité** : Revenir à un offset antérieur si nécessaire
sequenceDiagram
participant C as Consumer
participant K as Broker Kafka
participant T as topic __consumer_offsets
C->>K: Poll messages (offset 100-109)
K->>C: Retourne 10 messages
C->>C: Traite les messages
C->>K: Commit offset 110
K->>T: Stocke offset 110 pour le groupe
Note over C: Consumer crash
C->>K: Consumer redémarre, fetch offset
T->>K: Retourne offset 110
K->>C: Reprend depuis offset 110 Auto vs Commit Manuel¶
- Auto-Commit (Défaut)
Avantages : - Simple - aucun code nécessaire - Commits périodiques automatiques
Inconvénients : - Risque de traitement dupliqué - Aucun contrôle sur le timing
- Commit Manuel (Recommandé)
Avantages : - Contrôle précis - Garantie at-least-once - Commit après traitement réussi
Inconvénients : - Complexité de code accrue - Doit être géré explicitement
Exemples de Commit Manuel¶
props.put("enable.auto.commit", "false"); // (1)!
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processOrder(record.value());
// Commit après traitement réussi
consumer.commitSync(); // (2)!
} catch (Exception e) {
// Gérer l'erreur - ne pas commiter en cas d'échec
log.error("Processing failed", e); // (3)!
}
}
}
- Désactiver auto-commit pour contrôle manuel
- Commit synchrone - bloque jusqu'à acknowledgement
- Messages échoués n'auront pas leurs offsets commités
consumer = KafkaConsumer(
'orders',
enable_auto_commit=False // (1)!
)
for message in consumer:
try:
process_order(message.value)
consumer.commit() // (2)!
except Exception as e:
print(f"Error processing: {e}") // (3)!
# Ne pas commiter - réessayera au redémarrage
- Mode commit manuel
- Commit après traitement réussi
- Ignorer le commit en cas d'erreur pour retry
await consumer.run({
autoCommit: false, // (1)!
eachMessage: async ({ message, heartbeat, pause }) => {
try {
await processOrder(message.value.toString());
await consumer.commitOffsets([{ // (2)!
topic: 'orders',
partition: message.partition,
offset: (parseInt(message.offset) + 1).toString()
}]);
} catch (error) {
console.error('Processing failed:', error);
pause(); // (3)!
setTimeout(() => consumer.resume([{ topic: 'orders' }]), 5000);
}
}
});
- Désactiver auto-commit
- Commit manuel d'offset après succès
- Pause et retry en cas d'erreur
config := &kafka.ConfigMap{
"enable.auto.commit": false, // (1)!
}
for {
msg, err := consumer.ReadMessage(-1)
if err != nil {
continue
}
if err := processOrder(string(msg.Value)); err == nil {
_, err := consumer.CommitMessage(msg) // (2)!
if err != nil {
log.Printf("Commit failed: %v\n", err)
}
} else {
log.Printf("Processing failed: %v\n", err) // (3)!
}
}
- Mode commit manuel
- Commiter le message spécifique après traitement
- Ne pas commiter en cas d'échec
Étape 5 : Mettre à l'Échelle avec les Consumer Groups¶
Concepts des Consumer Groups¶
Mise à l'Échelle Horizontale
Les consumer groups permettent le traitement parallèle en distribuant les partitions entre plusieurs instances de consumers.
**Règles Clés :**
- Les consumers dans le même groupe partagent la charge des partitions
- Chaque partition est assignée à **un seul** consumer du groupe
- Plusieurs groupes peuvent lire le même topic indépendamment
graph TB
subgraph "Topic: orders (3 partitions)"
P0[Partition 0]
P1[Partition 1]
P2[Partition 2]
end
subgraph "Consumer Group: order-processing"
C1[Consumer 1]
C2[Consumer 2]
C3[Consumer 3]
end
P0 --> C1
P1 --> C2
P2 --> C3
Rééquilibrage¶
Rééquilibrage Automatique
Quand des consumers rejoignent ou quittent un groupe, Kafka rééquilibre automatiquement les assignations de partitions.
**Déclencheurs :**
- Un nouveau consumer rejoint le groupe
- Un consumer existant échoue/s'arrête
- Le nombre de partitions du topic change
**Pendant le rééquilibrage :**
- Courte pause de traitement (habituellement < 1 seconde)
- Partitions réassignées aux consumers disponibles
graph TB
subgraph "Avant Rééquilibrage"
P0A[Partition 0] --> C1A[Consumer 1]
P1A[Partition 1] --> C2A[Consumer 2 ❌ ÉCHOUE]
P2A[Partition 2] --> C3A[Consumer 3]
end
subgraph "Après Rééquilibrage"
P0B[Partition 0] --> C1B[Consumer 1]
P1B[Partition 1] --> C1B
P2B[Partition 2] --> C3B[Consumer 3]
end
Directives de Mise à l'Échelle¶
Nombre Optimal de Consumers
| Scénario | Recommandation |
|---|---|
| Partitions = Consumers | Idéal - parallélisme complet |
| Partitions > Consumers | OK - certains consumers gèrent plusieurs partitions |
| Consumers > Partitions | Gaspillage - consumers inactifs restent inutilisés |
Exemple :
- Topic avec 6 partitions → Déployer 6 consumers pour débit maximum
- Topic avec 3 partitions → Ne pas déployer plus de 3 consumers
Gestion d'Erreurs et Meilleures Pratiques¶
Gérer les Échecs de Traitement¶
Scénarios d'Échec Courants
1. Erreurs Réseau Transitoires ```java int maxRetries = 3; int retryCount = 0;
while (retryCount < maxRetries) {
try {
processOrder(record.value());
consumer.commitSync();
break; // Succès
} catch (TransientException e) {
retryCount++;
Thread.sleep(1000 * retryCount); // Backoff exponentiel
}
}
```
**2. Poison Pills (Messages Corrompus)**
```java
try {
processOrder(record.value());
} catch (DeserializationException e) {
// Envoyer vers dead letter queue
producer.send(new ProducerRecord<>("orders-dlq", record.value()));
consumer.commitSync(); // Ignorer le mauvais message
}
```
**3. Service Downstream Indisponible**
```java
try {
sendToDatabase(record.value());
} catch (DatabaseException e) {
consumer.pause(Collections.singleton(
new TopicPartition(record.topic(), record.partition())
));
// Réessayer après délai
scheduleRetry();
}
```
Liste de Contrôle des Meilleures Pratiques¶
!!! success "Consumers Prêts pour la Production" - [x] Utiliser les commits d'offset manuels pour données critiques - [x] Implémenter un traitement idempotent (gérer les duplicatas) - [x] Définir un session.timeout.ms approprié pour votre charge de travail - [x] Surveiller les métriques de lag des consumers - [x] Gérer les rééquilibrages gracieusement - [x] Utiliser des dead letter queues pour poison pills - [x] Implémenter des hooks d'arrêt appropriés - [x] Logger les offsets et métadonnées de traitement
Arrêt Gracieux¶
```java final KafkaConsumer
Runtime.getRuntime().addShutdownHook(new Thread(() -> { // (1)! System.out.println("Shutting down gracefully..."); consumer.wakeup(); // (2)! }));
try { consumer.subscribe(Collections.singletonList("orders")); while (true) { ConsumerRecords
- Enregistrer shutdown hook pour SIGTERM/SIGINT
- Interrompre poll() pour déclencher sortie propre
- WakeupException signale arrêt gracieux
- Fermer consumer pour commiter offsets finaux
```python import signal import sys
def shutdown(signum, frame): // (1)! print("Shutting down gracefully...") consumer.close() // (2)! sys.exit(0)
signal.signal(signal.SIGINT, shutdown) # Ctrl+C signal.signal(signal.SIGTERM, shutdown) # Docker stop
try: for message in consumer: process_order(message.value) except KeyboardInterrupt: pass finally: consumer.close() // (3)! ```
- Gestionnaire de signal pour arrêt gracieux
- Fermer consumer commite les offsets
- Assurer le nettoyage dans bloc finally
Exemple Complet : Service de Traitement de Commandes¶
Voici un exemple de consumer prêt pour la production :
```java title="OrderConsumerService.java" linenums="1" import org.apache.kafka.clients.consumer.; import java.time.Duration; import java.util.;
public class OrderConsumerService { private final KafkaConsumer
public OrderConsumerService(Properties config) {
this.consumer = new KafkaConsumer<>(config);
}
public void start() {
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
consumer.subscribe(Collections.singletonList("orders"));
System.out.println("Consumer started, waiting for messages...");
try {
while (running) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// Commit après le batch
consumer.commitSync();
}
} catch (WakeupException e) {
// Attendu pendant l'arrêt
} catch (Exception e) {
System.err.println("Consumer error: " + e.getMessage());
} finally {
consumer.close();
System.out.println("Consumer closed");
}
}
private void processRecord(ConsumerRecord<String, String> record) {
try {
System.out.printf("Processing order: %s = %s%n",
record.key(), record.value());
// Votre logique métier ici
// ex : parser JSON, valider, stocker en base de données
} catch (Exception e) {
System.err.println("Failed to process record: " + e.getMessage());
// Envoyer vers DLQ ou implémenter logique de retry
}
}
private void shutdown() {
System.out.println("Shutdown signal received");
running = false;
consumer.wakeup();
}
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processing-service");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
OrderConsumerService service = new OrderConsumerService(props);
service.start();
}
} ```
```python title="order_consumer_service.py" linenums="1" from kafka import KafkaConsumer import signal import sys import json
class OrderConsumerService: def init(self, config): self.consumer = KafkaConsumer( 'orders', **config, value_deserializer=lambda m: json.loads(m.decode('utf-8')) ) self.running = True
def start(self):
signal.signal(signal.SIGINT, self._shutdown)
signal.signal(signal.SIGTERM, self._shutdown)
print("Consumer started, waiting for messages...")
try:
for message in self.consumer:
if not self.running:
break
self._process_message(message)
self.consumer.commit()
except Exception as e:
print(f"Consumer error: {e}")
finally:
self.consumer.close()
print("Consumer closed")
def _process_message(self, message):
try:
order = message.value
print(f"Processing order: {message.key} = {order}")
# Votre logique métier ici
# ex : valider commande, mettre à jour inventaire, envoyer notification
except Exception as e:
print(f"Failed to process message: {e}")
# Envoyer vers DLQ ou implémenter logique de retry
def _shutdown(self, signum, frame):
print("Shutdown signal received")
self.running = False
if name == "main": config = { 'bootstrap_servers': 'localhost:9092', 'group_id': 'order-processing-service', 'enable_auto_commit': False, 'auto_offset_reset': 'earliest' }
service = OrderConsumerService(config)
service.start()
```
Prochaines Étapes¶
!!! tip "Continuez Votre Apprentissage" - Utiliser Schema Registry - Gérer les schémas de données pour la sécurité des types - Connecter des Systèmes Externes - Intégrer Kafka avec des bases de données - Traiter des Streams - Transformer des données en temps réel - Retour au Tutoriel - Réviser les concepts fondamentaux
Ressources Supplémentaires¶
Documentation Officielle¶
- Documentation API Apache Kafka Consumer
- Référence Configuration Kafka Consumer
- Protocole Consumer Group - Détails Internes
- Guide Configuration Consumer Confluent
Tutoriels & Cours¶
- Cours Kafka Consumer 101 - Cours gratuit Confluent
- Exercice Interactif Kafka Consumer
- Meilleures Pratiques Consumer
Sujets Avancés¶
- Optimisation Performance Consumer
- Rééquilibrage Consumer Group - Analyse Approfondie
- Stratégies de Gestion des Offsets
Attribution du Cours
Ce guide est basé sur le contenu du cours Apache Kafka 101 de Confluent et la documentation officielle Apache Kafka.