Integrating External Systems with Kafka Connect¶
Overview¶
Learn how to use Kafka Connect to build data integration pipelines between Apache Kafka and external systems without writing custom code. This guide covers connector setup, configuration, Single Message Transforms (SMTs), and best practices for building production-ready data pipelines.
!!! info "What You'll Learn"

    - Understand Kafka Connect architecture
    - Configure source and sink connectors
    - Apply transformations with SMTs
    - Deploy and monitor connectors
    - Choose between self-managed and cloud-managed options
Prerequisites¶
- Docker and Docker Compose installed
- Basic understanding of Kafka topics and partitions
- Target system credentials (database, API, etc.)
Quick Start: Run Kafka Connect with Docker Compose¶
Use this Docker Compose configuration to run Kafka with KRaft and Kafka Connect in distributed mode:
- Confluent Platform Kafka image (includes KRaft mode)
- Use the service name `kafka` for internal communication between containers
- Disable Confluent Support metrics collection for a minimal local setup
- Confluent Platform image includes many pre-installed connectors
- Kafka Connect REST API port for managing connectors
- Bootstrap servers pointing to Kafka service
- Unique group ID for this Connect cluster
- Internal topics for storing connector configs, offsets, and status
- Replication factor 1 for development (use 3+ in production)
- JSON converter for message serialization (can use Avro with Schema Registry)
- Plugin path where connector JARs are loaded from
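A minimal sketch of such a Compose file, assuming Confluent Platform images, plaintext listeners, and a single broker for local development (image versions, ports, and the cluster ID are placeholders, and not every setting from the notes above is shown):

```yaml
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0        # Confluent Platform Kafka image (KRaft mode)
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
      KAFKA_LISTENERS: "PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1          # single broker: replication factor 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"               # any valid base64 UUID works locally

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.6.0  # includes many pre-installed connectors
    container_name: kafka-connect
    depends_on:
      - kafka
    ports:
      - "8083:8083"                             # Kafka Connect REST API
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"  # service name for container-to-container traffic
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-cluster"       # unique group ID for this Connect cluster
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1       # 1 for development, 3+ in production
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
```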
Start Kafka Connect¶
```bash
# Start services
docker-compose up -d

# Wait for services to be ready
docker-compose ps

# Verify Kafka Connect is running
curl http://localhost:8083/

# List installed connectors
curl http://localhost:8083/connector-plugins | jq

# Stop services
docker-compose down
```
Install Additional Connectors
```bash
# Install connector from Confluent Hub (example: Elasticsearch Sink)
docker exec kafka-connect confluent-hub install \
  confluentinc/kafka-connect-elasticsearch:latest --no-prompt
# Restart Connect to load new connector
docker-compose restart kafka-connect
```
Production Considerations
This is a minimal setup for development. For production:

- Run multiple Connect workers (3+) for high availability
- Set replication factor to 3 for internal topics
- Enable SSL/SASL for secure communication
- Use Avro with Schema Registry for data serialization
- Monitor connector health and lag
What is Kafka Connect?¶
Kafka Connect Definition
Kafka Connect is Kafka's integration framework for streaming data between Kafka and external systems. It provides:
- **Plug-and-play connectors** for popular systems
- **Declarative configuration** (JSON-based)
- **Automatic scaling and fault tolerance**
- **No custom code required** for most use cases
```mermaid
graph LR
subgraph "External Systems"
DB[(PostgreSQL<br/>Database)]
SALES[Salesforce<br/>API]
FILES[S3<br/>Bucket]
end
subgraph "Kafka Connect Cluster"
SC1[Source<br/>Connector]
SC2[Source<br/>Connector]
SINK1[Sink<br/>Connector]
end
subgraph "Kafka Cluster"
T1[Topic: orders]
T2[Topic: customers]
T3[Topic: analytics]
end
subgraph "Destination Systems"
ES[(Elasticsearch)]
DW[(Snowflake<br/>Data Warehouse)]
end
DB -->|Read changes| SC1
SALES -->|Pull data| SC2
SC1 -->|Produce| T1
SC2 -->|Produce| T2
T1 -->|Consume| SINK1
T3 -->|Consume| SINK1
SINK1 -->|Write| ES
SINK1 -->|Write| DW
```
Connector Types¶
- Source Connectors
Purpose: Import data INTO Kafka
Examples:

- Database CDC (Debezium)
- File sources (S3, HDFS)
- Message queues (RabbitMQ)
- APIs (Salesforce, REST)
Output: Kafka topics
- Sink Connectors
Purpose: Export data FROM Kafka
Examples:

- Databases (PostgreSQL, MySQL)
- Search engines (Elasticsearch)
- Data warehouses (Snowflake)
- Object storage (S3, GCS)
Input: Kafka topics
Step 1: Set Up Kafka Connect¶
Deployment Modes¶
Kafka Connect can run in two modes:
Development Only
Standalone mode runs a single worker process. Use only for development and testing.
Start a standalone worker by passing a worker properties file followed by one or more connector properties files (see the sketch after the notes below):
- Worker configuration: bootstrap servers, key/value converters
- Connector configuration file(s) in properties format
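A minimal sketch of the standalone launch command, assuming the scripts and sample properties files that ship with the Apache Kafka distribution:

```bash
# Standalone mode: a single worker process; offsets are stored in a local file.
# Argument 1: worker configuration (bootstrap servers, converters, offset file).
# Arguments 2..n: one or more connector configuration files.
bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/connect-file-source.properties
```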
Recommended for Production
Distributed mode runs a cluster of workers with automatic load balancing and fault tolerance.
Start a distributed worker on each node with a shared worker properties file (see the sketch after the notes below):
- Cluster configuration: group.id, offset storage topics
- Start on multiple machines for HA
- Use REST API (port 8083 by default) to manage connectors
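A minimal sketch of starting a distributed worker, assuming the bundled scripts; the connector JSON file name is a placeholder:

```bash
# Distributed mode: run the same command on every worker node.
# Workers that share the same group.id form one Connect cluster.
bin/connect-distributed.sh config/connect-distributed.properties

# Connectors are then managed through the REST API rather than local files:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @my-connector.json
```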
Worker Configuration¶
- Kafka cluster address
- How to serialize keys (JSON, Avro, String)
- How to serialize values
- File-based offset storage (standalone only)
- Unique cluster identifier - workers with same ID form a cluster
- Stores connector configurations
- Tracks source connector positions
- Stores connector and task statuses
- REST API endpoint for connector management
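The notes above describe the worker properties; a minimal sketch for a distributed worker on a local cluster (values are placeholders):

```properties
# Kafka cluster address
bootstrap.servers=localhost:9092

# How to serialize keys and values (JSON here; Avro requires Schema Registry)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Workers with the same group.id form a cluster (distributed mode)
group.id=connect-cluster

# Internal topics for configs, offsets, and status (use replication factor 3+ in production)
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# REST API endpoint for connector management
listeners=http://0.0.0.0:8083

# Standalone mode only: file-based offset storage instead of the offsets topic
# offset.storage.file.filename=/tmp/connect.offsets
```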
Step 2: Configure Source Connectors¶
Source connectors bring data FROM external systems INTO Kafka.
Example: PostgreSQL CDC with Debezium¶
Change Data Capture (CDC)
CDC connectors capture database changes (inserts, updates, deletes) and stream them to Kafka in real-time.
- Unique connector name within the cluster
- Fully qualified connector class name
- Number of parallel tasks (scale based on tables/partitions)
- Database connection details
- Database name to capture changes from
- Logical name used in topic naming
- Whitelist specific tables (comma-separated)
- PostgreSQL logical decoding plugin
- Apply transformations (see SMTs section)
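The numbered notes above map to a connector definition roughly like this sketch; host, credentials, and table names are placeholders, and recent Debezium releases use `topic.prefix` where older ones used `database.server.name`:

```json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "inventory",
    "topic.prefix": "production_db",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
```

Saved as `postgres-source-connector.json`, this is the file referenced by the deployment commands below.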
Deploy the Connector¶
```bash
# Create connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-source-connector.json # (1)!

# Check status
curl http://localhost:8083/connectors/postgres-source/status # (2)!

# List all connectors
curl http://localhost:8083/connectors # (3)!
```
- POST connector config to REST API
- Verify connector is RUNNING
- See all deployed connectors
Common Source Connectors¶
Popular Source Connectors
| Connector | Use Case | Data Format |
|---|---|---|
| Debezium (PostgreSQL, MySQL) | Database CDC | JSON, Avro |
| JDBC Source | Poll database tables | JSON, Avro |
| S3 Source | Read files from S3 | CSV, JSON, Avro |
| Salesforce | Pull CRM data | JSON |
| MongoDB | Capture change streams | JSON, BSON |
| Syslog | Collect log events | String, JSON |
Step 3: Configure Sink Connectors¶
Sink connectors send data FROM Kafka TO external systems.
Example: Elasticsearch Sink¶
- Elasticsearch sink connector class
- Parallelize across partitions
- Comma-separated list of topics to consume
- Elasticsearch cluster connection
- Document type (deprecated in ES 7+, use "_doc")
- Use Kafka message key as document ID
- Delete ES document on null value (tombstone)
- Unwrap Debezium CDC envelope
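The notes above correspond to a sink definition along these lines; the topic name and connection URL are placeholders:

```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "2",
    "topics": "production_db.public.orders",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "behavior.on.null.values": "delete",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
```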
Example: JDBC Sink (PostgreSQL)¶
- JDBC connection string
- Insert mode: insert, upsert, or update
- Primary key mode: record_key, record_value, or kafka
- Fields to use as primary key
- Target table name pattern
- Auto-create table if doesn't exist
- Auto-add columns when schema changes
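A comparable sketch for the JDBC sink; connection details, topic, table, and key fields are placeholders:

```json
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:postgresql://postgres:5432/analytics",
    "connection.user": "analytics",
    "connection.password": "secret",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "order_id",
    "table.name.format": "orders_sink",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}
```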
Common Sink Connectors¶
Popular Sink Connectors
| Connector | Use Case | Features |
|---|---|---|
| Elasticsearch | Search & analytics | Full-text search, real-time indexing |
| JDBC Sink | Relational databases | Upsert support, auto-create tables |
| S3 Sink | Data lake storage | Partitioning, compression, formats |
| Snowflake | Cloud data warehouse | Batch loading, schema evolution |
| BigQuery | Google analytics | Streaming inserts, partitioning |
| MongoDB Sink | Document database | Upsert, change stream compatibility |
Step 4: Apply Single Message Transforms (SMTs)¶
Lightweight Transformations
SMTs perform simple, stateless transformations on messages as they flow through connectors.
**Use SMTs for:**
- Adding/removing fields
- Renaming fields
- Filtering messages
- Masking sensitive data
- Routing to different topics
**Don't use SMTs for:**
- Stateful operations (aggregations, joins)
- Complex business logic
- Multi-message transformations
Common SMT Examples¶
1. Add Field (Context Enrichment)¶
```json
{
  "transforms": "addSource",
  "transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value", // (1)!
  "transforms.addSource.static.field": "source_system", // (2)!
  "transforms.addSource.static.value": "production_db" // (3)!
}
```
- SMT class for adding fields to value
- Field name to add
- Static value to set
Result:
```json
// Before
{"order_id": 123, "amount": 99.99}

// After
{"order_id": 123, "amount": 99.99, "source_system": "production_db"}
```
2. Mask Sensitive Data¶
```json
{
  "transforms": "maskPII",
  "transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskPII.fields": "credit_card,ssn", // (1)!
  "transforms.maskPII.replacement": "****" // (2)!
}
```
- Fields to mask
- Replacement value
3. Filter Messages¶
```json
{
  "transforms": "filter",
  "transforms.filter.type": "io.confluent.connect.transforms.Filter$Value", // (1)!
  "transforms.filter.filter.condition": "$.status == 'CANCELLED'", // (2)!
  "transforms.filter.filter.type": "exclude" // (3)!
}
```
- Filter transform (requires Confluent license or custom SMT)
- JSONPath condition
- Exclude or include matching messages
4. Rename Fields¶
```json
{
  "transforms": "rename",
  "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.rename.renames": "old_name:new_name,user_id:customer_id" // (1)!
}
```
- Comma-separated field mappings
5. Route to Different Topics¶
```json
{
  "transforms": "route",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "(.*)orders(.*)", // (1)!
  "transforms.route.replacement": "$1orders_v2$2" // (2)!
}
```
- Regex pattern to match topic name
- Replacement pattern
Chain Multiple Transforms¶
```json
{
  "transforms": "unwrap,addTimestamp,maskPII", // (1)!
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "ingestion_time",
  "transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskPII.fields": "ssn,credit_card"
}
```
- Applied in order: unwrap → add timestamp → mask PII
Step 5: Monitor and Manage Connectors¶
Connector Lifecycle¶
```bash
curl http://localhost:8083/connectors/my-connector/status

# Response:
```

```json
{
  "name": "my-connector",
  "connector": {
    "state": "RUNNING", // (1)!
    "worker_id": "connect-worker-1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING", // (2)!
      "worker_id": "connect-worker-1:8083"
    }
  ]
}
```
- Connector state: RUNNING, PAUSED, FAILED
- Task state - each task processes subset of data
```bash
# Pause connector
curl -X PUT http://localhost:8083/connectors/my-connector/pause # (1)!

# Resume connector
curl -X PUT http://localhost:8083/connectors/my-connector/resume # (2)!
```
- Stop processing without deleting configuration
- Restart from last committed offset
Monitoring Metrics¶
Key Metrics to Monitor
| Metric | Description | Action on Alert |
|---|---|---|
| Connector state | RUNNING, PAUSED, FAILED | Restart if FAILED |
| Task state | Individual task status | Check logs, restart task |
| Records processed | Throughput rate | Scale tasks if slow |
| Error count | Failed message count | Check error logs, fix config |
| Offset lag | Source connector lag | Increase parallelism |
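Beyond JMX metrics, the REST API can be polled for unhealthy connectors; a small sketch assuming `jq` is available and Connect listens on `localhost:8083`:

```bash
# Print every connector with its state and flag anything that is not RUNNING.
for c in $(curl -s http://localhost:8083/connectors | jq -r '.[]'); do
  state=$(curl -s "http://localhost:8083/connectors/$c/status" | jq -r '.connector.state')
  echo "$c: $state"
  if [ "$state" != "RUNNING" ]; then
    echo "  -> investigate $c (state: $state)"
  fi
done
```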
Common Issues¶
Troubleshooting Connectors
**1. Connector Fails to Start**

```bash
# Check logs
tail -f logs/connect.log
# Common causes:
# - Missing connector plugin
# - Invalid configuration
# - Network connectivity
```
**2. Slow Performance**
```json
{
"tasks.max": "4" // Increase parallelism
}
```
**3. Schema Evolution Errors**
```json
{
"value.converter.schemas.enable": "false", // Disable schemas
"auto.evolve": "true" // Auto-adapt to schema changes
}
```
**4. Offset Corruption**
```bash
# Reset offsets (consumer group)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group connect-my-connector \
--reset-offsets --to-earliest --execute --all-topics
```
Connector Ecosystem¶
Confluent Hub¶
Connector Repository
Confluent Hub provides 100+ certified connectors.
**Install connectors:**
```bash
# Install a connector from Confluent Hub
confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 # (1)!
# List installed
confluent-hub list
```
1. Installs connector and dependencies
Self-Managed vs Cloud-Managed¶
- Self-Managed
Pros:

- Full control over infrastructure
- No vendor lock-in
- Custom connectors

Cons:

- Operational overhead
- Manual scaling
- Security management
Use when: Running on-premises or need custom connectors
- Cloud-Managed (Confluent Cloud)
Pros:

- Zero operational overhead
- Auto-scaling
- 80+ managed connectors

Cons:

- Vendor-specific
- Limited customization
Use when: Prefer managed services, standard connectors
Best Practices¶
Production Deployment Checklist
Configuration:
- [x] Use distributed mode for production
- [x] Set replication factor ≥ 3 for internal topics
- [x] Enable authentication and encryption
- [x] Configure appropriate `tasks.max` for parallelism
**Monitoring:**
- [x] Monitor connector and task states
- [x] Track throughput and lag metrics
- [x] Set up alerts for FAILED state
- [x] Log errors to centralized system
**Data Quality:**
- [x] Enable schema validation (Schema Registry)
- [x] Use SMTs for data quality checks
- [x] Handle schema evolution gracefully
- [x] Test with sample data first
**Performance:**
- [x] Tune `batch.size` and `linger.ms`
- [x] Adjust `tasks.max` based on load
- [x] Use compression for large messages
- [x] Monitor consumer lag
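For the producer-side items above, Kafka Connect supports per-connector client overrides (when the worker's `connector.client.config.override.policy` allows them); a sketch of the relevant connector config fragment, with illustrative values:

```json
{
  "tasks.max": "4",
  "producer.override.batch.size": "65536",
  "producer.override.linger.ms": "50",
  "producer.override.compression.type": "lz4"
}
```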
SMT Best Practices¶
When NOT to Use SMTs
SMTs are stateless and single-record. Don't use SMTs for:
- Aggregations (use Kafka Streams/Flink)
- Joins (use stream processing)
- Complex business logic
- Enrichment requiring external lookups
**Instead:** Stream data to Kafka first, then process with dedicated stream processing.
Complete Example: End-to-End Pipeline¶
Here's a complete data pipeline from PostgreSQL to Elasticsearch:
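A minimal sketch of wiring the pipeline together, reusing the source and sink configurations sketched earlier; the JSON file names are assumptions:

```bash
# 1. Stream PostgreSQL changes into Kafka with the Debezium source connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-source-connector.json

# 2. Index the change topics into Elasticsearch with the sink connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @elasticsearch-sink-connector.json

# 3. Verify both ends of the pipeline are RUNNING
curl -s http://localhost:8083/connectors/postgres-source/status | jq '.connector.state'
curl -s http://localhost:8083/connectors/elasticsearch-sink/status | jq '.connector.state'
```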
What's Next?¶
!!! tip "Continue Learning"

    - Use Schema Registry - Manage schemas for data quality
    - Process Streams - Transform data with Flink or Kafka Streams
    - Produce Messages - Write custom producers
    - Consume Messages - Write custom consumers
Additional Resources¶
- Kafka Connect Documentation
- Kafka Connect 101 Course
- Confluent Hub - Connector Repository
- Debezium CDC Connectors
- Single Message Transforms Reference
Hands-On Practice
Try the interactive Kafka Connect exercise from Confluent to practice deploying connectors.