Skip to content

Latest commit

ย 

History

History
368 lines (289 loc) ยท 10.3 KB

File metadata and controls

368 lines (289 loc) ยท 10.3 KB

Coroutine

๊ฐœ๋…

Coroutine์€ ์ค‘๋‹จ ๊ฐ€๋Šฅํ•œ(suspendable) ๊ฒฝ๋Ÿ‰ ์“ฐ๋ ˆ๋“œ์ด๋‹ค.

์ผ๋ฐ˜ ํ•จ์ˆ˜๋Š” ์‹œ์ž‘ํ•˜๋ฉด ๋๊นŒ์ง€ ์‹คํ–‰๋˜์–ด์•ผ ํ•˜์ง€๋งŒ, Coroutine์€ ์‹คํ–‰ ์ค‘ ์ผ์‹œ ์ค‘๋‹จ(suspend)ํ–ˆ๋‹ค๊ฐ€ ๋‚˜์ค‘์— ์žฌ๊ฐœํ•  ์ˆ˜ ์žˆ๋‹ค. OS ์“ฐ๋ ˆ๋“œ๋ฅผ ์ง์ ‘ ์ƒ์„ฑํ•˜์ง€ ์•Š๊ณ ๋„ ๋น„๋™๊ธฐ ์ž‘์—…์„ ํšจ์œจ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

ํ•ต์‹ฌ ํŠน์ง•

  • ๊ฒฝ๋Ÿ‰์„ฑ: ์ˆ˜์ฒœ ๊ฐœ์˜ ์ฝ”๋ฃจํ‹ด์„ ์ƒ์„ฑํ•ด๋„ ์“ฐ๋ ˆ๋“œ๋ณด๋‹ค ๋ฉ”๋ชจ๋ฆฌ์™€ ์„ฑ๋Šฅ ๋ถ€๋‹ด์ด ์ ๋‹ค
  • ์ค‘๋‹จ ๊ฐ€๋Šฅ: suspend ํ•จ์ˆ˜๋ฅผ ํ†ตํ•ด ์“ฐ๋ ˆ๋“œ๋ฅผ ๋ธ”๋กœํ‚นํ•˜์ง€ ์•Š๊ณ  ์ž‘์—…์„ ์ผ์‹œ ์ค‘๋‹จํ•  ์ˆ˜ ์žˆ๋‹ค
  • ๊ตฌ์กฐํ™”๋œ ๋™์‹œ์„ฑ: CoroutineScope๋กœ ์ƒ๋ช…์ฃผ๊ธฐ๋ฅผ ๊ด€๋ฆฌํ•˜๋ฉฐ, Scope๊ฐ€ ์ข…๋ฃŒ๋˜๋ฉด ํ•˜์œ„ ์ฝ”๋ฃจํ‹ด๋„ ์ž๋™์œผ๋กœ ์ทจ์†Œ๋œ๋‹ค

์™œ ํ•„์š”ํ•œ๊ฐ€?

ํ•ด๊ฒฐํ•˜๋ ค๋Š” ๋ฌธ์ œ

๋น„๋™๊ธฐ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•  ๋•Œ ์“ฐ๋ ˆ๋“œ๋ฅผ ์ง์ ‘ ๊ด€๋ฆฌํ•˜๊ฑฐ๋‚˜ ์ฝœ๋ฐฑ ํŒจํ„ด์„ ์‚ฌ์šฉํ•˜๋ฉด ๋ณต์žก๋„๊ฐ€ ์ฆ๊ฐ€ํ•˜๊ณ  ๊ฐ€๋…์„ฑ์ด ๋–จ์–ด์ง„๋‹ค.

๊ธฐ์กด ๋ฐฉ์‹์˜ ํ•œ๊ณ„

  1. ์“ฐ๋ ˆ๋“œ(Thread): ์ƒ์„ฑ ๋น„์šฉ์ด ํฌ๊ณ  ์ปจํ…์ŠคํŠธ ์Šค์œ„์นญ ์˜ค๋ฒ„ํ—ค๋“œ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค
  2. ์ฝœ๋ฐฑ(Callback): ์ค‘์ฒฉ์ด ๊นŠ์–ด์ง€๋ฉด ์ฝœ๋ฐฑ ์ง€์˜ฅ(Callback Hell)์ด ๋ฐœ์ƒํ•œ๋‹ค
  3. RxJava/Future: ์ฒด์ด๋‹์ด ๋ณต์žกํ•˜๊ณ  ๋Ÿฌ๋‹ ์ปค๋ธŒ๊ฐ€ ๋†’๋‹ค

์ œ๊ณตํ•˜๋Š” ๊ฐ€์น˜

  • ๊ฐ€๋…์„ฑ: ๋™๊ธฐ ์ฝ”๋“œ์ฒ˜๋Ÿผ ์ž‘์„ฑํ•˜๋ฉด์„œ ๋น„๋™๊ธฐ๋กœ ์‹คํ–‰๋œ๋‹ค
  • ํšจ์œจ์„ฑ: ์“ฐ๋ ˆ๋“œ๋ณด๋‹ค ํ›จ์”ฌ ์ ์€ ๋ฆฌ์†Œ์Šค๋กœ ์ˆ˜๋งŽ์€ ๋™์‹œ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•œ๋‹ค
  • ์•ˆ์ „์„ฑ: ๊ตฌ์กฐํ™”๋œ ๋™์‹œ์„ฑ์œผ๋กœ ๋ฉ”๋ชจ๋ฆฌ ๋ˆ„์ˆ˜์™€ ์ž‘์—… ๋ˆ„๋ฝ์„ ๋ฐฉ์ง€ํ•œ๋‹ค

๋™์ž‘ ์›๋ฆฌ

ํ•ต์‹ฌ ๋ฉ”์ปค๋‹ˆ์ฆ˜

Coroutine์€ CPS(Continuation-Passing Style) ๋ณ€ํ™˜์„ ํ†ตํ•ด suspend ํ•จ์ˆ˜๋ฅผ ์ƒํƒœ ๋จธ์‹ ์œผ๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค. ์ปดํŒŒ์ผ ์‹œ์ ์— ํ•จ์ˆ˜๊ฐ€ ์—ฌ๋Ÿฌ ๋‹จ๊ณ„๋กœ ๋ถ„ํ• ๋˜๋ฉฐ, ๊ฐ ์ค‘๋‹จ ์ง€์ ๋งˆ๋‹ค ์ƒํƒœ๋ฅผ ์ €์žฅํ•˜๊ณ  ๋ณต์›ํ•œ๋‹ค.

์ฒ˜๋ฆฌ ๊ณผ์ •

  1. suspend ํ•จ์ˆ˜ ํ˜ธ์ถœ: ํ˜„์žฌ ์ƒํƒœ๋ฅผ Continuation ๊ฐ์ฒด์— ์ €์žฅ
  2. ์ค‘๋‹จ: ์ œ์–ด๊ถŒ์„ ๋ฐ˜ํ™˜ํ•˜๊ณ  ์“ฐ๋ ˆ๋“œ๋Š” ๋‹ค๋ฅธ ์ž‘์—… ์ˆ˜ํ–‰
  3. ์žฌ๊ฐœ: ์ €์žฅ๋œ Continuation์—์„œ ์ด์–ด์„œ ์‹คํ–‰

์ฃผ์š” ์ปดํฌ๋„ŒํŠธ

  • Continuation: ์ค‘๋‹จ๋œ ์ง€์ ์˜ ์ƒํƒœ์™€ ์žฌ๊ฐœ ์ •๋ณด๋ฅผ ๋‹ด๋Š” ๊ฐ์ฒด
  • Dispatcher: ์ฝ”๋ฃจํ‹ด์ด ์‹คํ–‰๋  ์“ฐ๋ ˆ๋“œ๋ฅผ ๊ฒฐ์ • (Main, IO, Default ๋“ฑ)
  • CoroutineScope: ์ฝ”๋ฃจํ‹ด์˜ ์ƒ๋ช…์ฃผ๊ธฐ๋ฅผ ๊ด€๋ฆฌํ•˜๋Š” ๋ฒ”์œ„
// ์ปดํŒŒ์ผ ์ „
suspend fun fetchData(): String {
    delay(1000)
    return "Data"
}

// ์ปดํŒŒ์ผ ํ›„ (๊ฐœ๋…์ )
fun fetchData(continuation: Continuation<String>): Any {
    when (continuation.label) {
        0 -> {
            continuation.label = 1
            return delay(1000, continuation) // COROUTINE_SUSPENDED ๋ฐ˜ํ™˜
        }
        1 -> {
            return "Data"
        }
    }
}

์ฃผ์˜์‚ฌํ•ญ

1. GlobalScope ์‚ฌ์šฉ ๊ธˆ์ง€

์ƒ๋ช…์ฃผ๊ธฐ ๊ด€๋ฆฌ๊ฐ€ ๋ถˆ๊ฐ€๋Šฅํ•˜์—ฌ ๋ฉ”๋ชจ๋ฆฌ ๋ˆ„์ˆ˜๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.

// โŒ ์ž˜๋ชป๋œ ์˜ˆ
GlobalScope.launch {
    // ์„œ๋ฒ„๊ฐ€ ์ข…๋ฃŒ๋˜์–ด๋„ ๊ณ„์† ์‹คํ–‰๋  ์ˆ˜ ์žˆ์Œ
    processBackgroundJob()
}

// โœ… ์˜ฌ๋ฐ”๋ฅธ ์˜ˆ
class OrderService {
    private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    
    fun processOrder(orderId: String) {
        scope.launch {
            // scope๊ฐ€ ์ทจ์†Œ๋˜๋ฉด ์ž๋™์œผ๋กœ ์ข…๋ฃŒ
            processOrderInternal(orderId)
        }
    }
    
    fun shutdown() {
        scope.cancel() // ์„œ๋น„์Šค ์ข…๋ฃŒ ์‹œ ๋ชจ๋“  ์ฝ”๋ฃจํ‹ด ์ทจ์†Œ
    }
}

2. ๋ธ”๋กœํ‚น ์ž‘์—…์„ ์ž˜๋ชป๋œ Dispatcher์—์„œ ์‹คํ–‰

์“ฐ๋ ˆ๋“œ ํ’€์ด ๊ณ ๊ฐˆ๋˜์–ด ๋‹ค๋ฅธ ์š”์ฒญ์ด ์ฒ˜๋ฆฌ๋˜์ง€ ์•Š๋Š”๋‹ค.

// โŒ ์ž˜๋ชป๋œ ์˜ˆ
@GetMapping("/users/{id}")
suspend fun getUser(@PathVariable id: Long): User {
    // Dispatchers.Default์—์„œ ๋ธ”๋กœํ‚น I/O ์‹คํ–‰
    val data = readFromDatabase(id) // ์“ฐ๋ ˆ๋“œ ํ’€ ๊ณ ๊ฐˆ
    return data
}

// โœ… ์˜ฌ๋ฐ”๋ฅธ ์˜ˆ
@GetMapping("/users/{id}")
suspend fun getUser(@PathVariable id: Long): User = withContext(Dispatchers.IO) {
    // I/O ์ž‘์—…์€ Dispatchers.IO์—์„œ ์‹คํ–‰
    readFromDatabase(id)
}

3. suspend ํ•จ์ˆ˜๋ฅผ ์ผ๋ฐ˜ ํ•จ์ˆ˜์—์„œ ํ˜ธ์ถœ

suspend ํ•จ์ˆ˜๋Š” ์ฝ”๋ฃจํ‹ด ์Šค์ฝ”ํ”„๋‚˜ ๋‹ค๋ฅธ suspend ํ•จ์ˆ˜ ๋‚ด์—์„œ๋งŒ ํ˜ธ์ถœ ๊ฐ€๋Šฅํ•˜๋‹ค.

// โŒ ์ž˜๋ชป๋œ ์˜ˆ
fun loadData() {
    delay(1000) // ์ปดํŒŒ์ผ ์—๋Ÿฌ
}

// โœ… ์˜ฌ๋ฐ”๋ฅธ ์˜ˆ
suspend fun loadData() {
    delay(1000)
}

// ๋˜๋Š”
fun loadData() {
    CoroutineScope(Dispatchers.Main).launch {
        delay(1000)
    }
}

Best Practices

  • ์ ์ ˆํ•œ Scope ์‚ฌ์šฉ (viewModelScope, lifecycleScope ๋“ฑ)
  • I/O ์ž‘์—…์€ Dispatchers.IO์—์„œ ์‹คํ–‰
  • CPU ์ง‘์•ฝ์  ์ž‘์—…์€ Dispatchers.Default์—์„œ ์‹คํ–‰
  • ๊ตฌ์กฐํ™”๋œ ๋™์‹œ์„ฑ(Structured Concurrency) ์›์น™ ์ค€์ˆ˜

์‹ค์ „ ์ ์šฉ

๊ธฐ๋ณธ ์‚ฌ์šฉ๋ฒ•

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch {
        delay(1000L) // 1์ดˆ ๋™์•ˆ ์ผ์‹œ ์ค‘๋‹จ (์“ฐ๋ ˆ๋“œ ๋ธ”๋กœํ‚น ์•„๋‹˜)
        println("World!")
    }
    println("Hello,")
}

// ๊ฒฐ๊ณผ
// Hello,
// World!

์ฃผ์š” ์‚ฌ์šฉ ํŒจํ„ด

ํŒจํ„ด 1: ๋ณ‘๋ ฌ ์‹คํ–‰

์—ฌ๋Ÿฌ ๋น„๋™๊ธฐ ์ž‘์—…์„ ๋™์‹œ์— ์‹คํ–‰ํ•˜๊ณ  ๋ชจ๋“  ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋‹ค๋ฆด ๋•Œ

suspend fun loadDashboard() = coroutineScope {
    val user = async { fetchUser() }
    val posts = async { fetchPosts() }
    val notifications = async { fetchNotifications() }
    
    Dashboard(
        user = user.await(),
        posts = posts.await(),
        notifications = notifications.await()
    )
}

ํŒจํ„ด 2: ์ทจ์†Œ ๊ฐ€๋Šฅํ•œ ์ž‘์—…๊ณผ ๋ฆฌ์†Œ์Šค ์ •๋ฆฌ

์ฝ”๋ฃจํ‹ด์€ suspend ์ง€์ ์—์„œ ์ทจ์†Œ๋ฅผ ํ™•์ธํ•˜๊ณ  ๋ฆฌ์†Œ์Šค๋ฅผ ์ •๋ฆฌํ•œ๋‹ค

// โŒ ์ผ๋ฐ˜ ํ•จ์ˆ˜ - ์ทจ์†Œ ๋ถˆ๊ฐ€๋Šฅ
fun processLargeFile(filePath: String): Int {
    var totalBytes = 0
    File(filePath).inputStream().use { input ->
        val buffer = ByteArray(8192)
        while (true) {
            val bytesRead = input.read(buffer) // ๋ธ”๋กœํ‚น - ์ฒญํฌ ์ฝ๋Š” ๋™์•ˆ ๋ฉˆ์ถœ ์ˆ˜ ์—†์Œ
            if (bytesRead == -1) break
            totalBytes += bytesRead
            // ์ทจ์†Œ ๋ถˆ๊ฐ€๋Šฅ - ์“ฐ๋ ˆ๋“œ ๊ฐ•์ œ ์ข…๋ฃŒ๋งŒ ๊ฐ€๋Šฅ
        }
    }
    return totalBytes
}

// โŒ ์ฝ”๋ฃจํ‹ด์ธ๋ฐ ์ทจ์†Œ ์ง€์ ์ด ์—†์Œ
suspend fun processLargeFile(filePath: String): Int = withContext(Dispatchers.IO) {
    var totalBytes = 0
    File(filePath).inputStream().use { input ->
        val buffer = ByteArray(8192)
        while (true) {
            val bytesRead = input.read(buffer) // ๋ธ”๋กœํ‚น - ์—ฌ์ „ํžˆ ์ทจ์†Œ ๋ถˆ๊ฐ€
            if (bytesRead == -1) break
            totalBytes += bytesRead
        }
    }
    totalBytes
}

// โœ… ์ฝ”๋ฃจํ‹ด - ์ทจ์†Œ ๊ฐ€๋Šฅ
suspend fun processLargeFile(filePath: String): Int = withContext(Dispatchers.IO) {
    var totalBytes = 0
    File(filePath).inputStream().use { input ->
        val buffer = ByteArray(8192)
        while (true) {
            ensureActive() // ์ฒญํฌ๋งˆ๋‹ค ์ทจ์†Œ ํ™•์ธ - ์ทจ์†Œ๋˜๋ฉด CancellationException ๋ฐœ์ƒ
            val bytesRead = input.read(buffer)
            if (bytesRead == -1) break
            totalBytes += bytesRead
        }
    }
    totalBytes
}

// ์‚ฌ์šฉ ์˜ˆ์‹œ
class FileService {
    private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    
    fun startProcessing(filePath: String): Job {
        return scope.launch {
            try {
                val result = processLargeFile(filePath)
                println("Processed ${result.size} lines")
            } catch (e: CancellationException) {
                println("Processing cancelled - resources cleaned up")
                throw e // ์ทจ์†Œ ์˜ˆ์™ธ๋Š” ๋ฐ˜๋“œ์‹œ ์žฌ์ „ํŒŒ
            }
        }
    }
    
    // 5์ดˆ ํ›„ ์ž๋™ ์ทจ์†Œ ์˜ˆ์‹œ
    fun startWithTimeout(filePath: String) {
        scope.launch {
            val job = launch { processLargeFile(filePath) }
            delay(5000)
            job.cancel() // 5์ดˆ ํ›„ ์ทจ์†Œ - ensureActive()์—์„œ ๊ฐ์ง€
        }
    }
}

ํŒจํ„ด 3: ๊ตฌ์กฐํ™”๋œ ๋™์‹œ์„ฑ์œผ๋กœ ์˜ˆ์™ธ ์ „ํŒŒ

ํ•˜์œ„ ์ฝ”๋ฃจํ‹ด์˜ ์˜ˆ์™ธ๊ฐ€ ์ƒ์œ„๋กœ ์ „ํŒŒ๋˜๋„๋ก ์ฒ˜๋ฆฌ

// SupervisorJob: ํ•˜๋‚˜์˜ ์ž์‹์ด ์‹คํŒจํ•ด๋„ ๋‹ค๋ฅธ ์ž์‹์€ ๊ณ„์† ์‹คํ–‰
class NotificationService {
    private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    
    suspend fun sendNotifications(userIds: List<String>) = coroutineScope {
        userIds.map { userId ->
            async {
                try {
                    sendNotification(userId)
                } catch (e: Exception) {
                    logger.error("Failed to send notification to $userId", e)
                    null // ์‹คํŒจํ•ด๋„ ๋‹ค๋ฅธ ์•Œ๋ฆผ์€ ๊ณ„์† ์ „์†ก
                }
            }
        }.awaitAll()
    }
}

ํŒจํ„ด 4: ๋ธ”๋กœํ‚น/๋…ผ๋ธ”๋กœํ‚น ์กฐํ•ฉ ์ดํ•ด

๋น„๋™๊ธฐ ์‹คํ–‰ ํŒจํ„ด์˜ ์กฐํ•ฉ

// ๋…ผ๋ธ”๋กœํ‚น + ๋™๊ธฐ
runBlocking {
    delay(100)        // (1) ๋…ผ๋ธ”๋กœํ‚น
    test()            // (2) ๋…ผ๋ธ”๋กœํ‚น
    println("Done")   // (3) 1, 2๊ฐ€ ๋ชจ๋‘ ๋๋‚œ ํ›„ ์‹คํ–‰
}
suspend fun test() { delay(200) }

// ๋…ผ๋ธ”๋กœํ‚น + ๋น„๋™๊ธฐ
runBlocking {
    val result1 = async { delay(100) }  // (1) ๋น„๋™๊ธฐ ์‹œ์ž‘
    val result2 = async { delay(200) }  // (2) ๋น„๋™๊ธฐ ์‹œ์ž‘
    println("Done")                     // (3) ๋จผ์ € ์‹คํ–‰
    result1.await()                     // (4) 1์ด ๋๋‚  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
    result2.await()                     // (5) 2๊ฐ€ ๋๋‚  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
}

// ๋ธ”๋กœํ‚น + ๋น„๋™๊ธฐ
runBlocking {
    val result = launch {
        withContext(Dispatchers.Default) { 
            Thread.sleep(100)  // ๋ธ”๋กœํ‚นํ•˜์ง€๋งŒ ๋ณ„๋„ ์“ฐ๋ ˆ๋“œ
        }
    }
    println("Done")    // (2) ๋จผ์ € ์‹คํ–‰ ๊ฐ€๋Šฅ
    result.join()      // (3) result๊ฐ€ ๋๋‚  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
}

Before/After ๋น„๊ต

// โŒ ์ฝœ๋ฐฑ ๋ฐฉ์‹
fun fetchData(callback: (Result<Data>) -> Unit) {
    api.getData(object : Callback {
        override fun onSuccess(data: Data) {
            callback(Result.success(data))
        }
        override fun onError(e: Exception) {
            callback(Result.failure(e))
        }
    })
}

// โœ… ์ฝ”๋ฃจํ‹ด ๋ฐฉ์‹
suspend fun fetchData(): Data {
    return api.getData() // ๊ฐ„๊ฒฐํ•˜๊ณ  ์ง๊ด€์ 
}

์ฐธ๊ณ  ์ž๋ฃŒ

๊ณต์‹ ๋ฌธ์„œ

์ถ”์ฒœ ์•„ํ‹ฐํด

๊ด€๋ จ TIL