Intégrer des Systèmes Externes avec Kafka Connect¶
Vue d'ensemble¶
Apprenez à utiliser Kafka Connect pour construire des pipelines d'intégration de données entre Apache Kafka et des systèmes externes sans écrire de code personnalisé. Ce guide couvre la configuration des connecteurs, les Single Message Transforms (SMTs), et les meilleures pratiques pour construire des pipelines de données prêts pour la production.
!!! info "Ce que vous allez apprendre" - Comprendre l'architecture de Kafka Connect - Configurer des connecteurs source et sink - Appliquer des transformations avec les SMTs - Déployer et surveiller les connecteurs - Choisir entre auto-géré et géré dans le cloud
Prérequis¶
- Docker et Docker Compose installés
- Compréhension de base des topics et partitions Kafka
- Identifiants du système cible (base de données, API, etc.)
Démarrage Rapide : Exécuter Kafka Connect avec Docker Compose¶
Utilisez cette configuration Docker Compose pour exécuter Kafka avec KRaft et Kafka Connect en mode distribué :
- Image Confluent Platform Kafka (inclut le mode KRaft)
- Utiliser le nom de service
kafkapour la communication interne entre conteneurs - Désactiver la collecte de métriques Confluent Support pour une configuration locale minimale
- L'image Confluent Platform inclut de nombreux connecteurs pré-installés
- Port de l'API REST Kafka Connect pour gérer les connecteurs
- Bootstrap servers pointant vers le service Kafka
- ID de groupe unique pour ce cluster Connect
- Topics internes pour stocker les configs, offsets et statuts des connecteurs
- Facteur de réplication 1 pour le développement (utiliser 3+ en production)
- Convertisseur JSON pour la sérialisation des messages (peut utiliser Avro avec Schema Registry)
- Chemin des plugins où les JARs des connecteurs sont chargés
Démarrer Kafka Connect¶
# Démarrer les services
docker-compose up -d
# Attendre que les services soient prêts
docker-compose ps
# Vérifier que Kafka Connect fonctionne
curl http://localhost:8083/
# Lister les connecteurs installés
curl http://localhost:8083/connector-plugins | jq
# Arrêter les services
docker-compose down
Installer des Connecteurs Supplémentaires
```bash # Installer un connecteur depuis Confluent Hub (exemple: Elasticsearch Sink) docker exec kafka-connect confluent-hub install \ confluentinc/kafka-connect-elasticsearch:latest --no-prompt
# Redémarrer Connect pour charger le nouveau connecteur
docker-compose restart kafka-connect
```
Considérations de Production
Ceci est une configuration minimale pour le développement. Pour la production : - Exécuter plusieurs workers Connect (3+) pour la haute disponibilité - Définir le facteur de réplication à 3 pour les topics internes - Activer SSL/SASL pour la communication sécurisée - Utiliser Avro avec Schema Registry pour la sérialisation des données - Surveiller la santé des connecteurs et le lag
Qu'est-ce que Kafka Connect ?¶
Définition de Kafka Connect
Kafka Connect est le framework d'intégration de Kafka pour le streaming de données entre Kafka et des systèmes externes. Il fournit :
- **Connecteurs plug-and-play** pour systèmes populaires
- **Configuration déclarative** (basée sur JSON)
- **Scaling automatique et tolérance aux pannes**
- **Aucun code personnalisé requis** pour la plupart des cas d'usage
graph LR
subgraph "Systèmes Externes"
DB[(PostgreSQL<br/>Database)]
SALES[Salesforce<br/>API]
FILES[S3<br/>Bucket]
end
subgraph "Cluster Kafka Connect"
SC1[Source<br/>Connector]
SC2[Source<br/>Connector]
SINK1[Sink<br/>Connector]
end
subgraph "Cluster Kafka"
T1[Topic: orders]
T2[Topic: customers]
T3[Topic: analytics]
end
subgraph "Systèmes de Destination"
ES[(Elasticsearch)]
DW[(Snowflake<br/>Data Warehouse)]
end
DB -->|Lire changements| SC1
SALES -->|Récupérer données| SC2
SC1 -->|Produire| T1
SC2 -->|Produire| T2
T1 -->|Consommer| SINK1
T3 -->|Consommer| SINK1
SINK1 -->|Écrire| ES
SINK1 -->|Écrire| DW
Types de Connecteurs¶
- Connecteurs Source
Objectif : Importer des données DANS Kafka
Exemples : - CDC bases de données (Debezium) - Sources fichiers (S3, HDFS) - Files de messages (RabbitMQ) - APIs (Salesforce, REST)
Sortie : Topics Kafka
- Connecteurs Sink
Objectif : Exporter des données DEPUIS Kafka
Exemples : - Bases de données (PostgreSQL, MySQL) - Moteurs de recherche (Elasticsearch) - Entrepôts de données (Snowflake) - Stockage objets (S3, GCS)
Entrée : Topics Kafka
Étape 1 : Configurer Kafka Connect¶
Modes de Déploiement¶
Kafka Connect peut s'exécuter en deux modes :
Développement Uniquement
Le mode standalone exécute un seul processus worker. À utiliser uniquement pour le développement et les tests.
| Démarrer worker standalone | |
|---|---|
- Configuration worker : bootstrap servers, convertisseurs key/value
- Fichier de configuration du connecteur (JSON)
Recommandé pour la Production
Le mode distribué exécute un cluster de workers avec équilibrage de charge automatique et tolérance aux pannes.
| Démarrer worker distribué | |
|---|---|
- Configuration cluster : group.id, topics stockage offsets
- Démarrer sur plusieurs machines pour HA
- Utiliser REST API (port 8083 par défaut) pour gérer connecteurs
Configuration Worker¶
- Adresse cluster Kafka
- Comment sérialiser les clés (JSON, Avro, String)
- Comment sérialiser les valeurs
- Stockage offsets basé fichier (standalone uniquement)
- Identifiant unique du cluster - workers avec même ID forment un cluster
- Stocke les configurations des connecteurs
- Suit les positions des connecteurs source
- Stocke les statuts des connecteurs et tâches
- Point de terminaison REST API pour gestion connecteurs
Étape 2 : Configurer les Connecteurs Source¶
Les connecteurs source importent des données DEPUIS des systèmes externes VERS Kafka.
Exemple : CDC PostgreSQL avec Debezium¶
Change Data Capture (CDC)
Les connecteurs CDC capturent les changements de base de données (insertions, mises à jour, suppressions) et les diffusent vers Kafka en temps réel.
- Nom unique du connecteur dans le cluster
- Nom complet de la classe du connecteur
- Nombre de tâches parallèles (échelle selon tables/partitions)
- Détails de connexion base de données
- Nom de la base de données pour capturer changements
- Nom logique utilisé dans le nommage des topics
- Liste blanche tables spécifiques (séparées par virgules)
- Plugin de décodage logique PostgreSQL
- Appliquer des transformations (voir section SMTs)
Déployer le Connecteur¶
# Créer connecteur
curl -X POST http://localhost:8083/connectors \ # (1)!
-H "Content-Type: application/json" \
-d @postgres-source-connector.json
# Vérifier statut
curl http://localhost:8083/connectors/postgres-source/status # (2)!
# Lister tous les connecteurs
curl http://localhost:8083/connectors # (3)!
- POST config connecteur vers REST API
- Vérifier que connecteur est RUNNING
- Voir tous les connecteurs déployés
Connecteurs Source Courants¶
Connecteurs Source Populaires
| Connecteur | Cas d'Usage | Format Données |
|---|---|---|
| Debezium (PostgreSQL, MySQL) | CDC base de données | JSON, Avro |
| JDBC Source | Interroger tables DB | JSON, Avro |
| S3 Source | Lire fichiers depuis S3 | CSV, JSON, Avro |
| Salesforce | Récupérer données CRM | JSON |
| MongoDB | Capturer change streams | JSON, BSON |
| Syslog | Collecter événements log | String, JSON |
Étape 3 : Configurer les Connecteurs Sink¶
Les connecteurs sink envoient des données DEPUIS Kafka VERS des systèmes externes.
Exemple : Elasticsearch Sink¶
- Classe connecteur sink Elasticsearch
- Paralléliser entre partitions
- Liste séparée par virgules des topics à consommer
- Connexion cluster Elasticsearch
- Type de document (déprécié dans ES 7+, utiliser "_doc")
- Utiliser clé message Kafka comme ID document
- Supprimer document ES sur valeur null (tombstone)
- Déballer l'enveloppe CDC Debezium
Exemple : JDBC Sink (PostgreSQL)¶
- Chaîne de connexion JDBC
- Mode insertion : insert, upsert, ou update
- Mode clé primaire : record_key, record_value, ou kafka
- Champs à utiliser comme clé primaire
- Pattern nom table cible
- Auto-créer table si n'existe pas
- Auto-ajouter colonnes quand schéma change
Connecteurs Sink Courants¶
Connecteurs Sink Populaires
| Connecteur | Cas d'Usage | Fonctionnalités |
|---|---|---|
| Elasticsearch | Recherche & analytics | Recherche full-text, indexation temps réel |
| JDBC Sink | Bases relationnelles | Support upsert, auto-créer tables |
| S3 Sink | Stockage data lake | Partitionnement, compression, formats |
| Snowflake | Entrepôt données cloud | Chargement batch, évolution schéma |
| BigQuery | Analytics Google | Insertions streaming, partitionnement |
| MongoDB Sink | Base de données documents | Upsert, compatibilité change stream |
Étape 4 : Appliquer les Single Message Transforms (SMTs)¶
Transformations Légères
Les SMTs effectuent des transformations simples et sans état sur les messages qui transitent par les connecteurs.
**Utiliser les SMTs pour :**
- Ajouter/supprimer champs
- Renommer champs
- Filtrer messages
- Masquer données sensibles
- Router vers différents topics
**Ne PAS utiliser les SMTs pour :**
- Opérations avec état (agrégations, jointures)
- Logique métier complexe
- Transformations multi-messages
Exemples SMT Courants¶
1. Ajouter un Champ (Enrichissement Contexte)¶
{
"transforms": "addSource",
"transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value", // (1)!
"transforms.addSource.static.field": "source_system", // (2)!
"transforms.addSource.static.value": "production_db" // (3)!
}
- Classe SMT pour ajouter champs à la valeur
- Nom du champ à ajouter
- Valeur statique à définir
Résultat :
// Avant
{"order_id": 123, "amount": 99.99}
// Après
{"order_id": 123, "amount": 99.99, "source_system": "production_db"}
2. Masquer Données Sensibles¶
{
"transforms": "maskPII",
"transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskPII.fields": "credit_card,ssn", // (1)!
"transforms.maskPII.replacement": "****" // (2)!
}
- Champs à masquer
- Valeur de remplacement
3. Filtrer Messages¶
{
"transforms": "filter",
"transforms.filter.type": "io.confluent.connect.transforms.Filter$Value", // (1)!
"transforms.filter.filter.condition": "$.status == 'CANCELLED'", // (2)!
"transforms.filter.filter.type": "exclude" // (3)!
}
- Transform filtre (nécessite licence Confluent ou SMT personnalisé)
- Condition JSONPath
- Exclure ou inclure messages correspondants
4. Renommer Champs¶
{
"transforms": "rename",
"transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.rename.renames": "old_name:new_name,user_id:customer_id" // (1)!
}
- Mappages champs séparés par virgules
5. Router vers Différents Topics¶
{
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "(.*)orders(.*)", // (1)!
"transforms.route.replacement": "$1orders_v2$2" // (2)!
}
- Pattern regex pour matcher nom topic
- Pattern de remplacement
Chaîner Plusieurs Transformations¶
{
"transforms": "unwrap,addTimestamp,maskPII", // (1)!
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "ingestion_time",
"transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskPII.fields": "ssn,credit_card"
}
- Appliquées dans l'ordre : unwrap → add timestamp → mask PII
Étape 5 : Surveiller et Gérer les Connecteurs¶
Cycle de Vie des Connecteurs¶
curl http://localhost:8083/connectors/my-connector/status
# Réponse :
{
"name": "my-connector",
"connector": {
"state": "RUNNING", // (1)!
"worker_id": "connect-worker-1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING", // (2)!
"worker_id": "connect-worker-1:8083"
}
]
}
- État connecteur : RUNNING, PAUSED, FAILED
- État tâche - chaque tâche traite sous-ensemble données
# Mettre en pause connecteur
curl -X PUT http://localhost:8083/connectors/my-connector/pause // (1)!
# Reprendre connecteur
curl -X PUT http://localhost:8083/connectors/my-connector/resume // (2)!
- Arrêter traitement sans supprimer configuration
- Redémarrer depuis dernier offset committé
Surveiller les Métriques¶
Métriques Clés à Surveiller
| Métrique | Description | Action sur Alerte |
|---|---|---|
| État connecteur | RUNNING, PAUSED, FAILED | Redémarrer si FAILED |
| État tâche | Statut tâche individuelle | Vérifier logs, redémarrer tâche |
| Records traités | Taux de débit | Augmenter tâches si lent |
| Compte erreurs | Nombre messages échoués | Vérifier logs erreurs, corriger config |
| Lag offset | Lag connecteur source | Augmenter parallélisme |
Problèmes Courants¶
Dépannage Connecteurs
1. Le Connecteur Ne Démarre Pas ```bash # Vérifier logs tail -f logs/connect.log
# Causes courantes :
# - Plugin connecteur manquant
# - Configuration invalide
# - Connectivité réseau
```
**2. Performance Lente**
```json
{
"tasks.max": "4" // Augmenter parallélisme
}
```
**3. Erreurs Évolution Schéma**
```json
{
"value.converter.schemas.enable": "false", // Désactiver schémas
"auto.evolve": "true" // Auto-adapter aux changements schéma
}
```
**4. Corruption Offset**
```bash
# Réinitialiser offsets (consumer group)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group connect-my-connector \
--reset-offsets --to-earliest --execute --all-topics
```
Écosystème des Connecteurs¶
Confluent Hub¶
Dépôt de Connecteurs
Confluent Hub fournit plus de 100 connecteurs certifiés.
**Installer connecteurs :**
```bash
# Installer Confluent Hub CLI
confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 // (1)!
# Lister installés
confluent-hub list
```
1. Installe connecteur et dépendances
Auto-géré vs Géré Cloud¶
- Auto-géré
Avantages : - Contrôle total infrastructure - Pas de verrouillage fournisseur - Connecteurs personnalisés
Inconvénients : - Charge opérationnelle - Scaling manuel - Gestion sécurité
Utiliser quand : On-premises ou besoin connecteurs personnalisés
- Géré Cloud (Confluent Cloud)
Avantages : - Zéro charge opérationnelle - Auto-scaling - 80+ connecteurs gérés
Inconvénients : - Spécifique fournisseur - Personnalisation limitée
Utiliser quand : Préférer services gérés, connecteurs standards
Meilleures Pratiques¶
Liste de Contrôle Déploiement Production
Configuration :
- [x] Utiliser mode distribué pour production
- [x] Définir facteur réplication ≥ 3 pour topics internes
- [x] Activer authentification et chiffrement
- [x] Configurer `tasks.max` approprié pour parallélisme
**Surveillance :**
- [x] Surveiller états connecteur et tâches
- [x] Suivre métriques débit et lag
- [x] Configurer alertes pour état FAILED
- [x] Logger erreurs vers système centralisé
**Qualité Données :**
- [x] Activer validation schéma (Schema Registry)
- [x] Utiliser SMTs pour vérifications qualité données
- [x] Gérer évolution schéma gracieusement
- [x] Tester avec données échantillon d'abord
**Performance :**
- [x] Ajuster `batch.size` et `linger.ms`
- [x] Adapter `tasks.max` selon charge
- [x] Utiliser compression pour gros messages
- [x] Surveiller consumer lag
Meilleures Pratiques SMT¶
Quand NE PAS Utiliser les SMTs
Les SMTs sont sans état et mono-record. Ne PAS utiliser SMTs pour :
- Agrégations (utiliser Kafka Streams/Flink)
- Jointures (utiliser stream processing)
- Logique métier complexe
- Enrichissement nécessitant lookups externes
**À la place :** Streamer données vers Kafka d'abord, puis traiter avec stream processing dédié.
Exemple Complet : Pipeline End-to-End¶
Voici un pipeline de données complet de PostgreSQL vers Elasticsearch :
Prochaines Étapes¶
!!! tip "Continuez Votre Apprentissage" - Utiliser Schema Registry - Gérer schémas pour qualité données - Traiter des Streams - Transformer données avec Flink ou Kafka Streams - Produire des Messages - Écrire producers personnalisés - Consommer des Messages - Écrire consumers personnalisés
Ressources Supplémentaires¶
- Documentation Kafka Connect
- Cours Kafka Connect 101
- Confluent Hub - Dépôt Connecteurs
- Connecteurs CDC Debezium
- Référence Single Message Transforms
Pratique Hands-On
Essayez l'exercice interactif Kafka Connect de Confluent pour pratiquer le déploiement de connecteurs.