← Назад к вопросам
Как синхронизировать работу корутин
2.0 Middle🔥 141 комментариев
#Kotlin основы#Многопоточность и асинхронность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Синхронизация корутин
Синхронизация - это управление порядком выполнения корутин и доступом к общим ресурсам.
1. Ожидание завершения (join)
val job1 = launch {
delay(1000)
println("Task 1")
}
val job2 = launch {
delay(500)
println("Task 2")
}
job1.join() // Ждать завершения job1
job2.join() // Ждать завершения job2
println("All done")
// Output:
// Task 2 (через 500ms)
// Task 1 (через 1000ms)
// All done
2. Структурированный параллелизм
launch {
val job1 = launch {
delay(1000)
println("Task 1")
}
val job2 = launch {
delay(500)
println("Task 2")
}
// Автоматически ждет завершения обеих job
println("Both done")
}
// Output:
// Task 2 (500ms)
// Task 1 (1000ms)
// Both done (1000ms)
3. async/await - получение результата
val result1 = async {
delay(1000)
"Result 1"
}
val result2 = async {
delay(500)
"Result 2"
}
// Ждать обеих корутин и получить результаты
val combined = "${result1.await()} + ${result2.await()}"
println(combined)
// Output:
// Result 1 + Result 2 (после 1000ms)
4. Mutex - взаимное исключение
var counter = 0
val mutex = Mutex()
launch {
repeat(1000) {
mutex.withLock {
counter++ // Критическая секция
}
}
}
launch {
repeat(1000) {
mutex.withLock {
counter++ // Только один поток одновременно
}
}
}
// Результат: 2000 (гарантировано)
// Без Mutex: может быть < 2000
5. Semaphore - ограничение доступа
val semaphore = Semaphore(3) // Максимум 3 одновременно
launch {
repeat(10) { i ->
launch {
semaphore.acquire()
try {
println("Task $i started")
delay(1000)
println("Task $i done")
} finally {
semaphore.release()
}
}
}
}
// Только 3 task'а в одно время
6. Channel - передача данных
val channel = Channel<Int>()
launch {
for (i in 1..5) {
channel.send(i) // Отправить
delay(100)
}
channel.close() // Закрыть канал
}
launch {
for (value in channel) { // Получать
println("Received: $value")
}
}
// Output:
// Received: 1
// Received: 2
// Received: 3
// Received: 4
// Received: 5
7. Flow - реактивная передача
fun produceNumbers() = flow<Int> {
for (i in 1..5) {
emit(i)
delay(100)
}
}
launch {
produceNumbers().collect { value ->
println("Collected: $value")
}
}
8. CountDownLatch эквивалент
// Ждать N событий
var completed = 0
val totalTasks = 3
val mutex = Mutex()
val condition = suspendCancellableCoroutine<Unit> { cont ->
// Custom logic
}
launch {
repeat(totalTasks) {
launch {
delay(Random.nextLong(1000))
mutex.withLock {
completed++
if (completed == totalTasks) {
println("All tasks done")
}
}
}
}
}
9. Job hierarchy - управление группой
val parentJob = launch {
val job1 = launch {
delay(1000)
println("Task 1")
}
val job2 = launch {
delay(500)
println("Task 2")
}
// Ждет обеих
}
parentJob.join() // Ждет всех дочерних job'ов
println("All done")
10. withTimeoutOrNull - таймаут
val result = withTimeoutOrNull(5000L) {
val job1 = async { heavyTask1() }
val job2 = async { heavyTask2() }
job1.await() to job2.await()
}
if (result != null) {
println("Completed in time: $result")
} else {
println("Timed out")
}
11. BatchingQueue - синхронизация батчей
class BatchProcessor<T>(val batchSize: Int) {
private val queue = Channel<T>(Channel.UNLIMITED)
suspend fun process(item: T) {
queue.send(item)
}
fun startProcessing() = launch {
val batch = mutableListOf<T>()
for (item in queue) {
batch.add(item)
if (batch.size >= batchSize) {
processBatch(batch)
batch.clear()
}
}
}
private suspend fun processBatch(batch: List<T>) {
println("Processing batch of ${batch.size}")
}
}
12. Promise-like поведение
fun fetchUser(id: String): Deferred<User> = GlobalScope.async {
delay(1000)
User(id, "John")
}
launch {
val user = fetchUser("123").await()
println("User: $user")
val posts = fetchPosts(user.id).await()
println("Posts: $posts")
}
Лучшие практики
✅ Делай так:
- Используй структурированный параллелизм (не GlobalScope)
- Используй async/await для значений
- Используй Channel для потоков данных
- Используй Mutex для критических секций
- Используй withTimeout для timeouts
❌ Избегай:
- GlobalScope.launch (memory leak)
- Busy waiting (while loops) -직접управления потоками в корутинах
- Блокировки операций (runBlocking)
Вывод
Синхронизация корутин:
- join() - ждать завершения
- async/await - получить результат
- Mutex - критические секции
- Channel - передача данных
- Flow - реактивные потоки
- Структурированный параллелизм - автоматическое ожидание
Это позволяет писать безопасный параллельный код без callbacks.