Use Kafka Schema Registry¶
Learn how to manage data schemas, ensure compatibility, and serialize data efficiently with Kafka Schema Registry.
What You'll Learn¶
By the end of this guide, you'll be able to:
- ✅ Understand why schema management is critical for data integrity
- ✅ Set up and configure Kafka Schema Registry
- ✅ Register and evolve schemas with compatibility rules
- ✅ Serialize and deserialize data using Avro, Protobuf, and JSON Schema
- ✅ Integrate Schema Registry with producers and consumers
- ✅ Handle schema evolution in production systems
Prerequisites:
- Docker and Docker Compose installed
- Basic understanding of producers and consumers
- Familiarity with data serialization formats
Estimated Time: 30 minutes
Quick Start: Run Kafka with Schema Registry¶
Use this Docker Compose configuration to run Kafka with KRaft and Schema Registry:
- Confluent Schema Registry image - compatible with Apache Kafka
- Schema Registry REST API port
- Hostname for service discovery
- Kafka bootstrap servers for storing schemas
- REST API endpoint for schema operations
- Internal Kafka topic for storing schemas (created automatically)
- Replication factor 1 for development (use 3+ in production)
Start Services¶
# Start Kafka and Schema Registry
docker-compose up -d
# Wait for services to be ready
docker-compose ps
# Verify Schema Registry is running
curl http://localhost:8081/
# Check subjects (should be empty initially)
curl http://localhost:8081/subjects
# Stop services
docker-compose down
Test Schema Registration
```bash
# Register a simple Avro schema
curl -X POST http://localhost:8081/subjects/test-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"}'
# List subjects (should show "test-value")
curl http://localhost:8081/subjects
```
Production Considerations
For production deployments:

- Run multiple Schema Registry instances (3+) for high availability
- Set the replication factor to 3 for the `_schemas` topic
- Enable authentication (basic auth or mutual TLS)
- Use HTTPS for the Schema Registry REST API
- Monitor Schema Registry availability and latency
What is Schema Registry?¶
Schema Registry is a centralized repository for managing and validating data schemas. It ensures that producers and consumers agree on the structure of messages, preventing data corruption and enabling safe schema evolution.
Why Use Schema Registry?
Without Schema Registry, incompatible schema changes can break consumers. Schema Registry enforces compatibility rules that prevent breaking changes from being deployed.
graph LR
A[Producer] -->|1. Register Schema| B[Schema Registry]
B -->|2. Return Schema ID| A
A -->|3. Send Message<br/>with Schema ID| C[Kafka Topic]
D[Consumer] -->|4. Fetch Message| C
D -->|5. Get Schema by ID| B
B -->|6. Return Schema| D
D -->|7. Deserialize| D
!!! tip "Schema Registry Benefits"
    - Data Integrity: Prevents incompatible data from entering topics
    - Schema Evolution: Supports adding/removing fields without breaking consumers
    - Bandwidth Savings: Messages carry only a small schema ID instead of the full schema
    - Compatibility Enforcement: Multiple compatibility modes (backward, forward, full)
Step 1: Set Up Schema Registry¶
Installation¶
- Schema Registry service - stores schemas in the internal Kafka topic `_schemas`
- Default Schema Registry REST API port
- Hostname for Schema Registry service discovery
- Connect to Kafka cluster to store schemas
- REST API endpoint for schema operations
# Download Confluent Platform
wget https://packages.confluent.io/archive/7.5/confluent-7.5.0.tar.gz # (1)!
tar -xzf confluent-7.5.0.tar.gz
cd confluent-7.5.0
# Configure Schema Registry
cat <<EOF > etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081 # (2)!
kafkastore.bootstrap.servers=localhost:9092 # (3)!
kafkastore.topic=_schemas # (4)!
debug=false
EOF
# Start Schema Registry
bin/schema-registry-start etc/schema-registry/schema-registry.properties
- Download Confluent Platform which includes Schema Registry
- REST API endpoint - accessible at `http://localhost:8081`
- Kafka cluster connection for storing schemas
- Internal topic for schema storage (created automatically)
Verify Installation¶
# Check Schema Registry health
curl http://localhost:8081/ # (1)!
# List subjects (should be empty initially)
curl http://localhost:8081/subjects # (2)!
- Should return `{}` if Schema Registry is running
- Returns the list of registered schema subjects (topic name + `-key`/`-value` suffix)
Schema Registry is Ready
If both commands succeed, Schema Registry is running and ready to manage schemas.
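If you prefer checking from code, here is a minimal Python sketch using the confluent-kafka client (an assumption for this guide: the `confluent-kafka[avro]` package is installed and the registry runs at `http://localhost:8081`):

```python
# pip install "confluent-kafka[avro]"
from confluent_kafka.schema_registry import SchemaRegistryClient

sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})  # adjust for your deployment

# get_subjects() hits the same /subjects endpoint as the curl check above
subjects = sr_client.get_subjects()
print(f"Schema Registry reachable - {len(subjects)} subject(s) registered: {subjects}")
```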
Step 2: Choose a Serialization Format¶
Schema Registry supports three serialization formats. Choose based on your use case:
| Format | Best For | Pros | Cons |
|---|---|---|---|
| Avro | General-purpose data serialization | Compact binary format; rich data types (unions, maps, arrays); mature ecosystem | Requires code generation (optional); less human-readable |
| Protobuf | High-performance microservices | Fastest serialization/deserialization; backward and forward compatible by design; strong typing with code generation | Requires `.proto` files; limited dynamic schema support |
| JSON Schema | Human-readable data, APIs | Human-readable format; no code generation required; easy debugging | Larger message size; slower parsing; less compact than Avro/Protobuf |
Most Common Choice
Avro is the most popular format for Kafka because it balances compactness, flexibility, and schema evolution. This guide focuses on Avro, with examples for Protobuf and JSON Schema.
Step 3: Register a Schema¶
Define an Avro Schema¶
Create a schema for an Order event:
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{ "name": "order_id", "type": "string" }, // (1)!
{ "name": "customer_id", "type": "string" },
{ "name": "amount", "type": "double" },
{ "name": "timestamp", "type": "long" } // (2)!
]
}
- Unique order identifier - required field
- Unix timestamp in milliseconds - required field
Register the Schema¶
curl -X POST http://localhost:8081/subjects/orders-value/versions \ # (1)!
-H "Content-Type: application/vnd.schemaregistry.v1+json" \ # (2)!
-d '{
"schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example.orders\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"customer_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"timestamp\",\"type\":\"long\"}]}" # (3)!
}'
- Subject name: `<topic>-<key|value>` - here `orders-value` for message values
- Required content type for the Schema Registry API
- Schema definition as JSON-encoded string (note escaped quotes)
Response: `{"id": 1}`
- Schema ID - used to reference this schema in messages
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from confluent_kafka.schema_registry.avro import AvroSerializer
# Configure Schema Registry client
sr_client = SchemaRegistryClient({
'url': 'http://localhost:8081' # (1)!
})
# Define Avro schema
avro_schema = """
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "timestamp", "type": "long"}
]
}
"""
# Register schema
schema_id = sr_client.register_schema( # (2)!
subject_name='orders-value',
schema=Schema(avro_schema, schema_type='AVRO')
)
print(f"Schema registered with ID: {schema_id}")
- Schema Registry endpoint - adjust for your deployment
- Returns schema ID which is cached for future lookups
Verify Registration¶
# List all subjects
curl http://localhost:8081/subjects
# Get latest schema for subject
curl http://localhost:8081/subjects/orders-value/versions/latest # (1)!
# Get schema by ID
curl http://localhost:8081/schemas/ids/1 # (2)!
- Returns the latest schema version for the `orders-value` subject
- Returns the schema definition for schema ID 1
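The same checks can be done with the Python client. A small sketch, assuming the `sr_client` from the registration example above:

```python
# List all subjects known to the registry
print(sr_client.get_subjects())

# Latest registered version for the orders-value subject
latest = sr_client.get_latest_version('orders-value')
print(latest.schema_id, latest.version)
print(latest.schema.schema_str)

# Fetch a schema directly by its ID
print(sr_client.get_schema(latest.schema_id).schema_type)
```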
Step 4: Produce Messages with Schema¶
Using Avro Serializer¶
- Full Avro schema definition (same as registered in Schema Registry)
- Avro serializer - automatically looks up schema ID from Schema Registry
- Assign Avro serializer to value serializer (key can use String)
- Python dictionary matching Avro schema structure
- Producer automatically serializes value using Avro schema
- Message sent as `[magic_byte][schema_id][avro_binary_data]`
- Confluent's Kafka Avro Serializer - handles schema registry integration
- Schema Registry URL for schema lookup and registration
- GenericRecord represents Avro data without code generation
- Serializer automatically fetches schema ID and serializes to binary
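The Python annotations above correspond to a producer along these lines. A minimal sketch, assuming the confluent-kafka client, a broker on `localhost:9092`, and Schema Registry on `localhost:8081`:

```python
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

# Full Avro schema - same definition that was registered in Schema Registry
avro_schema = """
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long"}
  ]
}
"""

sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

# Avro serializer - looks up the schema ID in Schema Registry and prepends it to each message
avro_serializer = AvroSerializer(sr_client, avro_schema)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer,  # value sent as [magic_byte][schema_id][avro_binary_data]
})

# Plain Python dict matching the Avro schema structure
order = {
    'order_id': 'ORD-12345',
    'customer_id': 'CUST-67890',
    'amount': 99.99,
    'timestamp': 1702834567000,
}

producer.produce(topic='orders', key=order['order_id'], value=order)
producer.flush()
```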
Schema Validation at Produce Time
If the message doesn't match the schema, the producer will throw a SerializationException before sending to Kafka. This prevents invalid data from entering the pipeline.
Step 5: Consume Messages with Schema¶
- Avro deserializer - reads schema ID from message and fetches schema
- Optional: provide schema for validation (deserializer can fetch from registry)
- Assign Avro deserializer to value deserializer
- Message automatically deserialized to Python dictionary
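A matching Python consumer sketch for the annotations above, again assuming the local broker and Schema Registry from earlier steps:

```python
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

# Reads the schema ID from each message and fetches the writer schema from the registry
avro_deserializer = AvroDeserializer(sr_client)

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': avro_deserializer,
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    order = msg.value()  # deserialized into a plain Python dict
    print(f"Order: {order['order_id']}")
```

The Java equivalent is shown below.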
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processor");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaAvroDeserializer.class); // (1)!
props.put("schema.registry.url", "http://localhost:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord order = record.value(); // (2)!
System.out.println("Order: " + order.get("order_id"));
}
}
- Kafka Avro Deserializer - automatically fetches schema from registry
- Value automatically deserialized to Avro GenericRecord
Step 6: Schema Evolution¶
Compatibility Modes¶
Schema Registry supports several compatibility modes; the four most commonly used are:
| Mode | Description | Allowed Changes |
|---|---|---|
| BACKWARD | New schema can read old data | Delete fields, add optional fields |
| FORWARD | Old schema can read new data | Add fields, delete optional fields |
| FULL | Both backward and forward compatible | Add/delete optional fields only |
| NONE | No compatibility checks | Any change allowed (⚠️ dangerous) |
Default Compatibility Mode
Schema Registry uses BACKWARD compatibility by default, which is the most common choice. Consumers can upgrade independently without breaking.
Configure Compatibility Mode¶
# Set compatibility mode for subject
curl -X PUT http://localhost:8081/config/orders-value \ # (1)!
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}' # (2)!
# Set global default compatibility
curl -X PUT http://localhost:8081/config \ # (3)!
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}'
- Set compatibility mode for specific subject
- Compatibility mode: BACKWARD, FORWARD, FULL, or NONE
- Set default compatibility mode for all new subjects
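The Python client exposes the same configuration. A small sketch, assuming the `sr_client` from earlier and the client's compatibility helpers (which wrap the REST `config` endpoints):

```python
# Set compatibility for a single subject
sr_client.set_compatibility(subject_name='orders-value', level='BACKWARD')

# Read it back
print(sr_client.get_compatibility(subject_name='orders-value'))

# Omitting subject_name targets the global default
sr_client.set_compatibility(level='FULL')
```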
Evolve the Schema (Add Optional Field)¶
Let's add a status field to the Order schema:
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{ "name": "order_id", "type": "string" },
{ "name": "customer_id", "type": "string" },
{ "name": "amount", "type": "double" },
{ "name": "timestamp", "type": "long" },
{ "name": "status", "type": "string", "default": "PENDING" } // (1)!
]
}
- New optional field with default value - backward compatible
# Register new version
curl -X POST http://localhost:8081/subjects/orders-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{...}"}' # (1)!
# Response: {"id": 2} # (2)!
- New schema with the added `status` field
- New schema ID assigned - old consumers can still read new messages
!!! success "Schema Evolution in Action"
    - Old consumers (using schema v1) can read new messages - they ignore the `status` field
    - New producers (using schema v2) can write messages with the `status` field
    - No downtime required - gradual migration is possible
Breaking Changes (Incompatible)¶
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{ "name": "order_id", "type": "int" }, // (1)!
{ "name": "customer_id", "type": "string" },
{ "name": "amount", "type": "double" }
]
}
- Changed `order_id` type from `string` to `int` - BREAKING CHANGE
# Try to register incompatible schema
curl -X POST http://localhost:8081/subjects/orders-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{...}"}'
# Response: HTTP 409 Conflict # (1)!
# {
# "error_code": 409,
# "message": "Schema being registered is incompatible with an earlier schema"
# }
- Schema Registry rejects incompatible schemas - prevents breaking consumers
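You can also test compatibility before attempting registration. A sketch using the registry's `/compatibility` REST endpoint via `requests`, with the incompatible schema from the example above:

```python
import json
import requests

new_schema = {
    "type": "record",
    "name": "Order",
    "namespace": "com.example.orders",
    "fields": [
        {"name": "order_id", "type": "int"},       # breaking type change
        {"name": "customer_id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}

resp = requests.post(
    "http://localhost:8081/compatibility/subjects/orders-value/versions/latest",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schema": json.dumps(new_schema)},
)
print(resp.json())  # expected: {"is_compatible": false}
```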
Step 7: Using Protobuf and JSON Schema¶
Protobuf Example¶
syntax = "proto3";
package com.example.orders;
message Order {
string order_id = 1; // (1)!
string customer_id = 2;
double amount = 3;
int64 timestamp = 4;
}
- Field numbers must never change - used for backward compatibility
curl -X POST http://localhost:8081/subjects/orders-proto-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "PROTOBUF", # (1)!
"schema": "syntax = \"proto3\"; package com.example.orders; message Order { string order_id = 1; string customer_id = 2; double amount = 3; int64 timestamp = 4; }"
}'
- Specify `PROTOBUF` schema type (the default is AVRO)
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
protobuf_serializer = ProtobufSerializer( # (1)!
msg_type=Order, # (2)!
schema_registry_client=sr_client
)
producer = SerializingProducer({
'bootstrap.servers': 'localhost:9092',
'value.serializer': protobuf_serializer
})
- Protobuf serializer - similar to Avro serializer
- Generated Protobuf class from the `.proto` file
JSON Schema Example¶
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"order_id": {"type": "string"}, // (1)!
"customer_id": {"type": "string"},
"amount": {"type": "number"},
"timestamp": {"type": "integer"}
},
"required": ["order_id", "customer_id", "amount", "timestamp"] // (2)!
}
- JSON Schema type definitions - standard JSON Schema format
- Required fields - enforced at serialization time
from confluent_kafka.schema_registry.json_schema import JSONSerializer
json_serializer = JSONSerializer(
schema_str=json_schema_str,
schema_registry_client=sr_client
)
producer = SerializingProducer({
'bootstrap.servers': 'localhost:9092',
'value.serializer': json_serializer # (1)!
})
# Send JSON message
order = { # (2)!
'order_id': 'ORD-12345',
'customer_id': 'CUST-67890',
'amount': 99.99,
'timestamp': 1702834567000
}
producer.produce('orders', value=order)
- JSON Schema serializer - validates against JSON Schema
- Plain Python dictionary - serialized to JSON with schema validation
Best Practices¶
**DO:**

- Use BACKWARD or FULL compatibility for production
- Add default values to new fields for backward compatibility
- Test schema evolution in staging before production
- Version schemas semantically (v1, v2, etc.)
- Use Avro for general-purpose data serialization

**DON'T:**

- Change field types without creating a new topic
- Use `NONE` compatibility mode in production
- Remove required fields without a migration plan
- Deploy breaking schema changes without coordination
- Hardcode schema IDs in application code
Production Checklist¶
- Schema Registry deployed with HA (multiple replicas)
- Compatibility mode configured (`BACKWARD` or `FULL`)
- Schema validation enabled in producers
- Monitoring for schema registry availability
- Schema versioning strategy documented
- Rollback plan for schema changes
- Security: SSL + authentication enabled (see Schema Registry Security)
Troubleshooting¶
Error: Schema being registered is incompatible
Cause: New schema violates compatibility rules
**Solution:**
1. Check compatibility mode: `curl http://localhost:8081/config/orders-value`
2. Review schema changes - ensure backward compatibility
3. Add default values to new fields
4. If breaking change is required:
- Create a new topic with new schema
- Migrate consumers gradually
- Use Kafka Streams for data transformation
Error: Subject not found
Cause: Schema not registered for topic
**Solution:**
```bash
# List all subjects
curl http://localhost:8081/subjects
# Check if subject exists
curl http://localhost:8081/subjects/orders-value/versions
```
Register the schema manually using the REST API, or let the producer register it automatically with `auto.register.schemas=true`.
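In the Python client, auto-registration is controlled on the serializer. A short sketch, assuming the `sr_client` and `avro_schema` from the earlier steps:

```python
from confluent_kafka.schema_registry.avro import AvroSerializer

# auto.register.schemas defaults to True: the serializer registers the schema
# under <topic>-value on first produce. Set it to False to fail fast when the
# schema has not been registered ahead of time.
avro_serializer = AvroSerializer(
    sr_client,
    avro_schema,
    conf={'auto.register.schemas': True},
)
```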
Error: Failed to deserialize Avro message
Cause: Message written without schema or corrupted
**Solution:**
1. Verify the wire format: the first 5 bytes of the message value should be `[0x00][4-byte schema ID]`
2. Check Schema Registry availability
3. Ensure producer used Avro serializer correctly
4. Check for schema ID mismatch (old cached schema)
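For step 1, a small Python helper can inspect the raw bytes (a sketch of the Confluent wire-format check; `raw_bytes` would come from consuming the topic with plain byte deserializers):

```python
import struct

def confluent_schema_id(raw_bytes: bytes) -> int:
    """Return the schema ID if raw_bytes follows the Confluent wire format."""
    if len(raw_bytes) < 5 or raw_bytes[0] != 0x00:
        raise ValueError("Not Confluent wire format - likely produced without a Schema Registry serializer")
    # Bytes 1-4 hold the schema ID as a big-endian 32-bit integer
    return struct.unpack('>I', raw_bytes[1:5])[0]
```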
Complete Example: Order Service with Schema Evolution¶
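A condensed sketch of such a service, combining the pieces from the earlier steps (assumptions: broker on `localhost:9092`, Schema Registry on `localhost:8081`; names like `order-service` are illustrative):

```python
import time

from confluent_kafka import DeserializingConsumer, SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer
from confluent_kafka.serialization import StringDeserializer, StringSerializer

# Schema v2: adds the optional `status` field with a default, so it stays
# backward compatible with v1 consumers.
ORDER_SCHEMA_V2 = """
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long"},
    {"name": "status", "type": "string", "default": "PENDING"}
  ]
}
"""

sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': AvroSerializer(sr_client, ORDER_SCHEMA_V2),
})

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-service',          # illustrative group id
    'auto.offset.reset': 'earliest',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': AvroDeserializer(sr_client),
})
consumer.subscribe(['orders'])

# Produce one v2 order, then read it back
order = {
    'order_id': 'ORD-12346',
    'customer_id': 'CUST-67890',
    'amount': 42.50,
    'timestamp': int(time.time() * 1000),
    'status': 'PENDING',
}
producer.produce('orders', key=order['order_id'], value=order)
producer.flush()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    value = msg.value()
    print(f"Processed order {value['order_id']} with status {value['status']}")
    break
```

Old consumers built against schema v1 continue to work against these messages; the `status` field is simply ignored on their side.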
What's Next?¶
Now that you understand schema management, explore:
- Stream Processing - Transform data with Kafka Streams and Flink
- Kafka Connect - Use Schema Registry with source/sink connectors
- Produce Messages - Optimize producer configuration
Additional Resources¶
Official Documentation¶
- Confluent Schema Registry Documentation
- Schema Registry REST API Reference
- Schema Registry Configuration Options
Schema Formats¶
Tutorials & Best Practices¶
- Schema Registry Tutorial - Free Confluent course
- Schema Evolution Best Practices
- Serialization and Schema Registry Guide
- Schema Validation Patterns
Advanced Topics¶
Course Attribution
This guide is based on content from Confluent Schema Registry documentation, Apache Avro specification, and industry best practices.