Kotlin Coroutine은 단순히 suspend 하나가 아니다. 비동기 프로그래밍을 위한 7가지 핵심 기능을 제공하며, 각각의 역할과 사용 시점이 다르다. 이 글에서는 각 기능을 실제 코드와 함께 정리한다.

기본적인 Reactive/Coroutine 개념은 Spring Reactive Programming - 기초부터 실전 마이그레이션까지 참고


1. suspend — 중단 가능한 함수

코루틴의 가장 기본 단위. 함수 실행 중 일시 중단(suspend)했다가 결과가 준비되면 재개(resume)할 수 있다.

// suspend 함수 — DB 응답 대기 중 스레드를 반납
suspend fun findBySeq(downtimeSeq: Int): Downtime? {
    return Mono.from(
        dsl.selectFrom(DOWNTIMES)
            .where(DOWNTIMES.DOWNTIME_SEQ.eq(downtimeSeq))
    ).awaitSingleOrNull()?.let { mapToDowntime(it) }
}

핵심 원리

일반 함수:  [시작] ──────────────────────────────── [끝]
            스레드가 끝까지 점유

suspend 함수: [시작] ── [중단 → 스레드 반납] ── [재개] ── [끝]
              DB 대기 중 다른 요청 처리 가능

규칙

  • suspend 함수는 코루틴 안 또는 다른 suspend 함수에서만 호출 가능
  • 일반 함수에서 직접 호출하면 컴파일 에러
// ❌ 컴파일 에러
fun main() {
    findBySeq(1)  // suspend 함수를 일반 함수에서 호출 불가
}

// ✅ 코루틴 스코프 안에서 호출
runBlocking {
    findBySeq(1)
}

내부 동작 — CPS (Continuation Passing Style)

컴파일러가 suspend 함수를 콜백 기반 코드로 변환한다. 개발자는 동기 코드처럼 작성하지만, 실제로는 상태 머신으로 동작한다.

// 개발자가 작성한 코드
suspend fun process() {
    val a = stepA()  // 중단점 1
    val b = stepB(a) // 중단점 2
    return a + b
}

// 컴파일러가 변환한 코드 (개념적)
fun process(continuation: Continuation<Int>) {
    when (continuation.label) {
        0 -> { continuation.label = 1; stepA(continuation) }
        1 -> { continuation.label = 2; val a = continuation.result; stepB(a, continuation) }
        2 -> { val b = continuation.result; continuation.resume(a + b) }
    }
}

2. Flow — 비동기 스트림

Flux의 코루틴 버전. 여러 값을 비동기로 순차 발행한다.

// Flux → Flow 변환 (현재 프로젝트)
fun findActive(serviceType: String, baseDt: LocalDateTime): Flow<Downtime> {
    return Flux.from(
        dsl.selectFrom(DOWNTIMES)
            .where(DOWNTIMES.SERVICE_TYPE.eq(serviceType))
            .and(DOWNTIMES.START_DT.le(baseDt))
            .and(DOWNTIMES.END_DT.ge(baseDt))
            .and(DOWNTIMES.IS_DELETED.eq(false))
    ).asFlow().map { mapToDowntime(it) }
}

Flow 직접 만들기

// flow 빌더로 생성
fun countDown(): Flow<Int> = flow {
    for (i in 3 downTo 1) {
        delay(1000)
        emit(i)  // 값 발행
    }
}

// 소비
countDown().collect { value ->
    println(value)  // 1초 간격으로 3, 2, 1 출력
}

Flow 연산자

flowOf(1, 2, 3, 4, 5)
    .filter { it % 2 == 0 }     // 짝수만
    .map { it * 10 }             // 10 곱하기
    .take(2)                     // 2개만
    .collect { println(it) }     // 20, 40

Flow vs Flux

  Flow (Kotlin) Flux (Reactor)
소속 kotlinx.coroutines Project Reactor
타입 Cold (항상) Cold (기본), Hot 가능
소비 collect { } .subscribe()
배압 suspend로 자연 지원 request(n) 명시
에러 처리 catch { } onErrorResume()

왜 Flow를 쓰는가

// Flux 체이닝 — 복잡해지면 읽기 어려움
fun processAll(): Flux<Result> {
    return faxRepository.findAll()
        .filter { it.status == "PENDING" }
        .flatMap { fax ->
            processOne(fax)
                .onErrorResume { Mono.empty() }
        }
}

// Flow — 동기 코드처럼 읽힘
suspend fun processAll(): List<Result> {
    return faxRepository.findAll().asFlow()
        .filter { it.status == "PENDING" }
        .map { fax ->
            try { processOne(fax) }
            catch (e: Exception) { null }
        }
        .filterNotNull()
        .toList()
}

3. runBlocking — 코루틴을 블로킹으로 실행

현재 스레드를 블로킹하면서 코루틴을 실행한다. 일반 함수와 코루틴 세계를 연결하는 브릿지.

// 테스트에서 사용 (현재 프로젝트)
@Test
fun `insert - 정상 생성`(): Unit = runBlocking {
    val session = createSession()
    val dto = DowntimeInsertDto(
        serviceType = DowntimeServiceType.FAX,
        targetType = DowntimeTargetType.FAX_HANAFAX,
        startDt = LocalDateTime.now().plusHours(1),
        endDt = LocalDateTime.now().plusHours(2)
    )

    val result = downtimeService.insert(dto, session)

    assertThat(result.downtimeSeq).isNotNull()
}

동작 원리

runBlocking {
    val a = suspendFuncA()  // 중단 → 현재 스레드가 기다림
    val b = suspendFuncB()  // 중단 → 현재 스레드가 기다림
}
// 여기서 스레드 재개

사용처

용도 설명
main() 함수 최상위 진입점에서 코루틴 시작
테스트 suspend 함수를 JUnit에서 호출
레거시 연동 코루틴 미지원 프레임워크와 연결

주의

// ❌ 절대 금지 — WebFlux 이벤트 루프에서 runBlocking 사용
@GetMapping("/faxes")
fun getFaxes(): List<Fax> = runBlocking {
    faxService.findAll()  // 이벤트 루프 스레드를 블로킹 → 성능 붕괴
}

// ✅ WebFlux에서는 suspend fun 사용
@GetMapping("/faxes")
suspend fun getFaxes(): List<Fax> {
    return faxService.findAll()
}

프로덕션 코드에서는 거의 사용하지 않는다. 주로 테스트와 main 함수에서만 사용한다.


4. async / await — 동시 실행

여러 비동기 작업을 동시에 실행하고 결과를 합칠 때 사용한다.

순차 실행 vs 동시 실행

// 순차 실행 — 총 2초
suspend fun loadPage(): PageData {
    val user = getUser()         // 1초
    val orders = getOrders()     // 1초
    return PageData(user, orders)
}

// 동시 실행 — 총 1초
suspend fun loadPage(): PageData = coroutineScope {
    val user = async { getUser() }       // 동시 시작
    val orders = async { getOrders() }   // 동시 시작
    PageData(user.await(), orders.await()) // 둘 다 끝나면 합치기
}

시간 비교

순차 실행:
getUser()   ████████ 1초
                     getOrders() ████████ 1초
총 시간: ─────────────────────────────── 2초

동시 실행 (async):
getUser()   ████████ 1초
getOrders() ████████ 1초
총 시간: ──────────── 1초

async의 반환 타입 — Deferred

coroutineScope {
    val deferred: Deferred<User> = async { getUser() }  // 바로 반환 (실행 시작)

    // 다른 작업 가능
    println("다른 작업 중...")

    val user: User = deferred.await()  // 결과가 필요한 시점에 대기
}

실전 예제 — 대시보드 API

@GetMapping("/dashboard")
suspend fun dashboard(): DashboardResponse = coroutineScope {
    // 3개 API를 동시에 호출
    val downtimes = async { downtimeService.findActive() }
    val stats = async { statsService.getToday() }
    val alerts = async { alertService.getUnread() }

    DashboardResponse(
        downtimes = downtimes.await(),
        stats = stats.await(),
        alerts = alerts.await()
    )
    // 가장 느린 API 시간 = 총 응답 시간
}

async vs launch 차이

coroutineScope {
    // async — 결과를 반환
    val result: Deferred<Int> = async { calculate() }
    println(result.await())  // 42

    // launch — 결과 없이 실행만 (fire-and-forget)
    launch { sendNotification() }  // Job 반환
}

5. launch — 백그라운드 작업 실행

결과를 기다리지 않고 백그라운드에서 실행한다. “보내고 잊기(fire-and-forget)” 패턴.

// 알림 전송 — 결과를 기다릴 필요 없음
suspend fun insert(dto: DowntimeInsertDto, session: Session): Downtime = coroutineScope {
    val downtime = Downtime.from(dto, session)
    downtimeRepository.insert(downtime)

    // 백그라운드로 알림 전송 (메인 응답에 영향 없음)
    launch {
        notificationService.notify("다운타임 등록: ${downtime.serviceType}")
    }

    downtime  // 알림 완료를 기다리지 않고 바로 반환
}

launch의 반환 타입 — Job

val job: Job = launch {
    repeat(100) { i ->
        println("작업 중... $i")
        delay(100)
    }
}

// Job 제어
job.isActive    // 실행 중인지 확인
job.cancel()    // 취소
job.join()      // 완료될 때까지 대기 (필요한 경우)

실전 예제 — 여러 백그라운드 작업

suspend fun afterInsert(downtime: Downtime) = coroutineScope {
    // 3개 작업을 동시에 백그라운드 실행
    launch { logService.save(Log.from(downtime)) }        // 로그 저장
    launch { cacheService.invalidate(downtime.serviceType) } // 캐시 무효화
    launch { notificationService.notify(downtime) }        // 알림 전송
    // coroutineScope 블록이 끝나면 3개 모두 완료 보장
}

async vs launch 정리

  async launch
반환 Deferred<T> (결과 있음) Job (결과 없음)
결과 수신 .await() 없음
용도 결과가 필요한 동시 작업 fire-and-forget
예시 여러 API 호출 후 합치기 알림, 로그, 캐시 갱신

6. Channel — 코루틴 간 데이터 통신

코루틴끼리 데이터를 주고받는 파이프. BlockingQueue의 코루틴 버전이다.

val channel = Channel<Int>()

// 생산자 코루틴
launch {
    for (i in 1..5) {
        channel.send(i)      // 보내기 (수신자가 받을 때까지 중단)
        println("전송: $i")
    }
    channel.close()           // 더 이상 보낼 데이터 없음
}

// 소비자 코루틴
launch {
    for (value in channel) {  // close될 때까지 반복 수신
        println("수신: $value")
    }
}

채널 용량 (버퍼)

// 버퍼 없음 — send와 receive가 만날 때까지 대기 (기본)
val rendezvous = Channel<Int>()

// 버퍼 있음 — 버퍼가 가득 찰 때까지 send 가능
val buffered = Channel<Int>(capacity = 10)

// 무제한 — send가 절대 중단되지 않음 (메모리 주의)
val unlimited = Channel<Int>(capacity = Channel.UNLIMITED)

실전 예제 — 생산자/소비자 패턴

suspend fun processInBatches(items: List<Item>) = coroutineScope {
    val channel = Channel<Item>(capacity = 100)

    // 생산자 1개
    launch {
        items.forEach { channel.send(it) }
        channel.close()
    }

    // 소비자 3개 (동시 처리)
    repeat(3) { workerId ->
        launch {
            for (item in channel) {
                println("Worker $workerId 처리: ${item.id}")
                processItem(item)
            }
        }
    }
}
Channel (용량 100)
                  ┌─→ Worker 0: [Item1] [Item4] [Item7] ...
생산자 ──→ [||||] ├─→ Worker 1: [Item2] [Item5] [Item8] ...
                  └─→ Worker 2: [Item3] [Item6] [Item9] ...

Channel vs Flow

  Channel Flow
성격 Hot (보내면 바로 흐름) Cold (collect 해야 시작)
소비자 여러 코루틴이 나눠 받음 각 collector가 독립 수신
용도 코루틴 간 통신, 작업 분배 데이터 스트림 변환/처리

7. CoroutineScope — 코루틴 생명주기 관리

코루틴의 범위(scope)를 정의한다. 스코프가 취소되면 그 안의 모든 코루틴이 취소된다.

구조화된 동시성 (Structured Concurrency)

suspend fun loadPage() = coroutineScope {
    val user = async { getUser() }       // 자식 코루틴 1
    val orders = async { getOrders() }   // 자식 코루틴 2

    // getOrders()에서 예외 발생 시 → getUser()도 자동 취소
    PageData(user.await(), orders.await())
}
coroutineScope
├── async { getUser() }     ← 부모가 취소되면 같이 취소
└── async { getOrders() }   ← 여기서 예외 → 부모와 형제 모두 취소

coroutineScope vs supervisorScope

// coroutineScope — 자식 하나 실패 → 전체 취소
suspend fun allOrNothing() = coroutineScope {
    launch { taskA() }  // taskB 실패 시 → taskA도 취소
    launch { taskB() }  // 예외 발생!
}

// supervisorScope — 자식 하나 실패해도 나머지 계속 실행
suspend fun bestEffort() = supervisorScope {
    launch { taskA() }  // taskB 실패해도 계속 실행
    launch { taskB() }  // 예외 발생! (taskA에 영향 없음)
}
  coroutineScope supervisorScope
자식 실패 시 형제 코루틴 전체 취소 실패한 것만 취소
용도 모두 성공해야 의미 있는 작업 독립적인 작업들
예시 결제 = 재고 차감 + 카드 승인 알림 전송 = 이메일 + SMS + 푸시

Spring에서의 CoroutineScope

Spring WebFlux에서는 각 요청마다 코루틴 스코프가 자동 생성된다.

// Spring이 요청마다 자동으로 코루틴 스코프를 만들어준다
@GetMapping("/downtimes")
suspend fun findAll(): List<Downtime> {  // ← 이 suspend가 스코프의 시작점
    return downtimeService.findAll()
}

커스텀 CoroutineScope — 애플리케이션 레벨 백그라운드 작업

@Component
class BackgroundProcessor : CoroutineScope {

    // 이 스코프의 생명주기 = 애플리케이션 생명주기
    override val coroutineContext = SupervisorJob() + Dispatchers.Default

    fun startPeriodicCleanup() {
        launch {
            while (isActive) {
                cleanup()
                delay(60_000)  // 1분마다 실행
            }
        }
    }

    @PreDestroy
    fun shutdown() {
        cancel()  // 애플리케이션 종료 시 모든 코루틴 취소
    }
}

전체 비교표

기능 한줄 요약 반환 타입 핵심 키워드
suspend 중단 가능한 함수 T 중단/재개
Flow 비동기 스트림 Flow<T> 여러 값 순차 발행
runBlocking 코루틴을 블로킹 실행 T 테스트, main
async/await 동시 실행 후 결과 합침 Deferred<T> 병렬 처리
launch 백그라운드 실행 Job fire-and-forget
Channel 코루틴 간 통신 Channel<T> 생산자/소비자
CoroutineScope 생명주기 관리 - 구조화된 동시성

이 프로젝트(downtime)에서 사용 중인 기능

기능 사용 위치 이유
suspend Controller, Service, Repository jOOQ R2DBC 결과를 동기처럼 처리
Flow findActive(), search() 다건 조회 스트림 반환
runBlocking 테스트 코드 JUnit에서 suspend 함수 호출

나머지 async, launch, Channel, CoroutineScope는 현재 프로젝트 규모에서 필요하지 않아 사용하지 않는다. 로직이 복잡해지고 동시 처리나 백그라운드 작업이 필요해지면 도입하게 된다.