Aller au contenu principal
Version: Suivant

Ktor Integration

This guide demonstrates how to integrate Structus with Ktor, a lightweight and flexible Kotlin web framework.

Overview

Ktor is an asynchronous web framework built from the ground up to leverage Kotlin's language features like coroutines. It's an excellent match for Structus since both are designed with Kotlin-first principles.

Setup

Dependencies

Add the following to your build.gradle.kts:

dependencies {
    // Structus
    implementation("com.github.melsardes:structus-kotlin:0.1.0")
    
    // Ktor Server
    implementation("io.ktor:ktor-server-core:2.3.7")
    implementation("io.ktor:ktor-server-netty:2.3.7")
    implementation("io.ktor:ktor-server-content-negotiation:2.3.7")
    implementation("io.ktor:ktor-serialization-jackson:2.3.7")
    
    // Database (using Exposed)
    implementation("org.jetbrains.exposed:exposed-core:0.45.0")
    implementation("org.jetbrains.exposed:exposed-dao:0.45.0")
    implementation("org.jetbrains.exposed:exposed-jdbc:0.45.0")
    implementation("org.jetbrains.exposed:exposed-kotlin-datetime:0.45.0")
    
    // Coroutines
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
}

Project Structure

Following clean architecture principles:

src/
├── main/
│   ├── kotlin/
│   │   ├── com/example/
│   │   │   ├── domain/          # Domain layer (entities, repositories)
│   │   │   ├── application/     # Application layer (commands, queries)
│   │   │   ├── infrastructure/  # Infrastructure layer (DB, external services)
│   │   │   ├── presentation/    # Presentation layer (Ktor routes)
│   │   │   └── Application.kt   # Main application entry point

Domain Layer

Create your domain entities and repositories:

// domain/Product.kt
data class ProductId(val value: String) : ValueObject

class Product private constructor(
    override val id: ProductId,
    private var name: String,
    private var price: Money
) : AggregateRoot<ProductId>() {

    val name: String get() = name
    val price: Money get() = price
    
    fun updateDetails(name: String, price: Money) {
        this.name = name
        this.price = price
        recordEvent(ProductUpdatedEvent(id.value, name, price.amount))
    }
    
    companion object {
        fun create(id: ProductId, name: String, price: Money): Product {
            val product = Product(id, name, price)
            product.recordEvent(ProductCreatedEvent(id.value, name, price.amount))
            return product
        }
    }
}

// domain/ProductRepository.kt
interface ProductRepository : Repository {
    suspend fun findById(id: ProductId): Product?
    suspend fun save(product: Product)
    suspend fun delete(id: ProductId)
    suspend fun findAll(): List<Product>
}

Application Layer

Create command and query handlers:

// application/commands/CreateProductCommand.kt
data class CreateProductCommand(
    val name: String,
    val price: BigDecimal,
    val currency: String
) : Command

// application/commands/CreateProductCommandHandler.kt
class CreateProductCommandHandler(
    private val productRepository: ProductRepository,
    private val outboxRepository: MessageOutboxRepository
) : CommandHandler<CreateProductCommand, Result<ProductId>> {
    
    override suspend operator fun invoke(command: CreateProductCommand): Result<ProductId> {
        return runCatching {
            val id = ProductId(UUID.randomUUID().toString())
            val price = Money(command.price, Currency.getInstance(command.currency))
            
            val product = Product.create(id, command.name, price)
            productRepository.save(product)
            
            product.domainEvents.forEach { outboxRepository.save(it) }
            product.clearEvents()
            
            id
        }
    }
}

// application/queries/GetProductQuery.kt
data class GetProductQuery(val id: String) : Query

data class ProductDto(
    val id: String,
    val name: String,
    val price: BigDecimal,
    val currency: String
)

// application/queries/GetProductQueryHandler.kt
class GetProductQueryHandler(
    private val productRepository: ProductRepository
) : QueryHandler<GetProductQuery, ProductDto?> {
    
    override suspend operator fun invoke(query: GetProductQuery): ProductDto? {
        val product = productRepository.findById(ProductId(query.id)) ?: return null
        
        return ProductDto(
            id = product.id.value,
            name = product.name,
            price = product.price.amount,
            currency = product.price.currency.currencyCode
        )
    }
}

Infrastructure Layer

Implement repositories using Exposed:

// infrastructure/persistence/ProductTable.kt
object ProductTable : Table("products") {
    val id = varchar("id", 36)
    val name = varchar("name", 255)
    val price = decimal("price", 19, 4)
    val currency = varchar("currency", 3)
    
    override val primaryKey = PrimaryKey(id)
}

// infrastructure/persistence/ProductRepositoryImpl.kt
class ProductRepositoryImpl : ProductRepository {
    
    override suspend fun findById(id: ProductId): Product? = dbQuery {
        ProductTable.select { ProductTable.id eq id.value }
            .singleOrNull()
            ?.let { mapToProduct(it) }
    }
    
    override suspend fun save(product: Product) = dbQuery {
        val exists = ProductTable.select { ProductTable.id eq product.id.value }.count() > 0
        
        if (exists) {
            ProductTable.update({ ProductTable.id eq product.id.value }) {
                it[name] = product.name
                it[price] = product.price.amount
                it[currency] = product.price.currency.currencyCode
            }
        } else {
            ProductTable.insert {
                it[id] = product.id.value
                it[name] = product.name
                it[price] = product.price.amount
                it[currency] = product.price.currency.currencyCode
            }
        }
    }
    
    override suspend fun delete(id: ProductId) = dbQuery {
        ProductTable.deleteWhere { ProductTable.id eq id.value }
    }
    
    override suspend fun findAll(): List<Product> = dbQuery {
        ProductTable.selectAll().map { mapToProduct(it) }
    }
    
    private fun mapToProduct(row: ResultRow): Product {
        val id = ProductId(row[ProductTable.id])
        val name = row[ProductTable.name]
        val price = row[ProductTable.price]
        val currency = Currency.getInstance(row[ProductTable.currency])
        
        // Use reflection or a special factory method
        return ProductFactory.reconstruct(id, name, Money(price, currency))
    }
    
    private suspend fun <T> dbQuery(block: () -> T): T =
        withContext(Dispatchers.IO) {
            transaction { block() }
        }
}

// infrastructure/persistence/OutboxTable.kt
object OutboxTable : Table("message_outbox") {
    val id = varchar("id", 36)
    val eventType = varchar("event_type", 255)
    val aggregateId = varchar("aggregate_id", 255)
    val eventData = text("event_data")
    val published = bool("published")
    val timestamp = timestamp("timestamp")
    val publishedAt = timestamp("published_at").nullable()
    
    override val primaryKey = PrimaryKey(id)
}

// infrastructure/persistence/MessageOutboxRepositoryImpl.kt
class MessageOutboxRepositoryImpl(
    private val objectMapper: ObjectMapper
) : MessageOutboxRepository {
    
    override suspend fun save(event: DomainEvent) = dbQuery {
        val eventId = UUID.randomUUID().toString()
        val eventType = event.javaClass.name
        val aggregateId = event.aggregateId
        val eventData = objectMapper.writeValueAsString(event)
        
        OutboxTable.insert {
            it[id] = eventId
            it[OutboxTable.eventType] = eventType
            it[OutboxTable.aggregateId] = aggregateId
            it[OutboxTable.eventData] = eventData
            it[published] = false
            it[timestamp] = LocalDateTime.now()
        }
    }
    
    // Other methods omitted
}

Presentation Layer

Set up Ktor routes to expose your application:

// presentation/Routes.kt
fun Application.configureRouting(
    createProductHandler: CreateProductCommandHandler,
    getProductHandler: GetProductQueryHandler
) {
    routing {
        route("/api/v1") {
            route("/products") {
                post {
                    val request = call.receive<CreateProductRequestDto>()
                    
                    val command = CreateProductCommand(
                        name = request.name,
                        price = request.price,
                        currency = request.currency
                    )
                    
                    when (val result = createProductHandler(command)) {
                        is Result.Success -> {
                            call.respond(
                                HttpStatusCode.Created,
                                mapOf("id" to result.value.value)
                            )
                        }
                        is Result.Failure -> {
                            call.respond(
                                HttpStatusCode.BadRequest,
                                mapOf("error" to (result.error.message ?: "Unknown error"))
                            )
                        }
                    }
                }
                
                get("/{id}") {
                    val id = call.parameters["id"] ?: run {
                        call.respond(HttpStatusCode.BadRequest, "Missing id parameter")
                        return@get
                    }
                    
                    val query = GetProductQuery(id)
                    val product = getProductHandler(query)
                    
                    if (product != null) {
                        call.respond(product)
                    } else {
                        call.respond(HttpStatusCode.NotFound)
                    }
                }
            }
        }
    }
}

// presentation/DTOs.kt
@Serializable
data class CreateProductRequestDto(
    val name: String,
    val price: BigDecimal,
    val currency: String
)

Main Application

Create the main application entry point:

// Application.kt
fun main() {
    embeddedServer(Netty, port = 8080) {
        configureSerialization()
        configureDatabase()
        
        // Setup DI
        val objectMapper = jacksonObjectMapper()
        
        val productRepository = ProductRepositoryImpl()
        val outboxRepository = MessageOutboxRepositoryImpl(objectMapper)
        
        val createProductHandler = CreateProductCommandHandler(productRepository, outboxRepository)
        val getProductHandler = GetProductQueryHandler(productRepository)
        
        // Configure routes
        configureRouting(createProductHandler, getProductHandler)
        
        // Start outbox processor
        launch {
            val outboxProcessor = OutboxProcessor(outboxRepository)
            outboxProcessor.start()
        }
    }.start(wait = true)
}

// Configure Jackson serialization
fun Application.configureSerialization() {
    install(ContentNegotiation) {
        jackson {
            registerModule(JavaTimeModule())
            disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            registerModule(KotlinModule.Builder().build())
        }
    }
}

// Configure database
fun Application.configureDatabase() {
    Database.connect(
        url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1",
        driver = "org.h2.Driver"
    )
    
    transaction {
        SchemaUtils.create(ProductTable, OutboxTable)
    }
}

Event Processing

Create an outbox processor to publish events:

// infrastructure/events/OutboxProcessor.kt
class OutboxProcessor(
    private val outboxRepository: MessageOutboxRepository,
    private val kafkaProducer: KafkaProducer<String, String> = createKafkaProducer()
) {
    private val logger = LoggerFactory.getLogger(OutboxProcessor::class.java)
    private val objectMapper = jacksonObjectMapper()
    
    suspend fun start() {
        while (true) {
            try {
                val messages = outboxRepository.findUnpublished(100)
                
                messages.forEach { message ->
                    try {
                        // Publish to Kafka
                        val record = ProducerRecord(
                            "domain-events",
                            message.aggregateId,
                            objectMapper.writeValueAsString(message.event)
                        )
                        
                        kafkaProducer.send(record).get()
                        
                        // Mark as published
                        outboxRepository.markAsPublished(message.id)
                    } catch (e: Exception) {
                        logger.error("Failed to publish event ${message.id}", e)
                        outboxRepository.incrementRetryCount(message.id)
                    }
                }
            } catch (e: Exception) {
                logger.error("Error processing outbox", e)
            }
            
            delay(1000)
        }
    }
    
    private companion object {
        private fun createKafkaProducer(): KafkaProducer<String, String> {
            val props = Properties()
            props["bootstrap.servers"] = "localhost:9092"
            props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
            props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
            return KafkaProducer(props)
        }
    }
}

Testing

Create tests for your application:

// ApplicationTest.kt
class ApplicationTest {
    private val testEngine = TestApplicationEngine(createTestEnvironment())
    private val productRepository = mockk<ProductRepository>()
    private val outboxRepository = mockk<MessageOutboxRepository>()
    
    @BeforeTest
    fun setup() {
        testEngine.start(wait = false)
        testEngine.application.apply {
            configureSerialization()
            configureRouting(
                createProductHandler = CreateProductCommandHandler(
                    productRepository, outboxRepository
                ),
                getProductHandler = GetProductQueryHandler(
                    productRepository
                )
            )
        }
    }
    
    @AfterTest
    fun teardown() {
        testEngine.stop(0L, 0L)
    }
    
    @Test
    fun `test create product`() = runBlocking {
        // Setup mocks
        coEvery { productRepository.save(any()) } just Runs
        coEvery { outboxRepository.save(any()) } just Runs
        
        // Perform test
        with(testEngine) {
            handleRequest(HttpMethod.Post, "/api/v1/products") {
                addHeader(HttpHeaders.ContentType, ContentType.Application.Json.toString())
                setBody("""{"name":"Test Product","price":19.99,"currency":"USD"}""")
            }.apply {
                assertEquals(HttpStatusCode.Created, response.status())
                assertNotNull(response.content)
            }
        }
        
        // Verify
        coVerify { productRepository.save(any()) }
        coVerify { outboxRepository.save(any()) }
    }
}

Full Example

Check out our complete Ktor example on GitHub to see a full integration with Structus.

Next Steps

Now that you've integrated Structus with Ktor, explore our PostgreSQL Integration guide to learn how to use Structus with PostgreSQL.