Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Стримы (Stream) в Dart — асинхронные потоки данных
Stream — это асинхронная последовательность событий, которые приходят со временем. Это один из ключевых инструментов Dart для работы с асинхронностью, позволяющий обрабатывать данные по мере их поступления, а не ждать полного выполнения операции.
Что такое Stream
Eсли Future — это один результат в будущем, то Stream — это множество результатов во времени. Stream может:
- Генерировать множество значений
- Выбрасывать ошибки
- Завершаться или быть бесконечным
- Обрабатывать данные по мере их появления
// Future: один результат
Future<int> fetchData() async {
return 42;
}
// Stream: множество результатов
Stream<int> fetchDataStream() async* {
yield 1;
yield 2;
yield 3;
}
Основные типы Stream
1. Single-subscription Stream (обычный поток)
Можно подписаться только один раз. При повторной подписке ошибка.
Stream<int> createStream() async* {
for (int i = 1; i <= 5; i++) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
}
final stream = createStream();
// Первая подписка — OK
stream.listen((value) {
print('Value: $value');
});
// Вторая подписка — ошибка!
stream.listen((value) {});
2. Broadcast Stream (трансляционный поток)
Можно подписаться много раз. Все подписчики получают одни и те же данные.
Stream<int> createStream() async* {
for (int i = 1; i <= 5; i++) {
yield i;
}
}
final broadcastStream = createStream().asBroadcastStream();
// Множество подписок — OK
broadcastStream.listen((value) => print('Subscriber 1: $value'));
broadcastStream.listen((value) => print('Subscriber 2: $value'));
Практические примеры Stream
Пример 1: Простой счётчик
Stream<int> countStream() async* {
for (int i = 1; i <= 5; i++) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
}
final stream = countStream();
stream.listen(
(value) => print('Value: $value'), // onData
onError: (error) => print('Error: $error'), // onError
onDone: () => print('Stream closed'), // onDone
);
Пример 2: Stream с обработкой ошибок
Stream<int> apiStream() async* {
yield 1;
yield 2;
throw Exception('Network error');
yield 3; // никогда не выполнится
}
final stream = apiStream();
stream.listen(
(value) => print('Data: $value'),
onError: (error) {
print('Caught error: $error');
// Обрабатываем ошибку
},
onDone: () => print('Done'),
);
Пример 3: Трансформация данных Stream
Stream<int> numbers() async* {
for (int i = 1; i <= 5; i++) {
yield i;
}
}
// map: преобразование каждого значения
final doubled = numbers().map((n) => n * 2);
doubled.listen(print); // 2, 4, 6, 8, 10
// where: фильтрация
final evenOnly = numbers().where((n) => n % 2 == 0);
evenOnly.listen(print); // 2, 4
// expand: раскрытие каждого элемента
final expanded = numbers().expand((n) => [n, n]);
expanded.listen(print); // 1, 1, 2, 2, 3, 3, ...
// take: первые N элементов
final firstThree = numbers().take(3);
firstThree.listen(print); // 1, 2, 3
// skip: пропустить N элементов
final skipTwo = numbers().skip(2);
skipTwo.listen(print); // 3, 4, 5
Пример 4: Работа с Flutter UI (StreamBuilder)
class TemperatureMonitor extends StatelessWidget {
Stream<double> getTemperature() async* {
while (true) {
// Симуляция получения температуры с датчика
await Future.delayed(Duration(seconds: 1));
yield DateTime.now().millisecond / 1000.0 * 100; // 0-100
}
}
@override
Widget build(BuildContext context) {
return StreamBuilder<double>(
stream: getTemperature(),
builder: (context, snapshot) {
if (snapshot.connectionState == ConnectionState.waiting) {
return Center(child: CircularProgressIndicator());
}
if (snapshot.hasError) {
return Center(child: Text('Error: ${snapshot.error}'));
}
final temperature = snapshot.data ?? 0.0;
return Center(
child: Text(
'Temperature: ${temperature.toStringAsFixed(1)}°C',
style: TextStyle(fontSize: 24),
),
);
},
);
}
}
Пример 5: StreamController для управления потоком
class ChatService {
final _messageController = StreamController<String>();
// Получить Stream для прослушивания
Stream<String> get messages => _messageController.stream;
// Добавить сообщение в поток
void sendMessage(String message) {
_messageController.add(message);
}
// Выбросить ошибку
void error(String error) {
_messageController.addError(error);
}
// Закрыть поток
void dispose() {
_messageController.close();
}
}
class ChatWidget extends StatefulWidget {
@override
_ChatWidgetState createState() => _ChatWidgetState();
}
class _ChatWidgetState extends State<ChatWidget> {
final _service = ChatService();
@override
void initState() {
super.initState();
// Слушаем сообщения
_service.messages.listen((msg) {
print('New message: $msg');
});
}
@override
void dispose() {
_service.dispose();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Center(
child: ElevatedButton(
onPressed: () => _service.sendMessage('Hello'),
child: Text('Send Message'),
),
);
}
}
Stream vs Future
// Future: один результат
Future<String> fetchUser() async {
await Future.delayed(Duration(seconds: 1));
return 'John';
}
// Stream: множество результатов
Stream<String> fetchUserUpdates() async* {
yield 'John';
await Future.delayed(Duration(seconds: 1));
yield 'John Doe';
await Future.delayed(Duration(seconds: 1));
yield 'John Doe (Admin)';
}
Практическое применение Stream
- Мониторинг сенсоров — акселерометр, гироскоп
- WebSocket — real-time коммуникация
- Файловый мониторинг — отслеживание изменений
- UI события — клики, скролл, resize
- Асинхронные запросы — получение данных порциями
- Таймеры и периодические задачи — repeat events
Лучшие практики
- Всегда закрывай StreamController через
dispose() - Используй async* для создания Stream генераторов
- Обрабатывай ошибки через
onErrorcallback - Различай subscription и broadcast streams
- Для UI используй StreamBuilder из Flutter
- Комбинируй Stream операции (map, where, take и т.д.)