Pattern Outbox Transactionnel
Apprenez à publier des événements de domaine de manière fiable avec le Pattern Outbox Transactionnel.
Le problème
Lorsque vous devez mettre à jour une base de données et publier des événements, vous faites face à un problème de transaction distribuée:
// ❌ Approche peu fiable
suspend fun createOrder(command: CreateOrderCommand): Result<OrderId> {
return runCatching {
val order = Order.create(command.customerId)
orderRepository.save(order) // Opération de base de données
// Et si cela échoue? L'événement est perdu!
eventPublisher.publish(order.domainEvents) // Système externe
order.id
}
}
Problèmes:
- Si la publication d'événement échoue, les modifications de la base de données sont validées mais les événements sont perdus
- Si la base de données échoue après la publication, les événements sont envoyés mais les données sont incohérentes
- Pas d'atomicité entre la base de données et le courtier de messages
La solution
Le Pattern Outbox Transactionnel résout ce problème en:
- Sauvegardant les événements dans une table de base de données dans la même transaction que l'agrégat
- Publiant les événements depuis la table outbox dans un processus séparé
- Marquant les événements comme publiés après une livraison réussie
┌─────────────────────────────────────────┐
│ Gestionnaire de commandes │
│ 1. Sauvegarder l'agrégat │
│ 2. Sauvegarder les événements dans │
│ l'outbox (même transaction) │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Éditeur de l'Outbox │
│ 1. Lire les événements non publiés │
│ 2. Publier vers le broker de messages │
│ 3. Marquer comme publiés │
└─────────────────────────────────────────┘
Implémentation
1. Définir le repository Outbox
import com.melsardes.libraries.structuskotlin.domain.MessageOutboxRepository
import com.melsardes.libraries.structuskotlin.domain.OutboxMessage
interface MessageOutboxRepository : Repository {
suspend fun save(event: DomainEvent)
suspend fun findUnpublished(limit: Int): List<OutboxMessage>
suspend fun markAsPublished(messageId: String)
suspend fun incrementRetryCount(messageId: String)
suspend fun deletePublishedOlderThan(olderThanDays: Int): Int
suspend fun findFailedEvents(maxRetries: Int): List<OutboxMessage>
}
2. Créer la table Outbox
CREATE TABLE message_outbox (
id VARCHAR(36) PRIMARY KEY,
event_id VARCHAR(36) NOT NULL,
event_type VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
occurred_at TIMESTAMP NOT NULL,
published_at TIMESTAMP,
retry_count INT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
3. Utiliser dans le gestionnaire de commandes
class CreateOrderCommandHandler(
private val orderRepository: OrderRepository,
private val outboxRepository: MessageOutboxRepository,
private val database: Database
) : CommandHandler<CreateOrderCommand, Result<OrderId>> {
override suspend operator fun invoke(
command: CreateOrderCommand
): Result<OrderId> {
return runCatching {
database.transaction {
// 1. Créer et sauvegarder l'agrégat
val order = Order.create(command.customerId, command.items)
orderRepository.save(order)
// 2. Sauvegarder les événements dans l'outbox (même transaction!)
order.domainEvents.forEach { event ->
outboxRepository.save(event)
}
// 3. Effacer les événements de l'agrégat
order.clearEvents()
order.id
}
}
}
}
4. Implémenter le Publisher de l'Outbox
class OutboxPublisher(
private val outboxRepository: MessageOutboxRepository,
private val eventPublisher: DomainEventPublisher,
private val maxRetries: Int = 3
) {
suspend fun publishPendingEvents() {
val messages = outboxRepository.findUnpublished(limit = 100)
messages.forEach { message ->
try {
// Publier vers le système externe
val event = deserializeEvent(message.payload, message.eventType)
eventPublisher.publish(event)
// Marquer comme publié
outboxRepository.markAsPublished(message.id)
} catch (e: Exception) {
// Incrémenter le compteur de tentatives
outboxRepository.incrementRetryCount(message.id)
}
}
}
}
5. Planifier le Publisher
// Utilisation des coroutines Kotlin
class OutboxScheduler(
private val outboxPublisher: OutboxPublisher
) {
fun start(scope: CoroutineScope) {
scope.launch {
while (isActive) {
try {
outboxPublisher.publishPendingEvents()
} catch (e: Exception) {
logger.error("Erreur dans le publisher de l'outbox", e)
}
delay(5000) // Sonder toutes les 5 secondes
}
}
}
}
Avantages
✅ Atomicité: Les événements et les changements de données sont atomiques ✅ Fiabilité: Les événements ne sont jamais perdus ✅ Cohérence: La base de données et les événements restent synchronisés ✅ Logique de nouvelle tentative: Les publications échouées sont réessayées ✅ Surveillance: Suivi des événements non publiés
Compromis
⚠️ Cohérence éventuelle: Les événements sont publiés de façon asynchrone ⚠️ Complexité: Infrastructure additionnelle nécessaire ⚠️ Stockage: La table outbox grandit avec le temps ⚠️ Sondage: Le publisher sonde la base de données
Bonnes pratiques
✅ À faire
- Utiliser des transactions pour les sauvegardes d'agrégat + outbox
- Effacer les événements après sauvegarde dans l'outbox
- Implémenter une logique de nouvelle tentative avec backoff
- Surveiller le nombre d'événements non publiés
- Nettoyer les anciens événements publiés
- Gérer les événements échoués (file d'attente des lettres mortes)
❌ À éviter
- Publier des événements directement depuis les gestionnaires de commandes
- Oublier d'effacer les événements des agrégats
- Ignorer la gestion des erreurs dans le publisher
- Laisser la table outbox croître indéfiniment
- Ignorer les événements échoués
Prochaines étapes
- Architecture Orientée Événements - Construire avec des événements
- Gestion des erreurs - Gérer les échecs
- Stratégies de test - Modèles de test