← Назад к вопросам

Как синхронизировать работу корутин

2.0 Middle🔥 141 комментариев
#Kotlin основы#Многопоточность и асинхронность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Синхронизация корутин

Синхронизация - это управление порядком выполнения корутин и доступом к общим ресурсам.

1. Ожидание завершения (join)

val job1 = launch {
    delay(1000)
    println("Task 1")
}

val job2 = launch {
    delay(500)
    println("Task 2")
}

job1.join()  // Ждать завершения job1
job2.join()  // Ждать завершения job2
println("All done")

// Output:
// Task 2 (через 500ms)
// Task 1 (через 1000ms)
// All done

2. Структурированный параллелизм

launch {
    val job1 = launch {
        delay(1000)
        println("Task 1")
    }
    val job2 = launch {
        delay(500)
        println("Task 2")
    }
    // Автоматически ждет завершения обеих job
    println("Both done")
}

// Output:
// Task 2 (500ms)
// Task 1 (1000ms)
// Both done (1000ms)

3. async/await - получение результата

val result1 = async {
    delay(1000)
    "Result 1"
}

val result2 = async {
    delay(500)
    "Result 2"
}

// Ждать обеих корутин и получить результаты
val combined = "${result1.await()} + ${result2.await()}"
println(combined)

// Output:
// Result 1 + Result 2 (после 1000ms)

4. Mutex - взаимное исключение

var counter = 0
val mutex = Mutex()

launch {
    repeat(1000) {
        mutex.withLock {
            counter++  // Критическая секция
        }
    }
}

launch {
    repeat(1000) {
        mutex.withLock {
            counter++  // Только один поток одновременно
        }
    }
}

// Результат: 2000 (гарантировано)
// Без Mutex: может быть < 2000

5. Semaphore - ограничение доступа

val semaphore = Semaphore(3)  // Максимум 3 одновременно

launch {
    repeat(10) { i ->
        launch {
            semaphore.acquire()
            try {
                println("Task $i started")
                delay(1000)
                println("Task $i done")
            } finally {
                semaphore.release()
            }
        }
    }
}

// Только 3 task'а в одно время

6. Channel - передача данных

val channel = Channel<Int>()

launch {
    for (i in 1..5) {
        channel.send(i)  // Отправить
        delay(100)
    }
    channel.close()  // Закрыть канал
}

launch {
    for (value in channel) {  // Получать
        println("Received: $value")
    }
}

// Output:
// Received: 1
// Received: 2
// Received: 3
// Received: 4
// Received: 5

7. Flow - реактивная передача

fun produceNumbers() = flow<Int> {
    for (i in 1..5) {
        emit(i)
        delay(100)
    }
}

launch {
    produceNumbers().collect { value ->
        println("Collected: $value")
    }
}

8. CountDownLatch эквивалент

// Ждать N событий
var completed = 0
val totalTasks = 3
val mutex = Mutex()
val condition = suspendCancellableCoroutine<Unit> { cont ->
    // Custom logic
}

launch {
    repeat(totalTasks) {
        launch {
            delay(Random.nextLong(1000))
            mutex.withLock {
                completed++
                if (completed == totalTasks) {
                    println("All tasks done")
                }
            }
        }
    }
}

9. Job hierarchy - управление группой

val parentJob = launch {
    val job1 = launch {
        delay(1000)
        println("Task 1")
    }
    val job2 = launch {
        delay(500)
        println("Task 2")
    }
    // Ждет обеих
}

parentJob.join()  // Ждет всех дочерних job'ов
println("All done")

10. withTimeoutOrNull - таймаут

val result = withTimeoutOrNull(5000L) {
    val job1 = async { heavyTask1() }
    val job2 = async { heavyTask2() }
    
    job1.await() to job2.await()
}

if (result != null) {
    println("Completed in time: $result")
} else {
    println("Timed out")
}

11. BatchingQueue - синхронизация батчей

class BatchProcessor<T>(val batchSize: Int) {
    private val queue = Channel<T>(Channel.UNLIMITED)
    
    suspend fun process(item: T) {
        queue.send(item)
    }
    
    fun startProcessing() = launch {
        val batch = mutableListOf<T>()
        for (item in queue) {
            batch.add(item)
            if (batch.size >= batchSize) {
                processBatch(batch)
                batch.clear()
            }
        }
    }
    
    private suspend fun processBatch(batch: List<T>) {
        println("Processing batch of ${batch.size}")
    }
}

12. Promise-like поведение

fun fetchUser(id: String): Deferred<User> = GlobalScope.async {
    delay(1000)
    User(id, "John")
}

launch {
    val user = fetchUser("123").await()
    println("User: $user")
    
    val posts = fetchPosts(user.id).await()
    println("Posts: $posts")
}

Лучшие практики

✅ Делай так:

  • Используй структурированный параллелизм (не GlobalScope)
  • Используй async/await для значений
  • Используй Channel для потоков данных
  • Используй Mutex для критических секций
  • Используй withTimeout для timeouts

❌ Избегай:

  • GlobalScope.launch (memory leak)
  • Busy waiting (while loops) -직접управления потоками в корутинах
  • Блокировки операций (runBlocking)

Вывод

Синхронизация корутин:

  1. join() - ждать завершения
  2. async/await - получить результат
  3. Mutex - критические секции
  4. Channel - передача данных
  5. Flow - реактивные потоки
  6. Структурированный параллелизм - автоматическое ожидание

Это позволяет писать безопасный параллельный код без callbacks.