← Назад к вопросам

Что такое iter в SQLAlchemy?

1.8 Middle🔥 61 комментариев
#Python Core#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

iter() в SQLAlchemy — Итерация по результатам запроса

iter() в SQLAlchemy — встроенная функция Python для создания итератора из объекта результата запроса. В контексте SQLAlchemy это позволяет ленивую загрузку данных из базы данных, обрабатывая результаты постепенно, а не загружая всё в памяти сразу.

Основы: ScalarResult и Result в SQLAlchemy 2.0

SQLAlchemy 2.0 отличается от 1.x тем, что результаты запроса — это специальные объекты:

from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session

engine = create_engine('postgresql://user:password@localhost/db')

# SQLAlchemy 2.0 + синтаксис
with Session(engine) as session:
    # select() возвращает объект Select
    query = select(User)
    
    # execute() возвращает объект Result
    result = session.execute(query)
    
    # Результаты ещё НЕ загружены из БД!
    # Они загружаются только при итерации:
    for user in result:  # iter() вызывается автоматически
        print(user.name)  # Данные загружаются по мере необходимости

Явное использование iter()

Хотя Python автоматически вызывает iter() в цикле for, можно использовать явно:

with Session(engine) as session:
    result = session.execute(select(User))
    
    # Способ 1: Неявно (автоматически)
    for user in result:
        print(user.name)
    
    # Способ 2: Явно через iter()
    iterator = iter(result)
    
    # Получаем одного юзера за раз
    first_user = next(iterator)
    print(first_user.name)
    
    second_user = next(iterator)
    print(second_user.name)

Память: iter() vs fetchall()

Критическое различие между подходами:

with Session(engine) as session:
    result = session.execute(select(User))
    
    # Плохо: загружает ВСЕ данные в памяти
    all_users = result.fetchall()  # ~100 MB если 1 млн юзеров
    for user in all_users:
        process_user(user)
    
    # Хорошо: загружает данные потоком (batch_size по умолчанию ~5000)
    result2 = session.execute(select(User))
    for user in result2:  # iter() используется
        process_user(user)

ScalarResult для скалярных значений

ScalarResult — специальный результат для скалярных значений (одно значение на строку):

with Session(engine) as session:
    # Обычный Result (возвращает Row объекты)
    result = session.execute(select(User.name))
    # Row(name='Alice'), Row(name='Bob')...
    
    # ScalarResult (возвращает просто строки)
    scalar_result = session.scalars(select(User))
    # User(name='Alice'), User(name='Bob')...
    
    for user in scalar_result:  # iter() вызывается здесь
        print(user.name)

Размер батча и производительность

SQLAlchemy загружает данные батчами для оптимизации:

with Session(engine) as session:
    # По умолчанию batch_size = 5000 строк на один fetch из БД
    query = select(User)
    result = session.execute(query)
    
    count = 0
    for user in result:  # iter() + автоматический batching
        count += 1
        if count == 1:
            print('Первый fetch из БД (до 5000 строк)')
        if count == 5001:
            print('Второй fetch из БД (ещё до 5000 строк)')

Практический пример: обработка больших датасетов

from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import Session

engine = create_engine('postgresql://user:password@localhost/db')

def process_large_dataset():
    with Session(engine) as session:
        # Задача: обработать 10 млн записей логов
        # Это упадёт (MemoryError):
        # all_logs = session.query(Log).all()
        
        # Это работает (константная память):
        result = session.execute(select(Log).order_by(Log.created_at))
        
        batch = []
        batch_size = 1000
        
        for log in result:  # iter() — ленивая загрузка
            batch.append(log)
            
            if len(batch) == batch_size:
                # Обрабатываем батч
                process_logs_batch(batch)
                batch.clear()
        
        # Остаток
        if batch:
            process_logs_batch(batch)

def process_logs_batch(logs):
    # Какая-то обработка...
    for log in logs:
        # Анализ, фильтрация, трансформация...
        pass

yield с iter() — для потоковой обработки

from typing import Iterator
from sqlalchemy.orm import Session

def stream_users(session: Session) -> Iterator[User]:
    """Генератор для потоковой обработки юзеров"""
    result = session.execute(select(User))
    
    for user in result:  # iter() вызывается внутри
        yield user  # Отдаём пользователя по одному

# Использование
with Session(engine) as session:
    for user in stream_users(session):
        print(user.name)  # Данные загружаются постепенно

Проблема: Query закрывается после session.close()

with Session(engine) as session:
    result = session.execute(select(User))
    iterator = iter(result)
    first = next(iterator)
    # session закроется после выхода из with блока

# Ошибка: session закрыта, нельзя загружать данные
# for user in iterator:
#     print(user)

# Правильно: загрузить данные до закрытия session
with Session(engine) as session:
    result = session.execute(select(User))
    users = list(result)  # Загружаем всё здесь

# Теперь можно работать с users вне session
for user in users:
    print(user.name)

Чтение из больших таблиц в батчах

from sqlalchemy import select, func

def process_table_in_batches(batch_size=10000):
    with Session(engine) as session:
        # Подсчитываем общее количество
        total = session.scalar(select(func.count()).select_from(User))
        
        for offset in range(0, total, batch_size):
            # Загружаем батч
            result = session.execute(
                select(User).offset(offset).limit(batch_size)
            )
            
            batch = list(result)  # iter() + материализация батча
            process_batch(batch)
            
            print(f'Processed {offset + len(batch)}/{total}')

def process_batch(users):
    for user in users:
        # Обработка...
        pass

stream() для потоковой обработки (в старых версиях)

# SQLAlchemy 1.4+ имеет stream() для явного потокового режима
# В SQLAlchemy 2.0 это встроено в поведение по умолчанию

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine = create_engine(
    'postgresql://user:password@localhost/db',
    execution_options={'stream_results': True}  # Потоковый режим
)

with Session(engine) as session:
    result = session.execute(select(User))
    
    for user in result:  # iter() в потоковом режиме
        # Данные загружаются максимально эффективно
        process_user(user)

Типизация итераторов в SQLAlchemy

from typing import Iterator
from sqlalchemy.engine import Result
from sqlalchemy.orm import Session

def get_users() -> Iterator[User]:
    with Session(engine) as session:
        result: Result = session.execute(select(User))
        
        for user in result:  # iter() типизирован
            yield user

# Использование
for user in get_users():
    print(user.name)

Резюме: iter() в SQLAlchemy — это ленивая загрузка результатов запроса, которая позволяет обрабатывать большие датасеты эффективно по памяти. Вместо загрузки всех данных сразу (fetchall()), SQLAlchemy загружает данные батчами по мере итерации. Это критично для масштабируемых приложений, обрабатывающих миллионы записей.