Comment Produire des Messages vers Apache Kafka¶
Ce guide vous accompagne dans le processus d'écriture de données dans les topics Apache Kafka en utilisant les producers Kafka.
Prérequis¶
Avant de commencer, assurez-vous d'avoir :
- Docker et Docker Compose installés
- Environnement de développement configuré (JDK 11+, Python 3.8+, ou Node.js 16+)
- Une compréhension de base des concepts Kafka
Démarrage Rapide : Exécuter Kafka avec Docker Compose¶
Utilisez cette configuration Docker Compose pour exécuter un cluster Kafka à nœud unique avec KRaft (pas besoin de Zookeeper) :
- Image Kafka de Confluent Platform - inclut des outils supplémentaires et fonctionnalités enterprise
- Port du broker Kafka exposé à l'hôte
- ID de nœud unique dans le cluster KRaft
- Ce nœud agit à la fois comme broker et controller (mode combiné)
- Listeners : PLAINTEXT pour les clients, CONTROLLER pour la communication interne du cluster
- Listener annoncé pour les clients externes (utiliser
localhostpour le développement local) - Votants du quorum pour le consensus KRaft (format :
id@host:port) - ID de cluster unique - générer avec
docker run confluentinc/cp-kafka kafka-storage random-uuid - Répertoire de logs pour le stockage des données Kafka (chemin par défaut Confluent)
- Désactiver Confluent Metrics Reporter pour une configuration de développement minimale
Démarrer Kafka¶
# Démarrer Kafka
docker-compose up -d
# Attendre que Kafka soit prêt (vérifier le statut de santé)
docker-compose ps
# Voir les logs
docker-compose logs -f kafka
# Arrêter Kafka
docker-compose down
Vérifier que Kafka Fonctionne
```bash # Créer un topic de test docker exec kafka-kraft kafka-topics \ --bootstrap-server localhost:9092 \ --create --topic test-topic --partitions 3 --replication-factor 1
# Lister les topics
docker exec kafka-kraft kafka-topics \
--bootstrap-server localhost:9092 --list
```
Considérations de Production
Ceci est une configuration à nœud unique pour le développement. Pour la production : - Utilisez au moins 3 nœuds Kafka pour la haute disponibilité - Configurez des nœuds controller et broker séparés - Utilisez des volumes persistants pour le stockage des données - Activez l'authentification et le chiffrement (SASL/SSL)
Comprendre les Producers¶
Qu'est-ce qu'un Producer ?
Les producers sont des applications clientes responsables de l'écriture de données dans les topics Kafka. Toute application qui pousse des données—qu'il s'agisse de télémétrie, de logs, d'événements ou d'activités utilisateur—dans Kafka est considérée comme un producer.
graph LR
A[Application Producer] -->|Écrire Événements| B[Topic Kafka]
B --> C[Partition 0]
B --> D[Partition 1]
B --> E[Partition 2] Étape 1 : Configurer Votre Producer¶
Propriétés de Configuration de Base¶
# Connexion
bootstrap.servers=localhost:9092 # (1)!
# Sérialisation
key.serializer=org.apache.kafka.common.serialization.StringSerializer # (2)!
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Optionnel : Optimisation des performances
acks=all # (3)!
retries=3
linger.ms=1
- Liste des adresses des brokers pour la découverte initiale du cluster
- Convertit vos objets clé/valeur en tableaux d'octets
- Attendre que toutes les répliques accusent réception (durabilité maximale)
config = {
'bootstrap.servers': 'localhost:9092', # (1)!
'acks': 'all', # (2)!
'retries': 3,
'linger.ms': 1
}
- Adresses des brokers (séparées par des virgules si multiples)
- Attendre toutes les répliques in-sync
Référence de Configuration¶
| Propriété | Objectif | Valeurs Courantes |
|---|---|---|
bootstrap.servers | Liste des adresses des brokers | localhost:9092 ou plusieurs hôtes |
key.serializer | Comment convertir les clés en octets | StringSerializer, IntegerSerializer |
value.serializer | Comment convertir les valeurs en octets | StringSerializer, JsonSerializer |
acks | Niveau d'accusé de réception | 0, 1, all |
retries | Nombre de tentatives de réessai | 0 à Integer.MAX_VALUE |
Bonne Pratique Bootstrap Servers
Vous n'avez pas besoin de lister tous les brokers—juste assez pour la découverte initiale. Le producer apprendra automatiquement le reste du cluster.
Étape 2 : Créer une Instance de Producer¶
| simple_producer.py | |
|---|---|
Étape 3 : Envoyer des Messages¶
Stratégies d'Envoi¶
Aucune Garantie de Livraison
Le fire-and-forget ne fournit aucune confirmation. Les messages peuvent être perdus en cas de défaillance.
Garantie Forte
Bloque jusqu'à réception de l'accusé. À utiliser pour les messages critiques.
try {
RecordMetadata metadata = producer.send(record).get(); // (1)!
System.out.printf("✓ Envoyé vers partition %d à l'offset %d%n",
metadata.partition(),
metadata.offset());
} catch (Exception e) {
System.err.println("✗ Échec : " + e.getMessage());
}
-
.get()bloque jusqu'à la fin
try:
producer.produce('events', key='key1', value='value1')
producer.flush() # (1)!
print("✓ Message envoyé avec succès")
except Exception as e:
print(f"✗ Échec : {e}")
- Bloque jusqu'à la livraison de tous les messages
// Node.js n'a pas d'envoi sync natif
// Utiliser async avec wrapper Promise
function sendSync(producer, topic, key, value) {
return new Promise((resolve, reject) => {
producer.produce(
topic, null, Buffer.from(value), key,
Date.now(),
(err, offset) => {
if (err) reject(err);
else resolve(offset);
}
);
producer.flush(10000, (err) => {
if (err) reject(err);
});
});
}
// Utilisation
try {
const offset = await sendSync(producer, 'events', 'key1', 'value1');
console.log(`✓ Envoyé à l'offset ${offset}`);
} catch (err) {
console.error(`✗ Échec : ${err}`);
}
deliveryChan := make(chan kafka.Event)
err := producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Key: []byte("key1"),
Value: []byte("value1"),
}, deliveryChan)
if err != nil {
fmt.Printf("✗ Échec : %v\n", err)
}
// Attendre le rapport de livraison
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("✗ Livraison échouée : %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("✓ Envoyé vers partition %d à l'offset %v\n",
m.TopicPartition.Partition, m.TopicPartition.Offset)
}
close(deliveryChan)
Bonne Pratique
L'asynchrone avec callbacks offre le meilleur équilibre entre performance et fiabilité.
producer.send(record, (metadata, exception) -> { // (1)!
if (exception == null) {
System.out.printf("✓ Envoyé vers partition %d, offset %d%n",
metadata.partition(),
metadata.offset());
} else {
System.err.println("✗ Échec : " + exception.getMessage());
}
});
- Callback exécuté à la fin de l'envoi
def delivery_callback(err, msg): # (1)!
if err:
print(f"✗ Échec : {err}")
else:
print(f"✓ Envoyé vers partition {msg.partition()}, "
f"offset {msg.offset()}")
producer.produce(
'events',
key='key1',
value='value1',
callback=delivery_callback # (2)!
)
producer.poll(0) # Déclencher les callbacks
- Fonction callback pour les rapports de livraison
- Attacher le callback à ce message
producer.produce(
'events', // topic
null, // partition (null = automatique)
Buffer.from('value1'), // valeur
'key1', // clé
Date.now(), // timestamp
(err, offset) => { // (1)!
if (err) {
console.error('✗ Échec :', err);
} else {
console.log(`✓ Envoyé à l'offset ${offset}`);
}
}
);
- Callback pour confirmation de livraison
deliveryChan := make(chan kafka.Event, 10000) // (1)!
err := producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Key: []byte("key1"),
Value: []byte("value1"),
}, deliveryChan)
// Gérer les rapports de livraison de manière asynchrone
go func() {
for e := range deliveryChan {
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("✗ Échec : %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("✓ Envoyé vers partition %d à l'offset %v\n",
m.TopicPartition.Partition,
m.TopicPartition.Offset)
}
}
}()
- :material-buffer: Canal bufferisé pour envois non-bloquants
Étape 4 : Contrôler le Routage des Messages¶
Stratégies d'Attribution de Partition¶
graph TD
A{Message avec Key?} -->|Oui| B[Hasher la Key]
A{Message avec Key?} -->|Non| C[Round-Robin]
B --> D["Même Key → Même Partition<br/>(Ordre Préservé)"]
C --> E[Distribution Uniforme<br/>Entre Partitions]
Exemples de Routage¶
Garantie d'Ordre
Les messages avec la même clé vont toujours vers la même partition, préservant l'ordre.
// Tous les messages pour user-123 vont vers la même partition
ProducerRecord<String, String> record1 =
new ProducerRecord<>("events", "user-123", "login");
ProducerRecord<String, String> record2 =
new ProducerRecord<>("events", "user-123", "purchase");
producer.send(record1); // → Partition 2 (exemple)
producer.send(record2); // → Partition 2 (idem !)
// Même clé route vers la même partition
key := []byte("user-123")
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic},
Key: key,
Value: []byte("login"),
}, nil)
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic},
Key: key,
Value: []byte("purchase"),
}, nil)
// 1. Configurer le partitioner personnalisé
config.put("partitioner.class", "com.example.CustomPartitioner");
// 2. Implémenter l'interface Partitioner
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
// Router les messages haute priorité vers partition 0
if (value.toString().contains("URGENT")) {
return 0; // (1)!
}
// Routage par hash par défaut pour les autres
return Math.abs(key.hashCode()) %
cluster.partitionCountForTopic(topic);
}
// ... autres méthodes requises
}
- Les messages urgents obtiennent une partition dédiée pour traitement plus rapide
def custom_partitioner(key, all_partitions, available):
"""
Fonction de partitioner personnalisé.
"""
# Router les messages URGENT vers partition 0
if key and b'URGENT' in key:
return 0
# Partitionnement par hash par défaut pour les autres
return hash(key) % len(all_partitions)
# Configurer le producer avec partitioner personnalisé
config = {
'bootstrap.servers': 'localhost:9092',
'partitioner': custom_partitioner
}
// Go utilise librdkafka qui supporte les partitioners personnalisés
// via l'option de configuration 'partitioner'
config := &kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"partitioner": "murmur2_random", // ou personnalisé
}
// Pour logique vraiment personnalisée, calculer partition manuellement :
partition := customPartitionLogic(key, value)
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: int32(partition),
},
Key: key,
Value: value,
}, nil)
Étape 5 : Configurer les Accusés de Réception¶
Comprendre acks¶
graph LR
A[Producer] -->|acks=0| B["Sans Attente<br/>⚡ Plus Rapide"]
A -->|acks=1| C["Leader Seulement<br/>🚀 Équilibré"]
A -->|acks=all| D["Leader + Toutes Répliques<br/>✅ Plus Durable"]
Matrice de Comparaison¶
| Paramètre | Durabilité | Latence | Débit | Cas d'Usage |
|---|---|---|---|---|
acks=0 | ❌ Aucune | ⚡ Minimale | 🚀 Maximal | Logs fire-and-forget |
acks=1 | ⚠️ Moyenne | 🚀 Moyenne | 🚀 Élevé | Usage général |
acks=all | ✅ Maximale | 🐢 Maximale | 🐢 Minimal | Transactions financières |
Exemple de Configuration¶
# Attendre toutes les répliques in-sync (1)
acks=all
# Exiger au moins 2 répliques (2)
min.insync.replicas=2
# Réessayer en cas d'échec (3)
retries=3
- Le producer attend que toutes les répliques accusent réception
- Au moins 2 répliques doivent être in-sync
- Réessai automatique des envois échoués
Étape 6 : Gérer les Erreurs et Réessais¶
Erreurs Courantes¶
TimeoutException
Cause : Broker inaccessible ou problèmes réseau
**Solutions :**
- Vérifier la connectivité réseau et les règles de pare-feu
- Vérifier la configuration `bootstrap.servers`
- Augmenter `request.timeout.ms`
RecordTooLargeException
Cause : Le message dépasse message.max.bytes du broker
**Solutions :**
- Réduire la taille du message ou diviser en messages plus petits
- Augmenter config broker : `message.max.bytes`
- Augmenter config producer : `max.request.size`
SerializationException
Cause : Format de données invalide pour le serializer
**Solutions :**
- Valider les données avant production
- Vérifier la configuration du serializer
- Utiliser Schema Registry pour validation
Configuration des Réessais¶
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (exception instanceof RetriableException) {
// Kafka réessaiera automatiquement (1)
log.warn("Erreur réessayable : {}", exception.getMessage());
} else {
// Non-réessayable - gérer manuellement (2)
log.error("Erreur fatale : {}", exception.getMessage());
// Stocker dans dead-letter queue ou alerter
deadLetterQueue.send(record);
}
}
});
- Erreurs transitoires (réseau, broker temporairement indisponible)
- Erreurs permanentes (données invalides, échec auth)
def delivery_callback(err, msg):
if err:
if err.code() == KafkaError._MSG_TIMED_OUT:
# Erreur réessayable
logger.warning(f"Timeout : {err}")
else:
# Erreur non-réessayable
logger.error(f"Fatale : {err}")
# Envoyer vers DLQ
dlq_producer.produce('dead-letter-queue', msg.value())
else:
logger.info(f"✓ Livré : {msg.offset()}")
for e := range deliveryChan {
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
err := m.TopicPartition.Error
// Vérifier si réessayable
if err.(kafka.Error).IsRetriable() {
log.Printf("Erreur réessayable : %v", err)
} else {
log.Printf("Erreur fatale : %v", err)
// Envoyer vers DLQ
}
} else {
log.Printf("✓ Livré à l'offset %v",
m.TopicPartition.Offset)
}
}
Étape 7 : Optimiser les Performances¶
Configuration du Batching¶
Avantages du Batching
Grouper les messages en lots améliore considérablement le débit en réduisant la surcharge réseau.
sequenceDiagram
participant P as Producer
participant B as Buffer de Batch
participant K as Broker Kafka
P->>B: Message 1
P->>B: Message 2
P->>B: Message 3
Note over B: Attendre linger.ms<br/>ou batch plein
B->>K: Envoyer Batch (3 messages)<br/>1 appel réseau au lieu de 3 !
K-->>B: Ack (les 3) # Taille du batch en octets (16KB par défaut)
batch.size=16384 # (1)!
# Temps d'attente avant envoi batch partiel (0ms = envoi immédiat)
linger.ms=10 # (2)!
# Compression
compression.type=snappy # (3)!
- Lots plus grands = meilleur débit (mais plus de mémoire)
- Échanger un peu de latence pour un meilleur batching
- Compresser les lots pour économiser la bande passante
Comparaison de Compression¶
- None
- Ratio : 1x (pas de compression)
- CPU : Minimal
-
Idéal pour : Données déjà compressées
-
Snappy ⭐
- Ratio : 2-3x
- CPU : Faible
-
Idéal pour : Usage général (recommandé)
-
LZ4
- Ratio : 2-3x
- CPU : Faible
-
Idéal pour : Exigences faible latence
-
GZIP
- Ratio : 4-5x
- CPU : Élevé
-
Idéal pour : Optimisation du stockage
-
ZSTD
- Ratio : 3-5x
- CPU : Moyen
- Idéal pour : Meilleur équilibre ratio/vitesse
Exemple Complet Fonctionnel¶
Exercice Pratique¶
Pratiquer avec le Lab Interactif Confluent
Prêt à l'essayer vous-même ? L'exercice pratique officiel de Confluent fournit un environnement pré-configuré avec validation en temps réel.
Ce que vous obtenez :
- Cluster Kafka pré-configuré
- Instructions étape par étape
- Exemples multi-langages
- Validation en temps réel
Résumé des Bonnes Pratiques¶
- À FAIRE
- Utiliser
acks=allpour les données critiques - Activer la compression (
snappyouzstd) - Envoyer de manière asynchrone avec callbacks
- Fermer les producers proprement (
flush()puisclose()) - Surveiller les métriques du producer
-
Utiliser des clés pour les messages ordonnés
-
À ÉVITER
- Utiliser des envois synchrones dans des scénarios à haut débit
- Ignorer les exceptions des callbacks
- Créer un nouveau producer par message (réutiliser les instances !)
- Envoyer des données sensibles sans chiffrement
- Utiliser
acks=0pour des données importantes - Oublier d'appeler
flush()avant arrêt
Dépannage¶
Producer Bloqué / Se Bloque Indéfiniment
Symptômes : Le producer se bloque indéfiniment sur send() ou flush()
Solutions :
Débit Faible / Envois Lents
Symptômes : Messages envoyés à un rythme très lent
Solutions :
Messages Désordonnés
Symptômes : Messages arrivent dans le mauvais ordre
Solutions :
Connexion Refusée
Symptômes : Impossible de se connecter aux brokers
Liste de vérification :
- Les brokers Kafka sont en cours d'exécution (
jpsoudocker ps) -
bootstrap.serverscorrespond aux adresses réelles des brokers - Le pare-feu autorise la connexion sur le port 9092
- Pour Docker : utiliser la configuration réseau/hôte correcte
Prochaines Étapes¶
- Consommer des Messages
Apprenez à lire les données des topics Kafka avec les consumers
- :material-schema: Schema Registry
Ajoutez validation et évolution de schémas à vos pipelines de données
- Concepts Fondamentaux
Plongée profonde dans l'architecture et les mécanismes internes de Kafka