How to Produce Messages to Apache Kafka¶
This guide walks you through the process of writing data into Apache Kafka topics using Kafka producers.
Prerequisites¶
Before you begin, ensure you have:
- Docker and Docker Compose installed
- Development environment set up (JDK 11+, Python 3.8+, or Node.js 16+)
- Basic understanding of Kafka concepts
Quick Start: Run Kafka with Docker Compose¶
Use this Docker Compose configuration to run a single-node Kafka cluster with KRaft (no ZooKeeper needed):
- Confluent Platform Kafka image - includes additional tools and enterprise features
- Kafka broker port exposed to host
- Unique node ID in the KRaft cluster
- This node acts as both broker and controller (combined mode)
- Listeners: PLAINTEXT for clients, CONTROLLER for internal cluster communication
- Advertised listener for external clients (use `localhost` for local development)
- Quorum voters for KRaft consensus (format: `id@host:port`)
- Unique cluster ID - generate with `docker run confluentinc/cp-kafka kafka-storage random-uuid`
- Log directory for Kafka data storage (Confluent default path)
- Disable Confluent Metrics Reporter for minimal development setup
Start Kafka¶
# Start Kafka
docker-compose up -d
# Wait for Kafka to be ready (check health status)
docker-compose ps
# View logs
docker-compose logs -f kafka
# Stop Kafka
docker-compose down
Verify Kafka is Running
```bash
# Create a test topic
docker exec kafka-kraft kafka-topics \
  --bootstrap-server localhost:9092 \
  --create --topic test-topic --partitions 3 --replication-factor 1

# List topics
docker exec kafka-kraft kafka-topics \
  --bootstrap-server localhost:9092 --list
```
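If you prefer verifying from code, here is a small sketch using the confluent-kafka Python client's `AdminClient`; the timeout value is arbitrary and the package must be installed (`pip install confluent-kafka`):

```python
from confluent_kafka.admin import AdminClient

# Connect to the local broker started by Docker Compose
admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Fetch cluster metadata; fails if the broker cannot be reached within the timeout
metadata = admin.list_topics(timeout=10)

print("Brokers:", list(metadata.brokers.keys()))
print("Topics:", list(metadata.topics.keys()))
```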
Production Considerations
This is a single-node setup for development. For production:

- Use at least 3 Kafka nodes for high availability
- Configure separate controller and broker nodes
- Use persistent volumes for data storage
- Enable authentication and encryption (SASL/SSL)
Understanding Producers¶
What is a Producer?
Producers are client applications responsible for writing data into Kafka topics. Any application that pushes data—whether telemetry, logs, events, or user activity—into Kafka is considered a producer.
graph LR
A[Producer Application] -->|Write Events| B[Kafka Topic]
B --> C[Partition 0]
B --> D[Partition 1]
B --> E[Partition 2]
Step 1: Configure Your Producer¶
Basic Configuration Properties¶
# Connection
bootstrap.servers=localhost:9092 # (1)!
# Serialization
key.serializer=org.apache.kafka.common.serialization.StringSerializer # (2)!
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Optional: Performance tuning
acks=all # (3)!
retries=3
linger.ms=1
- List of broker addresses for initial cluster discovery
- Converts your key/value objects to byte arrays
- Wait for all replicas to acknowledge (highest durability)
config = {
'bootstrap.servers': 'localhost:9092', # (1)!
'acks': 'all', # (2)!
'retries': 3,
'linger.ms': 1
}
- Broker addresses (comma-separated if multiple)
- Wait for all in-sync replicas
Configuration Reference¶
| Property | Purpose | Common Values |
|---|---|---|
| `bootstrap.servers` | List of broker addresses | `localhost:9092` or multiple hosts |
| `key.serializer` | How to convert keys to bytes | `StringSerializer`, `IntegerSerializer` |
| `value.serializer` | How to convert values to bytes | `StringSerializer`, `JsonSerializer` |
| `acks` | Acknowledgment level | `0`, `1`, `all` |
| `retries` | Number of retry attempts | `0` to `Integer.MAX_VALUE` |
Bootstrap Servers Best Practice
You don't need to list all brokers—just enough for initial discovery. The producer will automatically learn about the rest of the cluster.
Step 2: Create a Producer Instance¶
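A minimal sketch of what a `simple_producer.py` might contain, using the confluent-kafka Python client and the configuration from Step 1 (names are illustrative):

```python
# simple_producer.py - minimal sketch using confluent-kafka
from confluent_kafka import Producer

config = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'retries': 3,
    'linger.ms': 1,
}

# Create one Producer and reuse it for the lifetime of the application
producer = Producer(config)
```

Producer instances are thread-safe and relatively expensive to create, so create one per application and reuse it rather than constructing a new one per message.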
Step 3: Send Messages¶
Sending Strategies¶
No Delivery Guarantee
Fire-and-forget provides no confirmation. Messages may be lost if failures occur.
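As a rough illustration, a fire-and-forget send in Python (confluent-kafka) simply queues the message without registering a delivery callback:

```python
# Fire-and-forget: enqueue the message and move on; no delivery callback
producer.produce('events', key='key1', value='value1')

# Serve the delivery-report queue without blocking; errors are not inspected
producer.poll(0)
```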
Strong Guarantee
Blocks until acknowledgment received. Use for critical messages.
try {
RecordMetadata metadata = producer.send(record).get(); // (1)!
System.out.printf("✓ Sent to partition %d at offset %d%n",
metadata.partition(),
metadata.offset());
} catch (Exception e) {
System.err.println("✗ Failed: " + e.getMessage());
}
- `.get()` blocks until the send completes
try:
producer.produce('events', key='key1', value='value1')
producer.flush() # (1)!
print("✓ Message sent successfully")
except Exception as e:
print(f"✗ Failed: {e}")
- Blocks until all messages delivered
// Node.js doesn't have native sync sends
// Use async with Promise wrapper
function sendSync(producer, topic, key, value) {
return new Promise((resolve, reject) => {
producer.produce(
topic, null, Buffer.from(value), key,
Date.now(),
(err, offset) => {
if (err) reject(err);
else resolve(offset);
}
);
producer.flush(10000, (err) => {
if (err) reject(err);
});
});
}
// Usage
try {
const offset = await sendSync(producer, 'events', 'key1', 'value1');
console.log(`✓ Sent to offset ${offset}`);
} catch (err) {
console.error(`✗ Failed: ${err}`);
}
deliveryChan := make(chan kafka.Event)
err := producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Key: []byte("key1"),
Value: []byte("value1"),
}, deliveryChan)
if err != nil {
fmt.Printf("✗ Failed: %v\n", err)
}
// Wait for delivery report
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("✗ Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("✓ Sent to partition %d at offset %v\n",
m.TopicPartition.Partition, m.TopicPartition.Offset)
}
close(deliveryChan)
Best Practice
Sending asynchronously with callbacks provides the best balance of performance and reliability.
producer.send(record, (metadata, exception) -> { // (1)!
if (exception == null) {
System.out.printf("✓ Sent to partition %d, offset %d%n",
metadata.partition(),
metadata.offset());
} else {
System.err.println("✗ Failed: " + exception.getMessage());
}
});
- Callback executed when send completes
def delivery_callback(err, msg): # (1)!
if err:
print(f"✗ Failed: {err}")
else:
print(f"✓ Sent to partition {msg.partition()}, "
f"offset {msg.offset()}")
producer.produce(
'events',
key='key1',
value='value1',
callback=delivery_callback # (2)!
)
producer.poll(0) # Trigger callbacks
- Callback function for delivery reports
- Attach callback to this message
producer.produce(
'events', // topic
null, // partition (null = automatic)
Buffer.from('value1'), // value
'key1', // key
Date.now(), // timestamp
(err, offset) => { // (1)!
if (err) {
console.error('✗ Failed:', err);
} else {
console.log(`✓ Sent to offset ${offset}`);
}
}
);
- Callback for delivery confirmation
deliveryChan := make(chan kafka.Event, 10000) // (1)!
err := producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Key: []byte("key1"),
Value: []byte("value1"),
}, deliveryChan)
// Handle delivery reports asynchronously
go func() {
for e := range deliveryChan {
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("✗ Failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("✓ Sent to partition %d at offset %v\n",
m.TopicPartition.Partition,
m.TopicPartition.Offset)
}
}
}()
- :material-buffer: Buffered channel for non-blocking sends
Step 4: Control Message Routing¶
Partition Assignment Strategies¶
graph TD
A{Message has Key?} -->|Yes| B[Hash Key]
A{Message has Key?} -->|No| C[Round-Robin]
B --> D["Same Key → Same Partition<br/>(Order Preserved)"]
C --> E[Distribute Evenly<br/>Across Partitions]
Routing Examples¶
Order Guarantee
Messages with the same key always go to the same partition, preserving order.
// All messages for user-123 go to same partition
ProducerRecord<String, String> record1 =
new ProducerRecord<>("events", "user-123", "login");
ProducerRecord<String, String> record2 =
new ProducerRecord<>("events", "user-123", "purchase");
producer.send(record1); // → Partition 2 (example)
producer.send(record2); // → Partition 2 (same!)
// Same key routes to same partition
key := []byte("user-123")
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic},
Key: key,
Value: []byte("login"),
}, nil)
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic},
Key: key,
Value: []byte("purchase"),
}, nil)
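The same key-to-partition behavior in Python (confluent-kafka); a brief sketch, with the topic and key chosen for illustration:

```python
# Messages that share a key are hashed to the same partition,
# so events for user-123 stay in order relative to each other.
producer.produce('events', key='user-123', value='login')
producer.produce('events', key='user-123', value='purchase')

producer.flush()  # wait for delivery before exiting
```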
// 1. Configure custom partitioner
config.put("partitioner.class", "com.example.CustomPartitioner");
// 2. Implement Partitioner interface
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
// Route high-priority messages to partition 0
if (value.toString().contains("URGENT")) {
return 0; // (1)!
}
// Default hash-based routing for others
return Math.abs(key.hashCode()) %
cluster.partitionCountForTopic(topic);
}
// ... other required methods
}
- Urgent messages get dedicated partition for faster processing
# confluent-kafka (librdkafka) only accepts built-in partitioner names in its
# configuration (e.g. 'murmur2_random'), so for truly custom routing choose
# the partition yourself and pass it to produce().
def choose_partition(key, num_partitions):
    """Route URGENT messages to partition 0; hash everything else."""
    if key and b'URGENT' in key:
        return 0
    # Note: Python's hash() is not stable across processes; use a
    # deterministic hash (e.g. zlib.crc32) if routing must be consistent.
    return hash(key) % num_partitions

producer.produce(
    'events',
    key=key,
    value=value,
    partition=choose_partition(key, num_partitions),
)
// Go uses librdkafka which supports custom partitioners
// via the 'partitioner' configuration option
config := &kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"partitioner": "murmur2_random", // or custom
}
// For truly custom logic, calculate partition manually:
partition := customPartitionLogic(key, value)
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: int32(partition),
},
Key: key,
Value: value,
}, nil)
Step 5: Configure Acknowledgments¶
Understanding acks¶
graph LR
A[Producer] -->|acks=0| B["No Wait<br/>⚡ Fastest"]
A -->|acks=1| C["Leader Only<br/>🚀 Balanced"]
A -->|acks=all| D["Leader + All Replicas<br/>✅ Most Durable"]
Comparison Matrix¶
| Setting | Durability | Latency | Throughput | Use Case |
|---|---|---|---|---|
| `acks=0` | ❌ None | ⚡ Lowest | 🚀 Highest | Fire-and-forget logs |
| `acks=1` | ⚠️ Medium | 🚀 Medium | 🚀 High | General purpose |
| `acks=all` | ✅ Highest | 🐢 Highest | 🐢 Lowest | Financial transactions |
Configuration Example¶
# Wait for all in-sync replicas (1)
acks=all
# Require at least 2 replicas (2)
min.insync.replicas=2
# Retry on failure (3)
retries=3
- Producer waits for all replicas to acknowledge
- At least 2 replicas must be in-sync before a write succeeds (`min.insync.replicas` is a broker/topic setting, not a producer property)
- Automatically retry failed sends
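The equivalent producer-side settings in Python (confluent-kafka), as a sketch; remember that `min.insync.replicas` is configured on the broker or topic, not on the producer:

```python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',   # wait for all in-sync replicas
    'retries': 3,    # retry transient failures automatically
})
```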
Step 6: Handle Errors and Retries¶
Common Errors¶
TimeoutException
Cause: Broker unreachable or network issues
**Solutions:**
- Check network connectivity and firewall rules
- Verify `bootstrap.servers` configuration
- Increase `request.timeout.ms`
RecordTooLargeException
Cause: Message exceeds the broker's `message.max.bytes`
**Solutions:**
- Reduce message size or split into smaller messages
- Increase broker config: `message.max.bytes`
- Increase producer config: `max.request.size`
SerializationException
Cause: Invalid data format for serializer
**Solutions:**
- Validate data before producing
- Check serializer configuration
- Use Schema Registry for validation
Retry Configuration¶
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (exception instanceof RetriableException) {
// Kafka will retry automatically (1)
log.warn("Retriable error: {}", exception.getMessage());
} else {
// Non-retriable - handle manually (2)
log.error("Fatal error: {}", exception.getMessage());
// Store to dead-letter queue or alert
deadLetterQueue.send(record);
}
}
});
- Transient errors (network, broker temporarily down)
- Permanent errors (invalid data, auth failure)
def delivery_callback(err, msg):
if err:
if err.code() == KafkaError._MSG_TIMED_OUT:
# Retriable error
logger.warning(f"Timeout: {err}")
else:
# Non-retriable error
logger.error(f"Fatal: {err}")
# Send to DLQ
dlq_producer.produce('dead-letter-queue', msg.value())
else:
logger.info(f"✓ Delivered: {msg.offset()}")
for e := range deliveryChan {
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
err := m.TopicPartition.Error
// Check if retriable
if err.(kafka.Error).IsRetriable() {
log.Printf("Retriable error: %v", err)
} else {
log.Printf("Fatal error: %v", err)
// Send to DLQ
}
} else {
log.Printf("✓ Delivered to offset %v",
m.TopicPartition.Offset)
}
}
Step 7: Optimize Performance¶
Batching Configuration¶
Batching Benefits
Grouping messages into batches dramatically improves throughput by reducing network overhead.
sequenceDiagram
participant P as Producer
participant B as Batch Buffer
participant K as Kafka Broker
P->>B: Message 1
P->>B: Message 2
P->>B: Message 3
Note over B: Wait linger.ms<br/>or batch fills
B->>K: Send Batch (3 messages)<br/>1 network call instead of 3!
K-->>B: Ack (all 3)

# Batch size in bytes (16KB default)
batch.size=16384 # (1)!
# Wait time before sending partial batch (0ms = send immediately)
linger.ms=10 # (2)!
# Compression
compression.type=snappy # (3)!
- Larger batches = better throughput (but more memory)
- Trade a little latency for better batching
- Compress batches to save network bandwidth
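A rough Python (confluent-kafka) equivalent of the batching settings above; librdkafka exposes properties with these names, though exact defaults and availability vary by version:

```python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'batch.size': 16384,           # batch size in bytes
    'linger.ms': 10,               # wait up to 10 ms to fill a batch
    'compression.type': 'snappy',  # compress batches on the wire
})
```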
Compression Comparison¶
| Codec | Ratio | CPU | Best for |
|---|---|---|---|
| None | 1x (no compression) | Minimal | Already compressed data |
| Snappy ⭐ | 2-3x | Low | General purpose (recommended) |
| LZ4 | 2-3x | Low | Low-latency requirements |
| GZIP | 4-5x | High | Storage optimization |
| ZSTD | 3-5x | Medium | Best balance of ratio/speed |
Complete Working Example¶
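As a minimal end-to-end sketch in Python (confluent-kafka), assuming a local broker on `localhost:9092` and an `events` topic created as in the Quick Start:

```python
from confluent_kafka import Producer

config = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'retries': 3,
    'linger.ms': 10,
    'compression.type': 'snappy',
}

producer = Producer(config)

def delivery_callback(err, msg):
    """Report the outcome of each send."""
    if err:
        print(f"✗ Failed: {err}")
    else:
        print(f"✓ Sent to partition {msg.partition()}, offset {msg.offset()}")

# Keyed messages for the same user land on the same partition (ordered)
for i in range(10):
    producer.produce(
        'events',
        key='user-123',
        value=f'event-{i}',
        callback=delivery_callback,
    )
    producer.poll(0)  # serve delivery callbacks

# Flush remaining messages before shutting down
producer.flush(10)
```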
Hands-on Exercise¶
Practice with Confluent's Interactive Lab
Ready to try it yourself? The official Confluent hands-on exercise provides a pre-configured environment with real-time validation.
What you'll get:
- Pre-configured Kafka cluster
- Step-by-step instructions
- Multi-language examples
- Real-time validation
Best Practices Summary¶
**DO**

- Use `acks=all` for critical data
- Enable compression (`snappy` or `zstd`)
- Send asynchronously with callbacks
- Close producers gracefully (`flush()` then `close()`; see the sketch below)
- Monitor producer metrics
- Use keys for ordered messages

**DON'T**

- Use synchronous sends in high-throughput scenarios
- Ignore callback exceptions
- Create a new producer per message (reuse instances!)
- Send sensitive data without encryption
- Use `acks=0` for important data
- Forget to call `flush()` before shutdown
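For graceful shutdown, a short sketch with the confluent-kafka Python client (whose `Producer` exposes `flush()` but no explicit `close()`; the Java client has both):

```python
# flush() waits up to the timeout and returns the number of messages
# still queued; anything left over was not delivered.
remaining = producer.flush(10)
if remaining:
    print(f"✗ {remaining} message(s) were not delivered before shutdown")
```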
Troubleshooting¶
Producer Hangs / Blocks Forever
Symptoms: Producer blocks indefinitely on send() or flush()
Solutions:

- Verify the brokers in `bootstrap.servers` are reachable (see Connection Refused below)
- Lower `max.block.ms` so `send()` fails fast instead of blocking indefinitely
- Increase `buffer.memory` (or slow the send rate) if the local buffer fills up
Low Throughput / Slow Sends
Symptoms: Messages sent at a very slow rate
Solutions:

- Increase `batch.size` and `linger.ms` so more messages are batched per request
- Enable compression (`compression.type=snappy` or `lz4`)
- Send asynchronously with callbacks instead of blocking on every send
Messages Out of Order
Symptoms: Messages arrive in wrong sequence
Solutions:

- Use the same key for messages that must stay in order (ordering is only guaranteed within a partition)
- Enable idempotence (`enable.idempotence=true`) or set `max.in.flight.requests.per.connection=1` when retries are enabled
Connection Refused
Symptoms: Cannot connect to brokers
Checklist:
- Kafka brokers are running (`jps` or `docker ps`)
- `bootstrap.servers` matches the actual broker addresses
- Firewall allows connections on port 9092
- For Docker: using the correct network/host configuration
Next Steps¶
- **Consume Messages**: Learn how to read data from Kafka topics using consumers
- **Schema Registry**: Add schema validation and evolution to your data pipelines
- **Core Concepts**: Deep dive into Kafka architecture and internals