Kotlin Coroutines Guide
Kotlin Coroutines are a fundamental part of Structus's asynchronous processing capabilities. This guide provides an overview of how coroutines work and how they're used within Structus.
Basics of Coroutines
Coroutines are Kotlin's solution for asynchronous programming. They enable you to write asynchronous code in a sequential style, making it easier to understand and maintain.
Suspending Functions
Suspending functions are at the core of coroutines. These functions can pause their execution and resume later, allowing for non-blocking operations.
// Defining a suspending function
suspend fun fetchData(): Data {
// This function can be paused and resumed
return networkService.getData()
}
Coroutine Builders
Coroutine builders are used to start new coroutines. The most common ones are:
// launch: starts a new coroutine without returning a result
suspend fun main() = coroutineScope {
launch {
delay(1000)
println("Kotlin Coroutines World!")
}
println("Hello")
}
// async: starts a coroutine that returns a result
suspend fun fetchTwoValues() = coroutineScope {
val valueOne = async { getValue1() }
val valueTwo = async { getValue2() }
// Await both results
valueOne.await() + valueTwo.await()
}
Using Coroutines in Structus
Structus uses coroutines extensively to ensure non-blocking operations, especially in repositories and command/query handlers.
Repositories
All repository methods in Structus are suspending functions:
interface OrderRepository : Repository {
suspend fun findById(id: OrderId): Order?
suspend fun save(order: Order)
suspend fun delete(id: OrderId)
}
Command Handlers
Command handlers use the suspend modifier to enable asynchronous processing:
class CreateOrderCommandHandler(
private val orderRepository: OrderRepository,
private val outboxRepository: MessageOutboxRepository
) : CommandHandler<CreateOrderCommand, Result<OrderId>> {
override suspend operator fun invoke(command: CreateOrderCommand): Result<OrderId> {
return runCatching {
val order = Order.create(command.customerId)
orderRepository.save(order)
order.domainEvents.forEach { outboxRepository.save(it) }
order.clearEvents()
order.id
}
}
}
Coroutines and Transactional Outbox
Structus's Transactional Outbox pattern relies heavily on coroutines for reliable event publishing:
class OutboxPublisher(
private val outboxRepository: MessageOutboxRepository,
private val eventPublisher: DomainEventPublisher
) {
suspend fun publishPendingEvents() {
val messages = outboxRepository.findUnpublished(limit = 100)
messages.forEach { message ->
try {
val event = deserializeEvent(message.payload, message.eventType)
eventPublisher.publish(event)
outboxRepository.markAsPublished(message.id)
} catch (e: Exception) {
outboxRepository.incrementRetryCount(message.id)
}
}
}
}
Best Practices
Use Structured Concurrency
Always use structured concurrency to ensure all coroutines are properly managed:
suspend fun fetchData() = coroutineScope {
// All coroutines started in this scope will be completed
// before the function returns
val result1 = async { api.fetchFirstPart() }
val result2 = async { api.fetchSecondPart() }
combineResults(result1.await(), result2.await())
}
Handle Exceptions Properly
Use runCatching or try-catch blocks to handle exceptions in coroutines:
suspend fun safeApiCall() = runCatching {
api.fetchData()
}.getOrElse { error ->
// Handle error
logger.error(error)
defaultValue
}
Choose the Right Dispatcher
Select an appropriate dispatcher based on the work being done:
- Dispatchers.IO: For IO-bound work (network, disk)
- Dispatchers.Default: For CPU-intensive work
- Dispatchers.Main: For UI operations (Android)
withContext(Dispatchers.IO) {
// IO operations (network, database)
}