В каких задачах стоит применять синхронный код
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
# Когда стоит применять синхронный код
Синхронный код имеет место быть! Несмотря на модный асинхронный Python, есть категории задач, где синхронный подход оптимален или даже необходим.
1. CPU-bound задачи (вычисления)
Почему синхронный?
# Асинхронность помогает при I/O (сеть, диск)
# Но при вычислениях async не даёт выигрыша
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor
# Синхронная функция (вычисления)
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
# Неправильно использовать async для вычислений
async def async_fib(n):
return fibonacci(n) # Всё равно блокирует event loop!
# Правильно: для CPU-bound используем ProcessPoolExecutor
async def better_async_fib(n):
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as executor:
return await loop.run_in_executor(executor, fibonacci, n)
Пример: Обработка матриц
# Синхронный код — просто, быстро
import numpy as np
def matrix_multiply(A, B):
return np.dot(A, B)
A = np.random.rand(1000, 1000)
B = np.random.rand(1000, 1000)
C = matrix_multiply(A, B) # Мультиплицирование — это вычисление
Этот код правильно написан синхронно. Async здесь не поможет.
2. CLI инструменты и скрипты
Почему синхронный?
CLI инструменты обычно:
- Выполняют задачи последовательно
- Не требуют высокой конкурентности
- Должны быть простыми в поддержке
- Нет нужды в параллельной обработке
Пример: Миграция данных
#!/usr/bin/env python
# migrate_users.py
import click
import psycopg2
from typing import List
@click.command()
@click.option('--source', required=True, help='Source database')
@click.option('--target', required=True, help='Target database')
def migrate(source, target):
"""Мигрируем пользователей из source в target"""
# Подключаемся к исходной БД
source_conn = psycopg2.connect(source)
target_conn = psycopg2.connect(target)
try:
# Получаем пользователей
source_cursor = source_conn.cursor()
source_cursor.execute("SELECT id, name, email FROM users")
users = source_cursor.fetchall()
# Мигрируем
target_cursor = target_conn.cursor()
for user_id, name, email in users:
target_cursor.execute(
"INSERT INTO users (id, name, email) VALUES (%s, %s, %s)",
(user_id, name, email)
)
click.echo(f"✓ Migrated user {name}")
target_conn.commit()
click.echo(f"✓ Successfully migrated {len(users)} users")
finally:
source_conn.close()
target_conn.close()
if __name__ == '__main__':
migrate()
Этот скрипт: синхронный, понятный, не требует async сложности.
3. Рабочие процессы (Workers) с очередями
Когда синхронный?
Если рабочий обрабатывает одну задачу за раз и она IO-bound:
# Worker.py — обработчик задач из очереди
from celery import Celery
import stripe
import time
app = Celery('tasks', broker='redis://localhost:6379')
@app.task
def process_payment(order_id, amount):
"""
Синхронная обработка: одна задача в одном воркере.
Очередь обеспечивает параллелизм, не нужен async.
"""
# Получаем заказ
order = Order.get(order_id)
# Берём платёж (I/O operation)
try:
charge = stripe.Charge.create(
amount=int(amount * 100),
currency='usd',
source='tok_visa'
)
order.status = 'completed'
order.payment_id = charge.id
except stripe.error.CardError as e:
order.status = 'failed'
order.error = str(e)
order.save()
@app.task
def send_email_notification(user_id, subject, body):
"""
Синхронное отправление письма.
Задача: 1 воркер = 1 письмо.
"""
user = User.get(user_id)
# Отправляем письмо (может быть медленным)
send_email(
to=user.email,
subject=subject,
body=body
)
user.last_email_sent = datetime.now(UTC)
user.save()
Очередь (Celery, RQ) обеспечивает параллелизм. 100 воркеров = 100 одновременных задач. Async не нужен.
4. Простые REST API с ORM
Когда синхронный?
Если:
- ORM синхронная (Django ORM, SQLAlchemy)
- Нет сложной конкурентности
- Простая логика обработки
# Django REST API
from rest_framework import viewsets
from rest_framework.response import Response
from django.db import models
class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
def list(self, request, *args, **kwargs):
"""Синхронно получаем пользователей"""
users = self.get_queryset().filter(
created_at__gte='2024-01-01'
)
serializer = self.get_serializer(users, many=True)
return Response(serializer.data)
def create(self, request, *args, **kwargs):
"""Синхронно создаём пользователя"""
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
return Response(serializer.data, status=201)
Django отлично работает синхронно. Gunicorn обеспечивает параллелизм (многопроцессность).
5. Обработка файлов локально
Когда синхронный?
Обработка локальных файлов обычно CPU-bound:
import csv
import json
from pathlib import Path
def process_csv_file(filepath: str) -> dict:
"""Синхронно обрабатываем CSV"""
results = {}
# Чтение файла
with open(filepath, 'r') as f:
reader = csv.DictReader(f)
# Обработка данных
for row in reader:
user_id = int(row['user_id'])
results[user_id] = {
'name': row['name'],
'email': row['email'],
'score': int(row['score'])
}
return results
def generate_report(data: dict) -> None:
"""Синхронно генерируем отчёт"""
# Вычисления
total_score = sum(d['score'] for d in data.values())
avg_score = total_score / len(data)
# Запись результатов
with open('report.json', 'w') as f:
json.dump({
'total_users': len(data),
'average_score': avg_score,
'total_score': total_score
}, f)
Этот код правильно написан синхронно. Async не даст выигрыша.
6. Batch обработка
Когда синхронный?
Обработка больших объёмов данных за раз:
from django.core.management.base import BaseCommand
from django.db import transaction
class Command(BaseCommand):
help = 'Синхронная batch обработка пользователей'
def handle(self, *args, **options):
# Получаем все неактивные аккаунты
inactive_users = User.objects.filter(
last_login__lt='2024-01-01'
).values_list('id', flat=True)
# Обрабатываем батчами
batch_size = 1000
for i in range(0, len(inactive_users), batch_size):
batch_ids = list(inactive_users[i:i+batch_size])
# Отправляем письмо
with transaction.atomic():
for user_id in batch_ids:
send_reactivation_email(user_id)
# Обновляем флаг
User.objects.filter(id__in=batch_ids).update(
notification_sent=True
)
self.stdout.write(
f"✓ Processed batch {i//batch_size + 1}"
)
7. Legacy системы и монолиты
Когда синхронный?
Если основная кодовая база синхронная:
# Django + Celery + остальное синхронное
# Добавлять async async = путаница
# Лучше оставить синхронным
class OrderService:
def create_order(self, user_id, items):
# Синхронная обработка
order = Order.create(
user_id=user_id,
items=items
)
# Отправляем задачу в очередь
send_order_confirmation_email.delay(order.id)
return order
Когда выбирать СИНХРОННЫЙ код?
| Сценарий | Синхронный? | Асинхронный? |
|---|---|---|
| CLI скрипты | ✓ ✓ ✓ | ✗ ✗ ✗ |
| Batch обработка | ✓ ✓ ✓ | ✗ ✗ ✗ |
| Celery Workers | ✓ ✓ ✓ | ✗ (очередь обеспечивает параллелизм) |
| CPU-bound вычисления | ✓ ✓ ✓ | ✗ ✗ ✗ |
| Обработка файлов | ✓ ✓ ✓ | ✗ ✗ ✗ |
| Django ORM | ✓ ✓ | Может быть, но сложнее |
| WebSocket сервер | ✗ | ✓ ✓ ✓ |
| GraphQL API | Зависит | Может быть ✓ |
| Много одновременных HTTP | ✗ | ✓ ✓ ✓ |
| gRPC высоконагруженный | ✗ | ✓ ✓ ✓ |
Основное правило
Асинхронный код нужен ТОЛЬКО КОГДА:
1. Много ОДНОВРЕМЕННЫХ I/O операций
(не затраты на I/O, а количество параллельных операций)
2. Нельзя добавить больше процессов
(ограничение по памяти, нельзя масштабировать горизонтально)
3. Действительно нужна высокая конкурентность
(10000+ одновременных соединений)
Практический совет
- Начинай с синхронного — проще, понятнее, меньше ошибок
- Когда появляются I/O bottlenecks — профилируй, смотри где именно
- Если это очередь (Celery) — синхронный воркер отлично масштабируется
- Если это веб-сервер — может быть, асинхронный фреймворк (FastAPI + uvicorn) приносит выигрыш
- Не смешивай — либо полностью синхронный, либо полностью асинхронный код
Много проектов страдает от преждевременного перехода на асинхронность. Синхронный Python — это тоже отличный выбор для многих задач.