Process Data Streams with Kafka¶
Learn how to transform, aggregate, and enrich data in real-time using Kafka Streams and Apache Flink.
What You'll Learn¶
By the end of this guide, you'll be able to:
- ✅ Understand stream processing concepts and use cases
- ✅ Build stateless and stateful transformations
- ✅ Implement windowing and time-based aggregations
- ✅ Join streams and tables in real-time
- ✅ Use Kafka Streams for lightweight processing
- ✅ Use Apache Flink for advanced analytics
- ✅ Deploy stream processing applications in production
Prerequisites:
- Docker and Docker Compose installed
- Understanding of producers and consumers
- Familiarity with Java or Python (for code examples)
Estimated Time: 45 minutes
Quick Start: Run Kafka with Docker Compose¶
Use this Docker Compose configuration to run a single-node Kafka cluster with KRaft (no Zookeeper needed):
- Confluent Platform Kafka image (includes KRaft mode)
- Kafka broker port exposed to host
- Unique node ID in the KRaft cluster
- This node acts as both broker and controller (combined mode)
- Listeners: PLAINTEXT for clients, CONTROLLER for internal cluster communication
- Advertised listener for external clients
- Quorum voters for KRaft consensus (format: `id@host:port`)
- Unique cluster ID - generate with `docker run confluentinc/cp-kafka kafka-storage random-uuid`
- Log directory for Kafka data storage
- Disable Confluent Support metrics collection for minimal local setup
- Default partition count for new topics (good for parallel stream processing)
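The annotations above correspond to a single-node compose file along these lines. This is a minimal sketch: the image tag and `CLUSTER_ID` are placeholders (generate a real cluster ID with the `kafka-storage random-uuid` command noted above), and the offsets-topic replication factor is set to 1 because there is only one broker.

```yaml
services:
  kafka-kraft:
    image: confluentinc/cp-kafka:7.6.0          # Confluent Platform image with KRaft support (tag is illustrative)
    container_name: kafka-kraft
    ports:
      - "9092:9092"                             # broker port exposed to the host
    environment:
      KAFKA_NODE_ID: 1                          # unique node ID in the KRaft cluster
      KAFKA_PROCESS_ROLES: "broker,controller"  # combined broker + controller mode
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"           # what external clients connect to
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-kraft:9093"               # id@host:port
      CLUSTER_ID: "REPLACE-WITH-GENERATED-UUID"                          # from kafka-storage random-uuid
      KAFKA_LOG_DIRS: "/var/lib/kafka/data"                              # Kafka data storage
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1                          # single broker, so RF must be 1
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"                    # disable Confluent metrics collection
      KAFKA_NUM_PARTITIONS: 3                                            # default partition count for new topics
```

With this saved as `docker-compose.yml` next to where you run the commands below, the `docker exec kafka-kraft ...` calls resolve against the `kafka-kraft` container name.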
Start Kafka and Create Topics¶
```bash
# Start Kafka
docker-compose up -d
# Wait for Kafka to be ready
docker-compose ps
# Create input topics for stream processing
docker exec kafka-kraft kafka-topics \
--bootstrap-server localhost:9092 \
--create --topic orders --partitions 3 --replication-factor 1
docker exec kafka-kraft kafka-topics \
--bootstrap-server localhost:9092 \
--create --topic customers --partitions 3 --replication-factor 1 \
--config cleanup.policy=compact # (1)!
# Produce sample data
docker exec -i kafka-kraft kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic orders \
--property "parse.key=true" \
--property "key.separator=:" << EOF
ORD-001:{"order_id":"ORD-001","customer_id":"CUST-123","amount":99.99,"region":"US-WEST"}
ORD-002:{"order_id":"ORD-002","customer_id":"CUST-456","amount":149.50,"region":"US-EAST"}
ORD-003:{"order_id":"ORD-003","customer_id":"CUST-123","amount":75.25,"region":"US-WEST"}
EOF
# Stop Kafka
docker-compose down
```
- Compacted topic for customer data (keeps only latest value per key)
Verify Stream Processing Setup
```bash
# List topics
docker exec kafka-kraft kafka-topics \
  --bootstrap-server localhost:9092 --list
# Consume from orders topic
docker exec kafka-kraft kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic orders --from-beginning --max-messages 3
```
Running Kafka Streams Applications
Kafka Streams applications run as separate Java processes, not inside Kafka:
```bash
# Build your Kafka Streams application
mvn clean package
# Run locally (connects to Docker Kafka)
java -jar target/stream-processor-1.0.jar
```
For Flink, you'll need a separate Flink cluster (see [Deployment Strategies](#deployment-strategies) section).
What is Stream Processing?¶
Stream Processing is the real-time processing of continuous data streams. Unlike batch processing (which processes historical data), stream processing analyzes data as it arrives—enabling instant insights and actions.
Stream Processing vs Batch Processing
Batch Processing: Process data in large chunks every hour/day (e.g., nightly ETL jobs)
Stream Processing: Process each event immediately as it arrives (e.g., fraud detection, real-time dashboards)
```mermaid
graph LR
    A[Orders Topic] -->|Stream| B[Stream Processor]
    B -->|Filter| C[Valid Orders]
    B -->|Aggregate| D[Revenue by Region]
    B -->|Enrich| E[Orders + Customer Info]
    C --> F[Fulfillment System]
    D --> G[Analytics Dashboard]
    E --> H[Customer Service]
```
!!! tip "When to Use Stream Processing" - Real-time analytics - Live dashboards, metrics, monitoring - Event-driven actions - Fraud detection, alerting, notifications - Data enrichment - Join events with reference data in real-time - Continuous ETL - Transform and route data between systems - Complex event processing - Detect patterns across multiple events
Stream Processing Frameworks¶
Kafka ecosystem offers two main frameworks:
- Kafka Streams
Best For: Lightweight, embedded processing
Pros:

- Java library - no separate cluster needed
- Exactly-once semantics built-in
- Auto-scaling with consumer groups
- Simple deployment (just a JAR file)

Cons:

- Java/Scala only (no Python)
- Limited to Kafka as source/sink
- Less advanced analytics features
- Apache Flink
Best For: Advanced analytics, complex CEP
Pros:

- Powerful SQL interface
- Advanced windowing and joins
- Python support (PyFlink)
- Multiple sources/sinks (Kafka, databases, files)

Cons:

- Requires separate cluster (JobManager + TaskManagers)
- More operational complexity
- Heavier resource footprint
!!! tip "Which Framework to Choose?" - Start with Kafka Streams if you need simple transformations and want minimal ops overhead - Use Apache Flink if you need SQL, Python support, or advanced windowing/joins
Step 1: Stateless Transformations (Filter, Map)¶
Stateless operations process each event independently without remembering previous events.
Kafka Streams Example: Filter and Transform Orders¶
- Application ID - used for consumer group and state store naming
- Read stream from `orders` topic - creates a KStream
- Filter operation - only keeps orders > $100 (stateless)
- Map operation - transforms each order by adding VIP flag
- Write filtered/transformed stream to `vip-orders` topic
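For reference, a minimal Kafka Streams topology matching those annotations might look like the sketch below. It treats values as raw JSON strings and uses a naive, hypothetical `extractAmount` helper; in a real application you would use a proper JSON serde.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class VipOrderProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "vip-order-processor");   // application ID -> consumer group / state store prefix
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders");               // KStream over the orders topic

        orders
            .filter((orderId, json) -> extractAmount(json) > 100.0)              // stateless: keep only orders > $100
            .mapValues(json -> json.replace("}", ",\"vip\":true}"))              // stateless: tag the order as VIP
            .to("vip-orders");                                                   // write to the vip-orders topic

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    // Hypothetical helper: naive "amount" extraction from the demo JSON payloads
    private static double extractAmount(String json) {
        int start = json.indexOf("\"amount\":") + 9;
        int end = json.indexOf(",", start);
        return Double.parseDouble(json.substring(start, end == -1 ? json.indexOf("}", start) : end));
    }
}
```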
The equivalent PyFlink DataStream job follows the same shape:

- Create Flink DataStream from Kafka source
- Filter predicate - return True to keep the event
- Filter operation - removes low-value orders
- Map transformation - adds VIP flag and returns JSON string
Use Cases for Stateless Transformations¶
- Filtering: Remove invalid events, route by type, compliance filtering
- Mapping: Format conversion, field extraction, enrichment with constants
- FlatMap: Parse complex events into multiple simple events
Step 2: Stateful Aggregations (Count, Sum)¶
Stateful operations maintain state across multiple events—enabling aggregations like counts, sums, and averages.
Kafka Streams Example: Count Orders by Region¶
- Re-key stream by region - required for grouping
- KTable represents aggregated state (region → count)
- Group by key (region) - prepares for aggregation
- Count operation - maintains state in RocksDB store named "order-count-store"
- Convert KTable back to KStream and write to output topic
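A sketch of the topology those annotations describe, reusing the properties, imports, and hypothetical JSON helpers from the Step 1 example (`extractRegion` is assumed to pull the region field out of the order JSON):

```java
StreamsBuilder builder = new StreamsBuilder();

KTable<String, Long> ordersByRegion = builder
    .<String, String>stream("orders")
    .selectKey((orderId, json) -> extractRegion(json))            // re-key stream by region
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))   // group by the new key
    .count(Materialized.as("order-count-store"));                 // stateful count, backed by RocksDB + changelog topic

ordersByRegion
    .toStream()                                                   // KTable -> stream of count updates
    .to("orders-by-region", Produced.with(Serdes.String(), Serdes.Long()));
```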
The Flink DataStream version of the same aggregation:

- Transform to tuple (region, 1) - 1 represents one order
- Key by region (first element of tuple)
- Sum the second element (count) - Flink maintains state automatically
State Management
Stateful operations require persistent state stores:

- Kafka Streams: Uses RocksDB (embedded key-value store) + changelog topic for fault tolerance
- Flink: Uses checkpoint snapshots to distributed storage (HDFS, S3)
Step 3: Windowing and Time-Based Aggregations¶
Windowing groups events into time buckets—enabling operations like "orders per minute" or "revenue per hour."
Window Types¶
| Window Type | Description | Use Case |
|---|---|---|
| Tumbling Windows | Fixed-size, non-overlapping windows | "Count orders every 5 minutes" |
| Hopping Windows | Fixed-size, overlapping windows | "Revenue per 10 minutes, updated every 5 minutes" |
| Sliding Windows | Window per event (continuous) | "Average of last 100 events" |
| Session Windows | Gap-based windows (activity-driven) | "User sessions with 30-min inactivity timeout" |
Kafka Streams Example: Tumbling Window (5-Minute Revenue)¶
- Transform stream to (region, amount) pairs
- Define tumbling window - fixed 5-minute buckets
- `ofSizeWithNoGrace` - no late event handling (events must arrive on time)
- Apply windowing to grouped stream
- Sum amounts within each window
- Windowed key contains both the region and window metadata (start/end times)
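A sketch of that windowed topology, again building on the Step 1 boilerplate and hypothetical JSON helpers (`extractRegion`, `extractAmount`):

```java
StreamsBuilder builder = new StreamsBuilder();

KTable<Windowed<String>, Double> revenuePerRegion = builder
    .<String, String>stream("orders")
    .map((orderId, json) -> KeyValue.pair(extractRegion(json), extractAmount(json)))  // (region, amount) pairs
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))                 // fixed 5-minute buckets, no grace period
    .reduce(Double::sum, Materialized.as("revenue-per-region-store"));                // sum amounts within each window

revenuePerRegion
    .toStream()
    .map((windowedRegion, revenue) -> KeyValue.pair(
        windowedRegion.key() + "@" + windowedRegion.window().startTime(),             // windowed key: region + window metadata
        revenue))
    .to("revenue-5min", Produced.with(Serdes.String(), Serdes.Double()));
```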
The Flink SQL version declares event time and a watermark on the source table, then groups by a tumbling window:

- Event time column - timestamp extracted from message payload
- Watermark strategy - allows 5 seconds of late events
- Window start time - beginning of 5-minute tumbling window
- Aggregate function - sum revenue within window
- Tumbling window definition - groups events into 5-minute buckets
!!! tip "Event Time vs Processing Time" - Event Time: Timestamp from the event itself (e.g., order creation time) - use this for accurate analytics - Processing Time: Timestamp when event is processed by stream processor - simpler but less accurate
Step 4: Joining Streams and Tables¶
Joins combine data from multiple streams or tables in real-time—enabling enrichment and correlation.
Join Types¶
| Join Type | Description | Use Case |
|---|---|---|
| Stream-Stream | Join two event streams | Correlate related events (e.g., order + payment) |
| Stream-Table | Enrich stream with reference data | Add customer info to orders |
| Table-Table | Join two tables (KTables) | Combine aggregated views |
Kafka Streams Example: Enrich Orders with Customer Data¶
- KTable from compacted topic - represents current state of customers
- Materialized view stored in RocksDB for fast lookups
- Re-key orders stream to match customer table key (customer_id)
- Left join - keeps orders even if customer not found (customerJson = null)
- Join with customer table - lookup happens in state store (fast)
- Value joiner - combines order and customer JSON into enriched event
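A sketch of the enrichment topology the annotations describe, with values kept as raw JSON strings and a hypothetical `extractCustomerId` helper:

```java
StreamsBuilder builder = new StreamsBuilder();

// KTable over the compacted customers topic - current value per customer, materialized in RocksDB
KTable<String, String> customers = builder.table("customers", Materialized.as("customers-store"));

KStream<String, String> enrichedOrders = builder
    .<String, String>stream("orders")
    .selectKey((orderId, orderJson) -> extractCustomerId(orderJson))   // re-key orders by customer_id
    .leftJoin(customers,                                               // lookup in the local state store
        (orderJson, customerJson) -> "{\"order\":" + orderJson
            + ",\"customer\":" + (customerJson == null ? "null" : customerJson) + "}");  // null if customer not found

enrichedOrders.to("orders-enriched");
```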
The same enrichment expressed in Flink SQL with PyFlink:

```python
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = StreamTableEnvironment.create(env_settings)
# Define orders stream
table_env.execute_sql("""
CREATE TABLE orders (
order_id STRING,
customer_id STRING,
amount DOUBLE,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# Define customers table (compacted topic)
table_env.execute_sql("""
CREATE TABLE customers (
customer_id STRING,
name STRING,
email STRING,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'customers',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset', -- (1)!
'format' = 'json'
)
""")
# Join orders with customers
result = table_env.sql_query("""
SELECT
o.order_id,
o.customer_id,
o.amount,
c.name AS customer_name, -- (2)!
c.email AS customer_email
FROM orders o
LEFT JOIN customers c -- (3)!
ON o.customer_id = c.customer_id
""")
result.execute().print()
```
- Read all customer data from beginning to build state
- Enrich order with customer name and email
- Left join - keeps orders even if customer not found
!!! warning "Join Performance Considerations" - Stream-Table joins are fast (in-memory lookup) but require the table to fit in state - Stream-Stream joins require windowing (join events within time window) - Use compacted topics for tables to keep state small (only latest value per key)
Step 5: Advanced Patterns¶
Pattern 1: Sessionization (Session Windows)¶
Track user sessions with inactivity timeout:
```java
// Session window: group events with <30-minute gap into same session
SessionWindows sessionWindow = SessionWindows.ofInactivityGapWithNoGrace(
Duration.ofMinutes(30) // (1)!
);
KTable<Windowed<String>, Long> sessionCounts = clickstream
.groupByKey()
.windowedBy(sessionWindow) // (2)!
    .count();
```
- If no events for 30 minutes, session ends
- Dynamic windows - each user can have different session duration
Pattern 2: Deduplication¶
Remove duplicate events using state:
```java
KStream<String, String> deduplicated = orders
.groupByKey()
.reduce( // (1)!
(oldValue, newValue) -> newValue, // Keep latest
Materialized.as("dedup-store")
)
    .toStream();
```
- Reduce keeps only one value per key - effectively deduplicates
Pattern 3: Changelog Stream¶
Convert KTable (state) to changelog stream:
```java
KTable<String, Long> counts = orders.groupByKey().count();
KStream<String, Long> changelog = counts.toStream(); // (1)!
// Emits: (key=US-WEST, value=1), (key=US-WEST, value=2), ...
```
- Changelog emits every state update - useful for downstream systems
Step 6: Exactly-Once Semantics¶
Kafka Streams supports exactly-once processing (EOS) to prevent duplicate results.
Enable Exactly-Once in Kafka Streams¶
```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2); // (1)!
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
```
- Exactly-once v2 - uses transactional writes and idempotent producers
!!! danger "Exactly-Once Requirements"
    - Kafka 2.5+ required for EOS v2
    - Replication factor ≥ 3 recommended
    - min.insync.replicas ≥ 2
    - Slight performance overhead (~5-10%) vs at-least-once
Best Practices¶
**DO**

- Use event time (not processing time) for windowing
- Enable exactly-once semantics for critical applications
- Use compacted topics for KTables (changelog retention)
- Monitor consumer lag and state store size
- Test with out-of-order and late events

**DON'T**

- Store unbounded state (use windowing or TTL)
- Use large windows without tuning state store size
- Assume events arrive in order (handle late events)
- Ignore rebalancing and state migration time
- Process sensitive data without encryption
Production Checklist¶
- Exactly-once semantics enabled for critical apps
- State stores configured with appropriate retention
- Monitoring: consumer lag, processing latency, state size
- Fault tolerance: changelog topics replicated (RF ≥ 3)
- Scaling: multiple instances for parallel processing
- Error handling: dead letter queue for poison pills
- Testing: unit tests with TopologyTestDriver (Kafka Streams)
Troubleshooting¶
Error: StreamsException: State store not found
Cause: State store not materialized or query before ready
**Solution:**
1. Ensure state store is materialized:
```java
.count(Materialized.as("my-store"))
```
2. Wait for state to restore after rebalance:
```java
streams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING) {
// State restored, ready to query
}
});
```
Error: Late events dropped by window
Cause: Events arrived after window closed (past allowed lateness)
**Solution:**
1. Increase grace period for late events:
```java
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5), // Window size
Duration.ofMinutes(1) // Allow 1 minute of lateness
)
```
2. Use watermarks in Flink to handle late events
3. Monitor late event metrics
Error: RocksDB out of memory
Cause: State store grew too large for available memory
**Solution:**
1. Add windowing to bound state size
2. Increase RocksDB block cache:
```java
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
CustomRocksDBConfig.class);
```
3. Scale horizontally (more instances = state partitioned)
4. Use TTL for state entries
Complete Example: Real-Time Fraud Detection¶
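The building blocks above compose naturally into a fraud detector: a windowed aggregation of spend per customer followed by a filter. The sketch below is one possible implementation, using illustrative topic names, an arbitrary $1,000 threshold, and naive JSON helpers.

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

public class FraudDetector {
    static final double THRESHOLD = 1_000.0;                                      // flag > $1,000 spent within one window

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-detector");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);  // avoid duplicate alerts
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        builder.<String, String>stream("orders")
            .map((orderId, json) -> KeyValue.pair(extractField(json, "customer_id"), extractAmount(json)))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))     // spend per customer per 5 minutes
            .reduce(Double::sum, Materialized.as("spend-per-customer"))
            .toStream()
            .filter((windowedCustomer, spend) -> spend > THRESHOLD)               // keep only suspicious windows
            .map((windowedCustomer, spend) -> KeyValue.pair(
                windowedCustomer.key(),
                "{\"customer_id\":\"" + windowedCustomer.key() + "\",\"spend\":" + spend + "}"))
            .to("fraud-alerts");                                                  // downstream alerting consumes this topic

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    // Naive JSON field extraction for the demo payloads - use a real JSON library (e.g. Jackson) in production
    private static String extractField(String json, String field) {
        int start = json.indexOf("\"" + field + "\":\"") + field.length() + 4;
        return json.substring(start, json.indexOf("\"", start));
    }

    private static double extractAmount(String json) {
        int start = json.indexOf("\"amount\":") + 9;
        int end = json.indexOf(",", start);
        return Double.parseDouble(json.substring(start, end == -1 ? json.indexOf("}", start) : end));
    }
}
```

Alerts arrive on the `fraud-alerts` topic and can be consumed like any other topic (console consumer, alerting service, and so on).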
Deployment Strategies¶
Kafka Streams Deployment¶
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-processor
spec:
  replicas: 3 # (1)!
  selector:
    matchLabels:
      app: order-processor
  template:
    metadata:
      labels:
        app: order-processor
    spec:
      containers:
        - name: processor
          image: order-processor:1.0
          env:
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: "kafka:9092"
          resources: # (2)!
            requests:
              memory: "2Gi"
              cpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2"
```
- 3 replicas for parallel processing - auto-scaling possible
- Resource limits - RocksDB state store needs memory
Flink Deployment¶
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-stream-processor
spec:
  image: flink:1.18
  flinkVersion: v1_18
  jobManager: # (1)!
    replicas: 1
    resource:
      memory: "2Gi"
      cpu: 1
  taskManager: # (2)!
    replicas: 3
    resource:
      memory: "4Gi"
      cpu: 2
  job:
    jarURI: s3://my-bucket/stream-processor.jar
    parallelism: 6 # (3)!
    upgradeMode: stateful # (4)!
```
- JobManager - orchestrates job execution
- TaskManagers - execute parallel tasks
- Parallelism level - distribute work across task slots
- Stateful upgrade - preserve state during redeployment
What's Next?¶
Now that you understand stream processing, explore:
- Kafka Connect - Integrate stream processing with external systems
- Schema Registry - Use Avro for efficient stream serialization
- Kafka Getting Started - Review Kafka fundamentals
Additional Resources¶
Official Documentation¶
- Apache Kafka Streams Documentation
- Kafka Streams API Javadoc
- Apache Flink Documentation
- Flink Kafka Connector
Tutorials & Courses¶
- Kafka Streams 101 Course - Free Confluent course
- Apache Flink Hands-On Training
- Stream Processing Fundamentals
- Building Streaming Applications Tutorial
Books & Guides¶
- Designing Event-Driven Systems - Free O'Reilly book by Ben Stopford
- Stream Processing with Apache Flink - Fabian Hueske & Vasiliki Kalavri
- Kafka Streams in Action - Bill Bejeck
- Mastering Kafka Streams - Mitch Seymour
Advanced Topics¶
- Kafka Streams State Stores
- Flink State Management
- Exactly-Once Semantics in Kafka Streams
- Windowing Strategies Comparison
Course Attribution
This guide is based on content from Apache Kafka Streams documentation, Apache Flink documentation, Confluent tutorials, and industry best practices.