← Назад к вопросам

Async: Producer-Consumer с использованием Channel

2.0 Middle🔥 151 комментариев
#Основы C# и .NET

Условие

Реализуйте паттерн Producer-Consumer используя System.Threading.Channels.

Сценарий:

  • Producer генерирует сообщения с задержкой (имитация внешнего источника)
  • Consumer обрабатывает сообщения асинхронно
  • Необходима буферизация сообщений (bounded channel)

Требования:

  1. Создать BoundedChannel с ёмкостью 10 сообщений
  2. При переполнении канала Producer должен ждать
  3. Graceful shutdown при отмене через CancellationToken
  4. Несколько Consumer-ов для параллельной обработки

Структура:

public class MessageProcessor { private readonly Channel<Message> _channel;

public async Task ProduceAsync(CancellationToken ct) { }
public async Task ConsumeAsync(int consumerId, CancellationToken ct) { }

}

Критерии оценки:

  • Правильное использование Channel API
  • Корректная обработка отмены
  • Понимание backpressure

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Решение

Анализ задачи

System.Threading.Channels — это современная альтернатива BlockingCollection для асинхронного взаимодействия между потоками:

Ключевые компоненты:

  1. BoundedChannel — ограниченная ёмкость (10 сообщений)
  2. Producer — генерирует и отправляет сообщения
  3. Consumer — читает и обрабатывает сообщения
  4. Backpressure — автоматическое давление при переполнении
  5. CancellationToken — graceful shutdown

Модели и интерфейсы

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Collections.Generic;

/// <summary>
/// Сообщение для обработки
/// </summary>
public class Message
{
    public int Id { get; set; }
    public string Content { get; set; }
    public DateTime CreatedAt { get; set; }

    public override string ToString() => $"[Message {Id}] {Content}";
}

Базовая реализация MessageProcessor

public class MessageProcessor
{
    private readonly Channel<Message> _channel;
    private int _messageCounter = 0;

    // ВАЖНО: BoundedChannelOptions позволяет контролировать поведение канала
    public MessageProcessor(int capacity = 10)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            // FullMode определяет, что делать при переполнении:
            // Wait — Producer ждёт (по умолчанию)
            // DropNewest — удалить новое сообщение
            // DropOldest — удалить самое старое
            FullMode = BoundedChannelFullMode.Wait
        };
        _channel = Channel.CreateBounded<Message>(options);
    }

    /// <summary>
    /// Producer: генерирует сообщения с задержкой
    /// </summary>
    public async Task ProduceAsync(int messageCount = 20, CancellationToken ct = default)
    {
        try
        {
            Console.WriteLine($"[Producer] Начинаем генерировать {messageCount} сообщений...");

            for (int i = 1; i <= messageCount; i++)
            {
                // Проверяем отмену
                ct.ThrowIfCancellationRequested();

                var message = new Message
                {
                    Id = Interlocked.Increment(ref _messageCounter),
                    Content = $"Message content #{i}",
                    CreatedAt = DateTime.UtcNow
                };

                // Пытаемся отправить сообщение в канал
                // Если канал переполнен, WriteAsync будет ждать
                var writeResult = await _channel.Writer.WriteAsync(message, ct);

                if (writeResult)
                {
                    Console.WriteLine($"[Producer] Отправлено: {message}");
                }
                else
                {
                    Console.WriteLine($"[Producer] Ошибка отправки сообщения {message.Id}");
                }

                // Имитируем задержку генерации
                await Task.Delay(100, ct);
            }

            // Сигнализируем Consumer-ам, что производство завершено
            _channel.Writer.TryComplete();
            Console.WriteLine("[Producer] Завершено. Канал закрыт.");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("[Producer] Отмена через CancellationToken");
            _channel.Writer.TryComplete(new OperationCanceledException("Production cancelled"));
        }
        catch (Exception ex)
        {
            Console.WriteLine($"[Producer] Ошибка: {ex.Message}");
            _channel.Writer.TryComplete(ex);
        }
    }

    /// <summary>
    /// Consumer: обрабатывает сообщения
    /// </summary>
    public async Task ConsumeAsync(int consumerId, CancellationToken ct = default)
    {
        try
        {
            Console.WriteLine($"[Consumer {consumerId}] Готов к обработке сообщений");

            // Читаем сообщения из канала пока они есть
            await foreach (var message in _channel.Reader.ReadAllAsync(ct))
            {
                try
                {
                    Console.WriteLine($"[Consumer {consumerId}] Обработка: {message}");

                    // Имитируем обработку с задержкой
                    await Task.Delay(Random.Shared.Next(200, 500), ct);

                    Console.WriteLine($"[Consumer {consumerId}] ✓ Завершено: Message {message.Id}");
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine($"[Consumer {consumerId}] Отмена обработки сообщения {message.Id}");
                }
            }

            Console.WriteLine($"[Consumer {consumerId}] Завершена работа");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"[Consumer {consumerId}] Отмена через CancellationToken");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"[Consumer {consumerId}] Ошибка: {ex.Message}");
        }
    }
}

Пример использования

class Program
{
    static async Task Main(string[] args)
    {
        Console.WriteLine("=== Producer-Consumer с System.Threading.Channels ===");
        Console.WriteLine();

        var processor = new MessageProcessor(capacity: 10);
        var cts = new CancellationTokenSource();

        // Запускаем Producer
        var producerTask = processor.ProduceAsync(
            messageCount: 25,
            ct: cts.Token);

        // Запускаем несколько Consumer-ов параллельно
        var consumerTasks = new List<Task>
        {
            processor.ConsumeAsync(consumerId: 1, ct: cts.Token),
            processor.ConsumeAsync(consumerId: 2, ct: cts.Token),
            processor.ConsumeAsync(consumerId: 3, ct: cts.Token)
        };

        try
        {
            // Жидём завершения всех операций
            await Task.WhenAll(
                Task.WhenAll(consumerTasks),
                producerTask);

            Console.WriteLine("\n✓ Все операции завершены успешно");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"\n✗ Ошибка: {ex.Message}");
        }
        finally
        {
            cts.Dispose();
        }
    }
}

Вывод примера

=== Producer-Consumer с System.Threading.Channels ===

[Producer] Начинаем генерировать 25 сообщений...
[Consumer 1] Готов к обработке сообщений
[Consumer 2] Готов к обработке сообщений
[Consumer 3] Готов к обработке сообщений
[Producer] Отправлено: [Message 1] Message content #1
[Producer] Отправлено: [Message 2] Message content #2
[Consumer 1] Обработка: [Message 1] Message content #1
[Consumer 2] Обработка: [Message 2] Message content #2
[Producer] Отправлено: [Message 3] Message content #3
[Consumer 3] Обработка: [Message 3] Message content #3
...
[Producer] Завершено. Канал закрыт.
[Consumer 1] ✓ Завершено: Message 1
[Consumer 2] ✓ Завершено: Message 2
[Consumer 3] ✓ Завершено: Message 3
[Consumer 1] Завершена работа
[Consumer 2] Завершена работа
[Consumer 3] Завершена работа

✓ Все операции завершены успешно

Graceful Shutdown с таймаутом

static async Task MainWithTimeout(string[] args)
{
    var processor = new MessageProcessor(capacity: 10);
    var cts = new CancellationTokenSource();

    // Устанавливаем таймаут 30 секунд
    cts.CancelAfter(TimeSpan.FromSeconds(30));

    var producerTask = processor.ProduceAsync(100, cts.Token);
    var consumerTasks = Enumerable.Range(1, 3)
        .Select(id => processor.ConsumeAsync(id, cts.Token))
        .ToList();

    try
    {
        await Task.WhenAll(
            Task.WhenAll(consumerTasks),
            producerTask);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("\n⏱ Таймаут истекпущении. Graceful shutdown...");
        // Дождёмся завершения текущих операций
        try
        {
            await Task.WhenAll(consumerTasks);
        }
        catch { /* Игнорируем ошибки */ }
    }
    finally
    {
        cts.Dispose();
    }
}

Продвинутая реализация: Метрики и мониторинг

public class MessageProcessorWithMetrics
{
    private readonly Channel<Message> _channel;
    private int _sentCount = 0;
    private int _processedCount = 0;
    private int _failedCount = 0;

    public MessageProcessorWithMetrics(int capacity = 10)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _channel = Channel.CreateBounded<Message>(options);
    }

    public async Task ProduceAsync(int count, CancellationToken ct)
    {
        for (int i = 1; i <= count; i++)
        {
            ct.ThrowIfCancellationRequested();
            var message = new Message { Id = i, Content = $"Msg {i}", CreatedAt = DateTime.UtcNow };
            await _channel.Writer.WriteAsync(message, ct);
            Interlocked.Increment(ref _sentCount);
            await Task.Delay(50, ct);
        }
        _channel.Writer.TryComplete();
    }

    public async Task ConsumeAsync(int consumerId, CancellationToken ct)
    {
        await foreach (var msg in _channel.Reader.ReadAllAsync(ct))
        {
            try
            {
                await Task.Delay(Random.Shared.Next(100, 300), ct);
                Interlocked.Increment(ref _processedCount);
            }
            catch
            {
                Interlocked.Increment(ref _failedCount);
            }
        }
    }

    public void PrintMetrics()
    {
        Console.WriteLine($"\n=== МЕТРИКИ ===");
        Console.WriteLine($"Отправлено: {_sentCount}");
        Console.WriteLine($"Обработано: {_processedCount}");
        Console.WriteLine($"Ошибок: {_failedCount}");
        Console.WriteLine($"Коэффициент успеха: {(_processedCount * 100.0 / _sentCount):F2}%");
    }
}

Сравнение: BlockingCollection vs Channel

АспектBlockingCollectionChannel
АсинхронностьНет✅ Да (await)
ПроизводительностьХорошо✅ Отличная
СинтаксисСложнее✅ Простой (await foreach)
СовременностьСтарый API✅ Новый (C# 8+)
Graceful shutdownСложнее✅ Встроена (TryComplete)
CancellationTokenОграничен✅ Полная поддержка

Key Points: Backpressure

// BoundedChannelFullMode определяет поведение при переполнении:

// 1. Wait (по умолчанию) — Producer ждёт
var options1 = new BoundedChannelOptions(10) 
    { FullMode = BoundedChannelFullMode.Wait };

// 2. DropNewest — удалить новое сообщение
var options2 = new BoundedChannelOptions(10) 
    { FullMode = BoundedChannelFullMode.DropNewest };

// 3. DropOldest — удалить самое старое
var options3 = new BoundedChannelOptions(10) 
    { FullMode = BoundedChannelFullMode.DropOldest };

Wait — для критических данных (заказы, платежи) DropNewest/DropOldest — для высокочастотных логов


Выводы

System.Threading.Channels:

  1. Асинхронный Producer-Consumer
  2. Встроенное управление буфером (BoundedChannel)
  3. Graceful shutdown через TryComplete()
  4. Поддержка CancellationToken
  5. Высокая производительность
  6. Современный .NET API

Когда использовать:

  • Обработка очередей сообщений
  • Event streaming
  • Конвейеры обработки данных
  • Асинхронная работа между компонентами