Consuming Messages from Apache Kafka¶
Overview¶
Learn how to build Kafka consumer applications that read and process event streams from Kafka topics. This guide covers consumer configuration, offset management, consumer groups for parallel processing, and best practices for building reliable streaming applications.
!!! info "What You'll Learn" - Configure and create Kafka consumers - Subscribe to topics and poll for messages - Manage consumer offsets for fault tolerance - Scale with consumer groups - Implement error handling and recovery
Prerequisites¶
- Docker and Docker Compose installed
- Basic understanding of Kafka topics and partitions
- Development environment with Kafka client libraries
Quick Start: Run Kafka with Docker Compose¶
Use this Docker Compose configuration to run a single-node Kafka cluster with KRaft (no Zookeeper needed):
- Confluent Platform Kafka image (includes KRaft mode)
- Kafka broker port exposed to the host
- Unique node ID in the KRaft cluster
- This node acts as both broker and controller (combined mode)
- Listeners: PLAINTEXT for clients, CONTROLLER for internal cluster communication
- Advertised listener for external clients
- Quorum voters for KRaft consensus (format: `id@host:port`)
- Unique cluster ID - generate with `docker run confluentinc/cp-kafka kafka-storage random-uuid`
- Log directory for Kafka data storage
- Disable Confluent Support metrics collection for minimal local setup
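The settings above map to environment variables on the Confluent image. A minimal `docker-compose.yml` sketch along those lines (the image tag, cluster ID, and port numbers are illustrative assumptions, not a vetted production config):

```yaml
# docker-compose.yml - single-node Kafka in KRaft mode (illustrative sketch)
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0      # Confluent Platform image with KRaft support
    container_name: kafka-kraft             # matches the `docker exec kafka-kraft ...` commands below
    ports:
      - "9092:9092"                         # broker port exposed to the host
    environment:
      KAFKA_NODE_ID: 1                                        # unique node ID in the KRaft cluster
      KAFKA_PROCESS_ROLES: broker,controller                  # combined broker + controller mode
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092  # what external clients connect to
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093        # format: id@host:port
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1               # single-node setup
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk                      # generate your own with kafka-storage random-uuid
      KAFKA_LOG_DIRS: /var/lib/kafka/data                     # log directory for Kafka data
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"         # disable Support metrics for local use
```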
Start Kafka¶
```bash
# Start Kafka
docker-compose up -d

# Wait for Kafka to be ready
docker-compose ps

# Create a topic with some test data
docker exec kafka-kraft kafka-topics \
  --bootstrap-server localhost:9092 \
  --create --topic orders --partitions 3 --replication-factor 1

# Produce test messages
docker exec -i kafka-kraft kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic orders << EOF
{"order_id": "ORD-001", "amount": 99.99}
{"order_id": "ORD-002", "amount": 149.50}
{"order_id": "ORD-003", "amount": 75.25}
EOF

# Stop Kafka
docker-compose down
```
Verify Messages
```bash
# Consume messages from beginning
docker exec kafka-kraft kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic orders --from-beginning --max-messages 3
```
Consumer Fundamentals¶
What is a Kafka Consumer?¶
Consumer Definition
A consumer is a client application that reads data from Kafka topics. Consumers:
- Pull messages from one or more topics
- Process events in real-time or near-real-time
- Track their reading position using offsets
- Can scale horizontally using consumer groups
```mermaid
graph LR
    T[Topic: orders<br/>3 Partitions] --> C1[Consumer 1<br/>Reads Partition 0]
    T --> C2[Consumer 2<br/>Reads Partition 1]
    T --> C3[Consumer 3<br/>Reads Partition 2]
    C1 --> P[Processing<br/>Logic]
    C2 --> P
    C3 --> P
```
Step 1: Configure Your Consumer¶
Required Configuration¶
All consumers need basic configuration to connect to the Kafka cluster:
- Kafka broker addresses - specify multiple for redundancy
- Consumer group ID - consumers with same ID share partition load
- Deserializer for message keys - converts bytes back to objects
- Deserializer for message values - must match producer's serializer
- Where to start reading if no offset exists: `earliest` or `latest`
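A minimal Java setup covering these required settings might look like the following sketch (the broker address and group ID are placeholders):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // broker addresses - list several for redundancy
props.put("group.id", "order-processing");         // consumers sharing this ID split the partition load
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");   // bytes -> String keys
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");   // must match the producer's serializer
props.put("auto.offset.reset", "earliest");        // where to start when no committed offset exists

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```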
consumer_config.py:

- Kafka broker addresses - can be a list for multiple brokers
- Consumer group ID - enables load balancing across consumers
- Deserialize keys from bytes to string
- Deserialize values from bytes to string
- Read from beginning if no previous offset

KafkaJS:

- Array of broker addresses for cluster connection
- Consumer group ID for coordinated consumption
- KafkaJS handles deserialization automatically for strings

Go:

- Kafka cluster broker addresses
- Consumer group identifier
- Offset reset strategy for new consumers
Optional Configuration¶
Performance Tuning
Consider these optional settings for production:
| Parameter | Description | Default | Recommendation |
|-----------|-------------|---------|----------------|
| `fetch.min.bytes` | Minimum data per fetch request | 1 | Increase for throughput |
| `fetch.max.wait.ms` | Max wait time if min bytes not met | 500ms | Balance latency vs throughput |
| `max.poll.records` | Max records per poll | 500 | Tune based on processing time |
| `enable.auto.commit` | Auto-commit offsets | `true` | `false` for manual control |
| `session.timeout.ms` | Consumer heartbeat timeout | 10s | Increase to tolerate missed heartbeats (e.g., long GC pauses); for slow message processing, raise `max.poll.interval.ms` instead |
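As a rough sketch, these options are set the same way as the required ones (the values below are illustrative, not recommendations for any particular workload):

```java
// Optional tuning properties, added to the same Properties object (illustrative values)
props.put("fetch.min.bytes", "65536");       // wait for at least 64 KB per fetch to favor throughput
props.put("fetch.max.wait.ms", "500");       // ...but never wait longer than 500 ms
props.put("max.poll.records", "200");        // cap how many records a single poll() returns
props.put("enable.auto.commit", "false");    // take manual control of offset commits
props.put("session.timeout.ms", "30000");    // tolerate longer gaps between heartbeats
```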
Step 2: Subscribe to Topics¶
Single Topic Subscription¶
```java
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe to single topic
consumer.subscribe(Collections.singletonList("orders")); // (1)!
```
- Pass a list even for a single topic - the API expects a `Collection`
Multiple Topics¶
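A consumer can also subscribe to several topics in one call; a short sketch (the extra topic names are illustrative):

```java
// One consumer reads all three topics; records from every subscribed topic arrive via the same poll()
consumer.subscribe(Arrays.asList("orders", "payments", "shipments"));
```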
Pattern-Based Subscription¶
Advanced Feature
Subscribe to topics matching a regex pattern - useful for dynamic topic creation but requires careful management.
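As a sketch, a regex subscription might look like this (the `orders-.*` naming convention is an assumption for illustration):

```java
import java.util.regex.Pattern;

// Matches current and future topics whose names start with "orders-"
consumer.subscribe(Pattern.compile("orders-.*"));
```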
Step 3: Poll for Messages¶
The Poll Loop¶
Infinite Polling
Streaming applications run indefinitely, continuously polling for new messages. This is the normal behavior for event-driven systems.
- Infinite loop - normal for streaming applications
- Poll with timeout - returns immediately if messages available
- Iterate over batch of records returned
- Which topic this message came from
- Partition number (0-indexed)
- Offset position within partition
- Message key (can be null)
- Message value - your actual data
- Clean shutdown on application exit
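Putting the annotated pieces together, a basic Java poll loop might look like this sketch:

```java
try {
    while (true) {                                               // infinite loop - normal for streaming apps
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(100));           // poll with a timeout
        for (ConsumerRecord<String, String> record : records) {  // iterate over the returned batch
            System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                    record.topic(),      // which topic this message came from
                    record.partition(),  // partition number (0-indexed)
                    record.offset(),     // offset position within the partition
                    record.key(),        // message key (can be null)
                    record.value());     // message value - your actual data
        }
    }
} finally {
    consumer.close();                                            // clean shutdown on application exit
}
```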
consumer_loop.py:

- Python's iterator pattern - cleaner than a while loop
- Your business logic here
- Graceful shutdown

KafkaJS:

- Async consumption with automatic batching
- Callback per message
- Optional chaining - key might be null
- Convert Buffer to string
- Async processing supported

Go:

- Infinite loop for continuous polling
- Blocking read with -1 timeout (wait indefinitely)
- Your processing logic
Step 4: Manage Consumer Offsets¶
Understanding Offsets¶
Offset Tracking
Kafka tracks the offset (position) of each message a consumer has processed. This enables:
- **Fault tolerance**: Resume from last position after crash
- **At-least-once processing**: Committing only after successful processing avoids losing messages (duplicates must still be handled by idempotent processing)
- **Replayability**: Rewind to earlier offset if needed
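For replay, the Java client exposes `seek()`; a minimal sketch, assuming an explicitly assigned partition and an illustrative target offset:

```java
// Rewind partition 0 of "orders" to offset 100 and re-consume from there
TopicPartition tp = new TopicPartition("orders", 0);
consumer.assign(Collections.singletonList(tp));  // explicit assignment (no consumer-group rebalancing)
consumer.seek(tp, 100L);                         // the next poll() starts at offset 100
```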
```mermaid
sequenceDiagram
    participant C as Consumer
    participant K as Kafka Broker
    participant T as __consumer_offsets topic
    C->>K: Poll messages (offset 100-109)
    K->>C: Return 10 messages
    C->>C: Process messages
    C->>K: Commit offset 110
    K->>T: Store offset 110 for group
    Note over C: Consumer crashes
    C->>K: Consumer restarts, fetch offset
    T->>K: Return offset 110
    K->>C: Resume from offset 110
```

Auto vs Manual Commit¶
- **Auto-Commit (Default)**
    - Pros: simple - no code needed; automatic periodic commits
    - Cons: risk of duplicate processing; no control over timing
- **Manual Commit (Recommended)**
    - Pros: precise control; at-least-once guarantee; commit after successful processing
    - Cons: more code complexity; must handle explicitly
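If you stay with auto-commit, it is purely a configuration choice; a sketch of enabling it explicitly (5 seconds is the client's default commit interval):

```java
// Auto-commit: the client commits the latest polled offsets in the background
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");  // commit roughly every 5 seconds
```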
Manual Commit Examples¶
```java
props.put("enable.auto.commit", "false"); // (1)!

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        try {
            processOrder(record.value());

            // Commit after successful processing
            consumer.commitSync(); // (2)!
        } catch (Exception e) {
            // Handle error - don't commit on failure
            log.error("Processing failed", e); // (3)!
        }
    }
}
```
- Disable auto-commit for manual control
- Synchronous commit - blocks until acknowledged
- Failed messages won't have offsets committed
```python
consumer = KafkaConsumer(
    'orders',
    enable_auto_commit=False  # (1)!
)

for message in consumer:
    try:
        process_order(message.value)
        consumer.commit()  # (2)!
    except Exception as e:
        print(f"Error processing: {e}")  # (3)!
        # Don't commit - will retry on restart
```
- Manual commit mode
- Commit after successful processing
- Skip commit on error for retry
```javascript
await consumer.run({
  autoCommit: false, // (1)!
  eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
    try {
      await processOrder(message.value.toString());
      await consumer.commitOffsets([{ // (2)!
        topic,
        partition,
        offset: (parseInt(message.offset, 10) + 1).toString()
      }]);
    } catch (error) {
      console.error('Processing failed:', error);
      pause(); // (3)!
      setTimeout(() => consumer.resume([{ topic: 'orders' }]), 5000);
    }
  }
});
```
- Disable auto-commit
- Manual offset commit after success
- Pause and retry on error
```go
config := &kafka.ConfigMap{
    "enable.auto.commit": false, // (1)!
}

for {
    msg, err := consumer.ReadMessage(-1)
    if err != nil {
        continue
    }

    if err := processOrder(string(msg.Value)); err == nil {
        _, err := consumer.CommitMessage(msg) // (2)!
        if err != nil {
            log.Printf("Commit failed: %v\n", err)
        }
    } else {
        log.Printf("Processing failed: %v\n", err) // (3)!
    }
}
```
- Manual commit mode
- Commit specific message after processing
- Don't commit on failure
Step 5: Scale with Consumer Groups¶
Consumer Group Concepts¶
Horizontal Scaling
Consumer groups enable parallel processing by distributing partitions across multiple consumer instances.
**Key Rules:**
- Consumers in same group share partition load
- Each partition assigned to **one** consumer in the group
- Multiple groups can read same topic independently
```mermaid
graph TB
    subgraph "Topic: orders (3 partitions)"
        P0[Partition 0]
        P1[Partition 1]
        P2[Partition 2]
    end
    subgraph "Consumer Group: order-processing"
        C1[Consumer 1]
        C2[Consumer 2]
        C3[Consumer 3]
    end
    P0 --> C1
    P1 --> C2
    P2 --> C3
```
Rebalancing¶
Automatic Rebalancing
When consumers join or leave a group, Kafka automatically rebalances partition assignments.
**Triggers:**
- New consumer joins group
- Existing consumer fails/stops
- Topic partition count changes
**During rebalance:**
- Short processing pause (usually < 1 second)
- Partitions reassigned to available consumers
```mermaid
graph TB
    subgraph "Before Rebalance"
        P0A[Partition 0] --> C1A[Consumer 1]
        P1A[Partition 1] --> C2A[Consumer 2 ❌ FAILS]
        P2A[Partition 2] --> C3A[Consumer 3]
    end
    subgraph "After Rebalance"
        P0B[Partition 0] --> C1B[Consumer 1]
        P1B[Partition 1] --> C1B
        P2B[Partition 2] --> C3B[Consumer 3]
    end
```
Scaling Guidelines¶
Optimal Consumer Count
| Scenario | Recommendation |
|---|---|
| Partitions = Consumers | Ideal - full parallelism |
| Partitions > Consumers | Okay - some consumers handle multiple partitions |
| Consumers > Partitions | Wasteful - idle consumers sit unused |
Example:
- Topic with 6 partitions → Deploy 6 consumers for maximum throughput
- Topic with 3 partitions → Don't deploy more than 3 consumers
Error Handling and Best Practices¶
Handling Processing Failures¶
Common Failure Scenarios
**1. Transient Network Errors**

```java
int maxRetries = 3;
int retryCount = 0;

while (retryCount < maxRetries) {
    try {
        processOrder(record.value());
        consumer.commitSync();
        break; // Success
    } catch (TransientException e) {
        retryCount++;
        Thread.sleep(1000 * retryCount); // Back off before retrying (linearly increasing delay)
    }
}
```
**2. Poison Pills (Bad Messages)**
```java
try {
processOrder(record.value());
} catch (DeserializationException e) {
// Send to dead letter queue
producer.send(new ProducerRecord<>("orders-dlq", record.value()));
consumer.commitSync(); // Skip bad message
}
```
**3. Downstream Service Unavailable**
```java
try {
sendToDatabase(record.value());
} catch (DatabaseException e) {
consumer.pause(Collections.singleton(
new TopicPartition(record.topic(), record.partition())
));
// Retry after delay
scheduleRetry();
}
```
Best Practices Checklist¶
!!! success "Production-Ready Consumers"

    - [x] Use manual offset commits for critical data
    - [x] Implement idempotent processing (handle duplicates)
    - [x] Set appropriate `session.timeout.ms` for your workload
    - [x] Monitor consumer lag metrics
    - [x] Handle rebalances gracefully
    - [x] Use dead letter queues for poison pills
    - [x] Implement proper shutdown hooks
    - [x] Log offsets and processing metadata
Graceful Shutdown¶
```java
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Runtime.getRuntime().addShutdownHook(new Thread(() -> { // (1)!
    System.out.println("Shutting down gracefully...");
    consumer.wakeup(); // (2)!
}));

try {
    consumer.subscribe(Collections.singletonList("orders"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processOrder(record.value());
        }
    }
} catch (WakeupException e) { // (3)!
    // Expected during shutdown
} finally {
    consumer.close(); // (4)!
}
```
- Register shutdown hook for SIGTERM/SIGINT
- Interrupt poll() to trigger clean exit
- WakeupException signals graceful shutdown
- Close consumer to commit final offsets
```python
import signal
import sys

def shutdown(signum, frame):  # (1)!
    print("Shutting down gracefully...")
    consumer.close()  # (2)!
    sys.exit(0)

signal.signal(signal.SIGINT, shutdown)   # Ctrl+C
signal.signal(signal.SIGTERM, shutdown)  # Docker stop

try:
    for message in consumer:
        process_order(message.value)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()  # (3)!
```
- Signal handler for graceful shutdown
- Close consumer commits offsets
- Ensure cleanup in finally block
Complete Example: Order Processing Service¶
Here's a production-ready consumer example:
```java title="OrderConsumerService.java" linenums="1"
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.*;

public class OrderConsumerService {
    private final KafkaConsumer<String, String> consumer;
    private volatile boolean running = true;
public OrderConsumerService(Properties config) {
this.consumer = new KafkaConsumer<>(config);
}
public void start() {
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
consumer.subscribe(Collections.singletonList("orders"));
System.out.println("Consumer started, waiting for messages...");
try {
while (running) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// Commit after batch
consumer.commitSync();
}
} catch (WakeupException e) {
// Expected during shutdown
} catch (Exception e) {
System.err.println("Consumer error: " + e.getMessage());
} finally {
consumer.close();
System.out.println("Consumer closed");
}
}
private void processRecord(ConsumerRecord<String, String> record) {
try {
System.out.printf("Processing order: %s = %s%n",
record.key(), record.value());
// Your business logic here
// e.g., parse JSON, validate, store in database
} catch (Exception e) {
System.err.println("Failed to process record: " + e.getMessage());
// Send to DLQ or implement retry logic
}
}
private void shutdown() {
System.out.println("Shutdown signal received");
running = false;
consumer.wakeup();
}
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processing-service");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
OrderConsumerService service = new OrderConsumerService(props);
service.start();
}
}
```
```python title="order_consumer_service.py" linenums="1"
from kafka import KafkaConsumer
import signal
import sys
import json

class OrderConsumerService:
    def __init__(self, config):
        self.consumer = KafkaConsumer(
            'orders',
            **config,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.running = True
def start(self):
signal.signal(signal.SIGINT, self._shutdown)
signal.signal(signal.SIGTERM, self._shutdown)
print("Consumer started, waiting for messages...")
try:
for message in self.consumer:
if not self.running:
break
self._process_message(message)
self.consumer.commit()
except Exception as e:
print(f"Consumer error: {e}")
finally:
self.consumer.close()
print("Consumer closed")
def _process_message(self, message):
try:
order = message.value
print(f"Processing order: {message.key} = {order}")
# Your business logic here
# e.g., validate order, update inventory, send notification
except Exception as e:
print(f"Failed to process message: {e}")
# Send to DLQ or implement retry logic
def _shutdown(self, signum, frame):
print("Shutdown signal received")
self.running = False
if __name__ == "__main__":
    config = {
        'bootstrap_servers': 'localhost:9092',
        'group_id': 'order-processing-service',
        'enable_auto_commit': False,
        'auto_offset_reset': 'earliest'
    }

    service = OrderConsumerService(config)
    service.start()
```
What's Next?¶
!!! tip "Continue Learning"

    - Use Schema Registry - Manage data schemas for type safety
    - Connect External Systems - Integrate Kafka with databases
    - Process Streams - Transform data in real-time
    - Back to Tutorial - Review core concepts
Additional Resources¶
Official Documentation¶
- Apache Kafka Consumer API Documentation
- Kafka Consumer Configuration Reference
- Consumer Group Protocol Internals
- Confluent Consumer Configuration Guide
Tutorials & Courses¶
- Kafka Consumer 101 Course - Free Confluent course
- Interactive Kafka Consumer Exercise
- Consumer Best Practices
Advanced Topics¶
Course Attribution
This guide is based on content from the Apache Kafka 101 course by Confluent and official Apache Kafka documentation.