Utiliser Kafka Schema Registry¶
Apprenez à gérer les schémas de données, garantir la compatibilité et sérialiser les données efficacement avec Kafka Schema Registry.
Ce que Vous Allez Apprendre¶
À la fin de ce guide, vous serez capable de :
- ✅ Comprendre pourquoi la gestion des schémas est critique pour l'intégrité des données
- ✅ Configurer Kafka Schema Registry
- ✅ Enregistrer et faire évoluer des schémas avec des règles de compatibilité
- ✅ Sérialiser et désérialiser des données avec Avro, Protobuf et JSON Schema
- ✅ Intégrer Schema Registry avec les producteurs et consommateurs
- ✅ Gérer l'évolution des schémas en production
Prérequis :
- Docker et Docker Compose installés
- Compréhension de base des producteurs et consommateurs
- Familiarité avec les formats de sérialisation de données
Temps Estimé : 30 minutes
Démarrage Rapide : Exécuter Kafka avec Schema Registry¶
Utilisez cette configuration Docker Compose pour exécuter Kafka avec KRaft et Schema Registry :
- Image Confluent Schema Registry - compatible avec Apache Kafka
- Port de l'API REST Schema Registry
- Nom d'hôte pour la découverte de service
- Serveurs bootstrap Kafka pour stocker les schémas
- Point de terminaison de l'API REST pour les opérations de schéma
- Topic Kafka interne pour stocker les schémas (créé automatiquement)
- Facteur de réplication 1 pour le développement (utiliser 3+ en production)
Démarrer les Services¶
# Démarrer Kafka et Schema Registry
docker-compose up -d
# Attendre que les services soient prêts
docker-compose ps
# Vérifier que Schema Registry fonctionne
curl http://localhost:8081/
# Vérifier les sujets (devrait être vide initialement)
curl http://localhost:8081/subjects
# Arrêter les services
docker-compose down
Tester l'Enregistrement de Schéma
```bash # Enregistrer un schéma Avro simple curl -X POST http://localhost:8081/subjects/test-value/versions \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -d '{ "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}" }'
# Lister les sujets (devrait montrer "test-value")
curl http://localhost:8081/subjects
```
Considérations de Production
Pour les déploiements en production : - Exécuter plusieurs instances Schema Registry (3+) pour la haute disponibilité - Définir le facteur de réplication à 3 pour le topic _schemas - Activer l'authentification (basic auth ou mutual TLS) - Utiliser HTTPS pour l'API REST Schema Registry - Surveiller la disponibilité et la latence de Schema Registry
Qu'est-ce que Schema Registry ?¶
Schema Registry est un référentiel centralisé pour gérer et valider les schémas de données. Il garantit que les producteurs et consommateurs s'accordent sur la structure des messages, évitant la corruption des données et permettant une évolution sûre des schémas.
Pourquoi Utiliser Schema Registry ?
Sans Schema Registry, les changements de schéma incompatibles peuvent casser les consommateurs. Schema Registry applique des règles de compatibilité qui empêchent le déploiement de changements cassants.
graph LR
A[Producteur] -->|1. Enregistrer Schéma| B[Schema Registry]
B -->|2. Retourner ID Schéma| A
A -->|3. Envoyer Message<br/>avec ID Schéma| C[Topic Kafka]
D[Consommateur] -->|4. Récupérer Message| C
D -->|5. Obtenir Schéma par ID| B
B -->|6. Retourner Schéma| D
D -->|7. Désérialiser| D
!!! tip "Avantages de Schema Registry" - Intégrité des Données : Empêche les données incompatibles d'entrer dans les topics - Évolution du Schéma : Supporte l'ajout/suppression de champs sans casser les consommateurs - Économie de Bande Passante : Les messages ne contiennent qu'un petit ID de schéma au lieu du schéma complet - Application de Compatibilité : Plusieurs modes de compatibilité (backward, forward, full)
Étape 1 : Configurer Schema Registry¶
Installation¶
- Service Schema Registry - stocke les schémas dans le topic interne
_schemas - Port par défaut de l'API REST Schema Registry
- Nom d'hôte pour la découverte du service Schema Registry
- Connexion au cluster Kafka pour stocker les schémas
- Point de terminaison de l'API REST pour les opérations de schéma
# Télécharger Confluent Platform
wget https://packages.confluent.io/archive/7.5/confluent-7.5.0.tar.gz # (1)!
tar -xzf confluent-7.5.0.tar.gz
cd confluent-7.5.0
# Configurer Schema Registry
cat <<EOF > etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081 # (2)!
kafkastore.bootstrap.servers=localhost:9092 # (3)!
kafkastore.topic=_schemas # (4)!
debug=false
EOF
# Démarrer Schema Registry
bin/schema-registry-start etc/schema-registry/schema-registry.properties
- Télécharger Confluent Platform qui inclut Schema Registry
- Point de terminaison de l'API REST - accessible à
http://localhost:8081 - Connexion au cluster Kafka pour stocker les schémas
- Topic interne pour le stockage des schémas (créé automatiquement)
Vérifier l'Installation¶
# Vérifier la santé de Schema Registry
curl http://localhost:8081/ # (1)!
# Lister les sujets (devrait être vide initialement)
curl http://localhost:8081/subjects # (2)!
- Devrait retourner
{}si Schema Registry fonctionne - Retourne la liste des sujets de schéma enregistrés (topics + suffixe key/value)
Schema Registry est Prêt
Si les deux commandes réussissent, Schema Registry fonctionne et est prêt à gérer les schémas.
Étape 2 : Choisir un Format de Sérialisation¶
Schema Registry supporte trois formats de sérialisation. Choisissez en fonction de votre cas d'usage :
- Avro
Idéal Pour : Sérialisation de données généraliste
Avantages : - Format binaire compact - Types de données riches (unions, maps, arrays) - Écosystème mature
Inconvénients : - Nécessite la génération de code (optionnel) - Moins lisible par l'humain
- :material-buffer: Protobuf
Idéal Pour : Microservices haute performance
Avantages : - Sérialisation/désérialisation la plus rapide - Compatible backward et forward par conception - Typage fort avec génération de code
Inconvénients : - Nécessite des fichiers .proto - Support de schéma dynamique limité
- JSON Schema
Idéal Pour : Données lisibles par l'humain, APIs
Avantages : - Format lisible par l'humain - Pas de génération de code requise - Débogage facile
Inconvénients : - Taille de message plus grande - Parsing plus lent - Moins compact qu'Avro/Protobuf
Choix le Plus Courant
Avro est le format le plus populaire pour Kafka car il équilibre compacité, flexibilité et évolution de schéma. Ce guide se concentre sur Avro, avec des exemples pour Protobuf et JSON Schema.
Étape 3 : Enregistrer un Schéma¶
Définir un Schéma Avro¶
Créer un schéma pour un événement Order :
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{ "name": "order_id", "type": "string" }, // (1)!
{ "name": "customer_id", "type": "string" },
{ "name": "amount", "type": "double" },
{ "name": "timestamp", "type": "long" } // (2)!
]
}
- Identifiant unique de commande - champ obligatoire
- Horodatage Unix en millisecondes - champ obligatoire
Enregistrer le Schéma¶
curl -X POST http://localhost:8081/subjects/orders-value/versions \ # (1)!
-H "Content-Type: application/vnd.schemaregistry.v1+json" \ # (2)!
-d '{
"schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example.orders\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"customer_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"timestamp\",\"type\":\"long\"}]}" # (3)!
}'
- Nom du sujet :
<topic>-<key|value>- iciorders-valuepour les valeurs de message - Type de contenu requis pour l'API Schema Registry
- Définition de schéma en chaîne encodée JSON (notez les guillemets échappés)
Réponse :
- ID de schéma - utilisé pour référencer ce schéma dans les messages
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
# Configurer le client Schema Registry
sr_client = SchemaRegistryClient({
'url': 'http://localhost:8081' # (1)!
})
# Définir le schéma Avro
avro_schema = """
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "timestamp", "type": "long"}
]
}
"""
# Enregistrer le schéma
schema_id = sr_client.register_schema( # (2)!
subject_name='orders-value',
schema=Schema(avro_schema, schema_type='AVRO')
)
print(f"Schéma enregistré avec l'ID : {schema_id}")
- Point de terminaison Schema Registry - ajustez pour votre déploiement
- Retourne l'ID de schéma qui est mis en cache pour les recherches futures
Vérifier l'Enregistrement¶
# Lister tous les sujets
curl http://localhost:8081/subjects
# Obtenir le dernier schéma pour un sujet
curl http://localhost:8081/subjects/orders-value/versions/latest # (1)!
# Obtenir le schéma par ID
curl http://localhost:8081/schemas/ids/1 # (2)!
- Retourne la dernière version du schéma pour le sujet
orders-value - Retourne la définition du schéma pour l'ID de schéma 1
Étape 4 : Produire des Messages avec Schéma¶
Utiliser le Sérialiseur Avro¶
- Définition complète du schéma Avro (identique à celui enregistré dans Schema Registry)
- Sérialiseur Avro - recherche automatiquement l'ID de schéma depuis Schema Registry
- Assigner le sérialiseur Avro au sérialiseur de valeur (la clé peut utiliser String)
- Dictionnaire Python correspondant à la structure du schéma Avro
- Le producteur sérialise automatiquement la valeur en utilisant le schéma Avro
- Message envoyé comme :
[magic_byte][schema_id][avro_binary_data]
- Sérialiseur Kafka Avro de Confluent - gère l'intégration avec Schema Registry
- URL Schema Registry pour la recherche et l'enregistrement de schéma
- GenericRecord représente les données Avro sans génération de code
- Le sérialiseur récupère automatiquement l'ID de schéma et sérialise en binaire
Validation du Schéma au Moment de la Production
Si le message ne correspond pas au schéma, le producteur lèvera une SerializationException avant d'envoyer à Kafka. Cela empêche les données invalides d'entrer dans le pipeline.
Étape 5 : Consommer des Messages avec Schéma¶
- Désérialiseur Avro - lit l'ID de schéma du message et récupère le schéma
- Optionnel : fournir le schéma pour validation (le désérialiseur peut le récupérer depuis le registre)
- Assigner le désérialiseur Avro au désérialiseur de valeur
- Message automatiquement désérialisé en dictionnaire Python
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.avro.generic.GenericRecord;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processor");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaAvroDeserializer.class); // (1)!
props.put("schema.registry.url", "http://localhost:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord order = record.value(); // (2)!
System.out.println("Commande : " + order.get("order_id"));
}
}
- Désérialiseur Kafka Avro - récupère automatiquement le schéma depuis le registre
- Valeur automatiquement désérialisée en Avro GenericRecord
Étape 6 : Évolution du Schéma¶
Modes de Compatibilité¶
Schema Registry supporte quatre modes de compatibilité :
| Mode | Description | Changements Autorisés |
|---|---|---|
| BACKWARD | Le nouveau schéma peut lire les anciennes données | Supprimer des champs, ajouter des champs optionnels |
| FORWARD | L'ancien schéma peut lire les nouvelles données | Ajouter des champs, supprimer des champs optionnels |
| FULL | Compatible backward et forward | Ajouter/supprimer uniquement des champs optionnels |
| NONE | Aucune vérification de compatibilité | Tout changement autorisé (⚠️ dangereux) |
Mode de Compatibilité par Défaut
Schema Registry utilise la compatibilité BACKWARD par défaut, qui est le choix le plus courant. Les consommateurs peuvent être mis à jour indépendamment sans casser.
Configurer le Mode de Compatibilité¶
# Définir le mode de compatibilité pour un sujet
curl -X PUT http://localhost:8081/config/orders-value \ # (1)!
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}' # (2)!
# Définir la compatibilité globale par défaut
curl -X PUT http://localhost:8081/config \ # (3)!
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}'
- Définir le mode de compatibilité pour un sujet spécifique
- Mode de compatibilité : BACKWARD, FORWARD, FULL ou NONE
- Définir le mode de compatibilité par défaut pour tous les nouveaux sujets
Faire Évoluer le Schéma (Ajouter un Champ Optionnel)¶
Ajoutons un champ status au schéma Order :
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{ "name": "order_id", "type": "string" },
{ "name": "customer_id", "type": "string" },
{ "name": "amount", "type": "double" },
{ "name": "timestamp", "type": "long" },
{ "name": "status", "type": "string", "default": "PENDING" } // (1)!
]
}
- Nouveau champ optionnel avec valeur par défaut - compatible backward
# Enregistrer la nouvelle version
curl -X POST http://localhost:8081/subjects/orders-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{...}"}' # (1)!
# Réponse : {"id": 2} # (2)!
- Nouveau schéma avec champ
statusajouté - Nouvel ID de schéma assigné - les anciens consommateurs peuvent toujours lire les nouveaux messages
!!! success "Évolution du Schéma en Action" - Anciens consommateurs (utilisant le schéma v1) peuvent lire les nouveaux messages - ils ignorent le champ status - Nouveaux producteurs (utilisant le schéma v2) peuvent écrire avec le champ status - Aucun temps d'arrêt requis - migration graduelle possible
Changements Cassants (Incompatibles)¶
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{ "name": "order_id", "type": "int" }, // (1)!
{ "name": "customer_id", "type": "string" },
{ "name": "amount", "type": "double" }
]
}
- Type de
order_idchangé destringàint- CHANGEMENT CASSANT
# Essayer d'enregistrer un schéma incompatible
curl -X POST http://localhost:8081/subjects/orders-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{...}"}'
# Réponse : HTTP 409 Conflict # (1)!
# {
# "error_code": 409,
# "message": "Le schéma en cours d'enregistrement est incompatible avec un schéma antérieur"
# }
- Schema Registry rejette les schémas incompatibles - empêche de casser les consommateurs
Étape 7 : Utiliser Protobuf et JSON Schema¶
Exemple Protobuf¶
syntax = "proto3";
package com.example.orders;
message Order {
string order_id = 1; // (1)!
string customer_id = 2;
double amount = 3;
int64 timestamp = 4;
}
- Les numéros de champ ne doivent jamais changer - utilisés pour la compatibilité backward
curl -X POST http://localhost:8081/subjects/orders-proto-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "PROTOBUF", # (1)!
"schema": "syntax = \"proto3\"; package com.example.orders; message Order { string order_id = 1; string customer_id = 2; double amount = 3; int64 timestamp = 4; }"
}'
- Spécifier le type de schéma
PROTOBUF(par défaut c'est AVRO)
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
protobuf_serializer = ProtobufSerializer( # (1)!
msg_type=Order, # (2)!
schema_registry_client=sr_client
)
producer = SerializingProducer({
'bootstrap.servers': 'localhost:9092',
'value.serializer': protobuf_serializer
})
- Sérialiseur Protobuf - similaire au sérialiseur Avro
- Classe Protobuf générée depuis le fichier
.proto
Exemple JSON Schema¶
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"order_id": {"type": "string"}, // (1)!
"customer_id": {"type": "string"},
"amount": {"type": "number"},
"timestamp": {"type": "integer"}
},
"required": ["order_id", "customer_id", "amount", "timestamp"] // (2)!
}
- Définitions de type JSON Schema - format JSON Schema standard
- Champs obligatoires - appliqués au moment de la sérialisation
from confluent_kafka.schema_registry.json_schema import JSONSerializer
json_serializer = JSONSerializer(
schema_str=json_schema_str,
schema_registry_client=sr_client
)
producer = SerializingProducer({
'bootstrap.servers': 'localhost:9092',
'value.serializer': json_serializer # (1)!
})
# Envoyer un message JSON
order = { # (2)!
'order_id': 'ORD-12345',
'customer_id': 'CUST-67890',
'amount': 99.99,
'timestamp': 1702834567000
}
producer.produce('orders', value=order)
- Sérialiseur JSON Schema - valide contre le JSON Schema
- Dictionnaire Python simple - sérialisé en JSON avec validation de schéma
Bonnes Pratiques¶
- À FAIRE
- Utiliser la compatibilité BACKWARD ou FULL en production
- Ajouter des valeurs par défaut aux nouveaux champs pour la compatibilité backward
- Tester l'évolution du schéma en staging avant la production
- Versionner les schémas sémantiquement (v1, v2, etc.)
-
Utiliser Avro pour la sérialisation de données généraliste
-
À ÉVITER
- Changer les types de champs sans créer un nouveau topic
- Utiliser le mode de compatibilité
NONEen production - Supprimer des champs obligatoires sans plan de migration
- Déployer des changements de schéma cassants sans coordination
- Coder en dur les ID de schéma dans le code applicatif
Liste de Vérification Production¶
- Schema Registry déployé avec HA (plusieurs réplicas)
- Mode de compatibilité configuré (
BACKWARDouFULL) - Validation de schéma activée dans les producteurs
- Surveillance de la disponibilité de Schema Registry
- Stratégie de versionnement de schéma documentée
- Plan de rollback pour les changements de schéma
- Sécurité : SSL + authentification activés (voir Sécurité Schema Registry)
Dépannage¶
Erreur : Schema being registered is incompatible
Cause : Le nouveau schéma viole les règles de compatibilité
**Solution :**
1. Vérifier le mode de compatibilité : `curl http://localhost:8081/config/orders-value`
2. Examiner les changements de schéma - assurer la compatibilité backward
3. Ajouter des valeurs par défaut aux nouveaux champs
4. Si un changement cassant est requis :
- Créer un nouveau topic avec le nouveau schéma
- Migrer les consommateurs graduellement
- Utiliser Kafka Streams pour la transformation des données
Erreur : Subject not found
Cause : Schéma non enregistré pour le topic
**Solution :**
```bash
# Lister tous les sujets
curl http://localhost:8081/subjects
# Vérifier si le sujet existe
curl http://localhost:8081/subjects/orders-value/versions
```
Enregistrer le schéma manuellement via l'API REST ou le producteur avec `auto.register.schemas=true`
Erreur : Failed to deserialize Avro message
Cause : Message écrit sans schéma ou corrompu
**Solution :**
1. Vérifier l'ID de schéma dans l'en-tête du message : les 5 premiers octets devraient être `[0x00][schema_id]`
2. Vérifier la disponibilité de Schema Registry
3. S'assurer que le producteur a utilisé le sérialiseur Avro correctement
4. Vérifier les incompatibilités d'ID de schéma (ancien schéma en cache)
Exemple Complet : Service de Commandes avec Évolution de Schéma¶
Prochaines Étapes¶
Maintenant que vous comprenez la gestion des schémas, explorez :
- Stream Processing - Transformer les données avec Kafka Streams et Flink
- Kafka Connect - Utiliser Schema Registry avec les connecteurs source/sink
- Produire des Messages - Optimiser la configuration du producteur
Ressources Additionnelles¶
Documentation Officielle¶
- Documentation Confluent Schema Registry
- Référence API REST Schema Registry
- Options de Configuration Schema Registry
Formats de Schémas¶
Tutoriels & Meilleures Pratiques¶
- Tutoriel Schema Registry - Cours gratuit Confluent
- Meilleures Pratiques d'Évolution de Schéma
- Guide Sérialisation et Schema Registry
- Patterns de Validation de Schéma
Sujets Avancés¶
Attribution du Cours
Ce guide est basé sur le contenu de la documentation Confluent Schema Registry, la spécification Apache Avro, et les meilleures pratiques de l'industrie.