← Назад к вопросам

Как влияет добавление или удаление элементов из Flow конкурентно во время его активного сбора

3.0 Senior🔥 61 комментариев
#Многопоточность и асинхронность#Производительность и оптимизация

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Конкурентное добавление/удаление элементов в Flow

Это сложный вопрос о thread-safety и внутреннем устройстве Flow. Важно понимать, как Flow обрабатывает конкурентные операции.

Основное правило: Flow НЕ thread-safe

Критическое понимание: Flow не предназначен для конкурентных операций изнутри.

// ОПАСНО: Race condition
val flow = MutableSharedFlow<Int>()

launchMain {
    flow.collect { value ->
        println("Collected: $value")
    }
}

launchIO {
    flow.emit(1)  // Thread A
    flow.emit(2)  // Thread B может быть одновременно
    flow.emit(3)
}

Типы Flow и их thread-safety

1. Regular Flow (cold)

  • НЕ thread-safe для emit()
  • Каждый subscriber получает свой поток данных
  • Нет внутреннего буфера
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>

val myFlow = flow {
    emit(1)  // Безопасно только внутри этого suspend блока
    emit(2)
}

// Использование
myFlow.collect { value ->
    println(value)  // Безопасно
}

2. MutableSharedFlow (hot)

  • Частично thread-safe благодаря внутренней синхронизации
  • Использует внутренний replay буфер
  • emit() безопасен при вызове из разных потоков
val sharedFlow = MutableSharedFlow<Int>(
    replay = 10,  // Буфер размером 10
    extraBufferCapacity = 5,  // Дополнительный буфер
    onBufferOverflow = BufferOverflow.DROP_OLDEST  // Что делать при переполнении
)

// Безопасно из разных потоков
launchIO {
    for (i in 1..100) {
        sharedFlow.emit(i)  // Thread-safe!
    }
}

launchMain {
    sharedFlow.collect { value ->
        println("Received: $value")
    }
}

StateFlow — частный случай

val stateFlow = MutableStateFlow(0)

// emit() также thread-safe
launchIO {
    for (i in 1..100) {
        stateFlow.value = i  // Thread-safe!
    }
}

launchMain {
    stateFlow.collect { value ->
        println("State: $value")
    }
}

Проблемы конкурентного доступа

1. Race Condition в обычном Flow

val flow = flow {
    var counter = 0
    emit(counter++)  // BAD: может быть race condition если emit вызван из разных потоков
}

2. Lost Updates (потеря обновлений)

val sharedFlow = MutableSharedFlow<Int>(replay = 0)

// Сценарий проблемы
launchIO {
    sharedFlow.emit(1)  // Может быть потеряно если нет активного collector
}

launchMain {
    // Может пропустить значение 1 если collector запустится после emit
    sharedFlow.collect { value ->
        println(value)
    }
}

Правильный подход: синхронизация

1. Используй Mutex для синхронизации

class SafeFlowRepository {
    private val emissionMutex = Mutex()
    val flow = MutableSharedFlow<List<String>>()
    
    suspend fun safeAddElement(element: String) {
        emissionMutex.withLock {
            val newList = getData() + element
            flow.emit(newList)
        }
    }
    
    private suspend fun getData(): List<String> {
        // Получить текущие данные
        return listOf()
    }
}

2. Используй Channel (bounded queue)

val channel = Channel<Int>(capacity = 10)  // Буфер размером 10

launchIO {
    for (i in 1..100) {
        channel.send(i)  // Потокобезопасен!
    }
    channel.close()
}

launchMain {
    for (value in channel) {
        println("Received: $value")
    }
}

// Или как Flow
val flowFromChannel = channel.receiveAsFlow()

Добавление/удаление элементов во время сбора

Сценарий: список, который меняется во время iteration

// ПРОБЛЕМА: ConcurrentModificationException
val list = mutableListOf(1, 2, 3)
list.asFlow().collect { item ->
    println(item)
    list.add(4)  // ОШИБКА: ConcurrentModificationException!
}

// РЕШЕНИЕ: используй copy
val list = mutableListOf(1, 2, 3)
list.asFlow().collect { item ->
    println(item)
}
list.add(4)  // OK: добавляем ПОСЛЕ collect

// ИЛИ: используй правильную структуру
val flow = flow {
    val snapshot = list.toList()  // Копия!
    snapshot.forEach { emit(it) }
}

StateFlow с изменяемыми данными

data class UiState(val items: List<String>)

val state = MutableStateFlow(UiState(emptyList()))

// НЕПРАВИЛЬНО: изменяем list внутри UiState
state.value.items.toMutableList().add("new")  // Не сработает

// ПРАВИЛЬНО: создаем новый UiState
state.value = state.value.copy(
    items = state.value.items + "new"
)

// Или используй коллектор
state.update { current ->
    current.copy(
        items = current.items + "new"
    )
}

Лучшие практики

✅ Делай так:

  • Используй MutableSharedFlow для thread-safe операций
  • Используй Mutex для синхронизации если нужна
  • Используй StateFlow.update() для атомарных обновлений
  • Создавай snapshots данных перед emission
  • Используй Channel для queue-like поведения

❌ Избегай:

  • Конкурентных emit() в обычном Flow
  • Изменения mutable данных во время collect
  • ConcurrentModificationException при изменении list
  • Race conditions при обновлении состояния
  • Assuming Flow thread-safe

Пример: безопасное добавление в Flow

class SafeListFlow {
    private val _items = MutableStateFlow(emptyList<String>())
    val items = _items.asStateFlow()
    
    private val mutex = Mutex()
    
    suspend fun addItem(item: String) {
        mutex.withLock {
            _items.value = _items.value + item  // Безопасно
        }
    }
    
    suspend fun removeItem(item: String) {
        mutex.withLock {
            _items.value = _items.value - item  // Безопасно
        }
    }
}

Вывод

Flow конкурентной безопасности достигается:

  • MutableSharedFlow имеет встроенную синхронизацию
  • StateFlow безопасен благодаря atomic updates
  • Обычный Flow требует внешней синхронизации (Mutex)
  • Всегда создавай snapshots перед emission
  • Используй immutable структуры (data class copies)
  • Помни: Flow холоден, данные текут в момент collection
Как влияет добавление или удаление элементов из Flow конкурентно во время его активного сбора | PrepBro