← Назад к вопросам

Приведи пример работы с многопоточностью (multithreading)

1.7 Middle🔥 171 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Практический пример многопоточности (Multithreading)

Пример: параллельная загрузка данных из нескольких API и сохранение в БД. Это реальная задача, которая хорошо показывает преимущества многопоточности.

Проблема: медленная загрузка данных

import requests
import time

def fetch_user(user_id: int) -> dict:
    """Загружает данные пользователя с API"""
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    response = requests.get(url)
    return response.json()

# ❌ ПЛОХО: загружаем по одному (медленно)
print("Начинаем загрузку...")
start = time.time()

users = []
for user_id in range(1, 11):  # 10 пользователей
    user = fetch_user(user_id)  # Ждём ответ: ~0.5 сек
    users.append(user)

print(f"Загружено {len(users)} пользователей за {time.time() - start:.2f}s")
# Output: Загружено 10 пользователей за 5.23s (ждали 0.5 сек * 10)

Решение 1: Threading с ThreadPoolExecutor

import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_user(user_id: int) -> dict:
    """Загружает данные пользователя с API"""
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    print(f"Начинаю загрузку пользователя {user_id}...")
    response = requests.get(url)
    print(f"Закончил загрузку пользователя {user_id}")
    return response.json()

# ✅ ХОРОШО: загружаем параллельно (быстро)
print("\nНачинаем загрузку с потоками...")
start = time.time()

users = []
# Создаём пул из 5 потоков
with ThreadPoolExecutor(max_workers=5) as executor:
    # Отправляем все задачи в пул
    futures = [
        executor.submit(fetch_user, user_id)
        for user_id in range(1, 11)
    ]
    
    # Собираем результаты по мере готовности
    for future in as_completed(futures):
        try:
            user = future.result()
            users.append(user)
        except Exception as e:
            print(f"Ошибка: {e}")

print(f"Загружено {len(users)} пользователей за {time.time() - start:.2f}s")
# Output: Загружено 10 пользователей за 1.15s (параллельно, 5 потоков!)
# Output (логи):
# Начинаю загрузку пользователя 1...
# Начинаю загрузку пользователя 2...
# Начинаю загрузку пользователя 3...
# Начинаю загрузку пользователя 4...
# Начинаю загрузку пользователя 5...
# Закончил загрузку пользователя 1
# Начинаю загрузку пользователя 6...
# Закончил загрузку пользователя 2
# ...

Решение 2: Threading с map()

import requests
import time
from concurrent.futures import ThreadPoolExecutor

def fetch_user(user_id: int) -> dict:
    """Загружает данные пользователя"""
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    response = requests.get(url)
    return response.json()

# Вариант с map() (проще, но результаты в порядке)
start = time.time()

with ThreadPoolExecutor(max_workers=5) as executor:
    # map() сохраняет порядок
    users = list(executor.map(fetch_user, range(1, 11)))

print(f"Загружено {len(users)} пользователей за {time.time() - start:.2f}s")

Решение 3: Ручное управление потоками

import threading
import requests
import time
from typing import List
from threading import Lock

class UserFetcher:
    """Класс для параллельной загрузки пользователей"""
    
    def __init__(self, num_threads: int = 5):
        self.num_threads = num_threads
        self.users: List[dict] = []
        self.lock = Lock()  # Для синхронизации доступа к users
    
    def fetch_user(self, user_id: int):
        """Загружает одного пользователя"""
        try:
            url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
            print(f"[Поток {threading.current_thread().name}] Загружаю пользователя {user_id}")
            response = requests.get(url)
            user = response.json()
            
            # Синхронизируем доступ к общему списку
            with self.lock:
                self.users.append(user)
            
            print(f"[Поток {threading.current_thread().name}] Готово: {user['name']}")
        
        except Exception as e:
            print(f"Ошибка для {user_id}: {e}")
    
    def fetch_all(self, user_ids: List[int]) -> List[dict]:
        """Загружает всех пользователей параллельно"""
        threads = []
        
        # Создаём потоки для каждого user_id
        for user_id in user_ids:
            thread = threading.Thread(
                target=self.fetch_user,
                args=(user_id,),
                name=f"FetchThread-{user_id}"
            )
            thread.start()
            threads.append(thread)
        
        # Ждём завершения всех потоков
        for thread in threads:
            thread.join()
        
        return self.users

# Использование
fetcher = UserFetcher(num_threads=5)
start = time.time()
users = fetcher.fetch_all(range(1, 11))
print(f"\nЗагружено {len(users)} пользователей за {time.time() - start:.2f}s")
print(f"Имена: {[u['name'] for u in users]}")

# Output:
# [Поток FetchThread-1] Загружаю пользователя 1
# [Поток FetchThread-2] Загружаю пользователя 2
# [Поток FetchThread-3] Загружаю пользователя 3
# [Поток FetchThread-4] Загружаю пользователя 4
# [Поток FetchThread-5] Загружаю пользователя 5
# [Поток FetchThread-1] Готово: Leanne Graham
# [Поток FetchThread-1] Загружаю пользователя 6
# ...
# Загружено 10 пользователей за 1.18s

Синхронизация: Race Condition

import threading
from threading import Lock
import time

# ❌ БЕЗ Lock: Race Condition!
class CounterBad:
    def __init__(self):
        self.count = 0
    
    def increment(self):
        # Проблема: это не атомарная операция!
        # 1. Читаем: count = 0
        # 2. Увеличиваем: count + 1 = 1
        # 3. Пишем: count = 1
        # Если два потока одновременно, получим count = 1 вместо 2
        self.count += 1

counter_bad = CounterBad()
threads = []

for i in range(100):
    thread = threading.Thread(target=counter_bad.increment)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

print(f"Без Lock: count = {counter_bad.count}")  # ~50-99 вместо 100!

# ✅ С Lock: правильно!
class CounterGood:
    def __init__(self):
        self.count = 0
        self.lock = Lock()
    
    def increment(self):
        with self.lock:  # Блокируем доступ
            # Теперь это атомарная операция
            self.count += 1

counter_good = CounterGood()
threads = []

for i in range(100):
    thread = threading.Thread(target=counter_good.increment)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

print(f"С Lock: count = {counter_good.count}")  # Всегда 100!

Практический пример: загрузка и сохранение в БД

import requests
import threading
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
import time

class DataFetcher:
    """Загружает данные и сохраняет в БД"""
    
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.db_lock = Lock()  # Для синхронизации доступа к БД
        self._init_db()
    
    def _init_db(self):
        """Создаёт таблицу"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    name TEXT,
                    email TEXT,
                    city TEXT
                )
            """)
            conn.commit()
    
    def fetch_and_save_user(self, user_id: int):
        """Загружает пользователя и сохраняет в БД"""
        try:
            # Загружаем
            url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
            response = requests.get(url)
            user = response.json()
            
            # Сохраняем в БД (синхронизированно)
            with self.db_lock:
                with sqlite3.connect(self.db_path) as conn:
                    conn.execute(
                        """
                        INSERT INTO users (id, name, email, city)
                        VALUES (?, ?, ?, ?)
                        """,
                        (
                            user['id'],
                            user['name'],
                            user['email'],
                            user['address']['city']
                        )
                    )
                    conn.commit()
            
            print(f"Сохранён пользователь: {user['name']}")
        
        except Exception as e:
            print(f"Ошибка для {user_id}: {e}")
    
    def fetch_all(self, user_ids: list, max_workers: int = 5):
        """Загружает всех пользователей параллельно"""
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            list(executor.map(self.fetch_and_save_user, user_ids))

# Использование
fetcher = DataFetcher(":memory:")  # В памяти для примера
start = time.time()
fetcher.fetch_all(range(1, 11))
print(f"Готово за {time.time() - start:.2f}s")

Таймауты и обработка ошибок

from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
import requests

def fetch_with_timeout(user_id: int, timeout: float = 5.0) -> dict:
    """Загружает с таймаутом"""
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    response = requests.get(url, timeout=timeout)
    return response.json()

# Обработка таймаутов
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {
        executor.submit(fetch_with_timeout, user_id): user_id
        for user_id in range(1, 11)
    }
    
    results = []
    errors = []
    
    for future in as_completed(futures, timeout=10):  # Таймаут для всей операции
        user_id = futures[future]
        try:
            user = future.result(timeout=5)  # Таймаут для одного результата
            results.append(user)
        
        except TimeoutError:
            errors.append((user_id, "Таймаут"))
        
        except requests.RequestException as e:
            errors.append((user_id, str(e)))
        
        except Exception as e:
            errors.append((user_id, f"Неизвестная ошибка: {e}"))
    
    print(f"Успешно: {len(results)}, Ошибок: {len(errors)}")
    if errors:
        for user_id, error in errors:
            print(f"  User {user_id}: {error}")

Сравнение производительности

import time
import requests
from concurrent.futures import ThreadPoolExecutor

def benchmark():
    user_ids = range(1, 21)  # 20 пользователей
    
    # 1. Последовательно (медленно)
    start = time.time()
    for user_id in user_ids:
        url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
        requests.get(url)
    seq_time = time.time() - start
    
    # 2. С потоками (быстро)
    start = time.time()
    with ThreadPoolExecutor(max_workers=10) as executor:
        list(executor.map(
            lambda u: requests.get(f"https://jsonplaceholder.typicode.com/users/{u}"),
            user_ids
        ))
    thread_time = time.time() - start
    
    print(f"Последовательно:  {seq_time:.2f}s")
    print(f"С потоками:       {thread_time:.2f}s")
    print(f"Ускорение:        {seq_time / thread_time:.1f}x раз")

benchmark()
# Output:
# Последовательно:  10.45s
# С потоками:       1.23s
# Ускорение:        8.5x раз

Лучшие практики

# 1. Используй ThreadPoolExecutor (проще, чем ручные потоки)
with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(fetch_user, user_ids)

# 2. Синхронизируй доступ к общим ресурсам
with self.lock:
    self.shared_data.append(item)

# 3. Обрабатывай исключения
try:
    result = future.result(timeout=5)
except Exception as e:
    logger.error(f"Task failed: {e}")

# 4. Используй as_completed() для обработки результатов по мере готовности
for future in as_completed(futures):
    result = future.result()

# 5. Задавай таймауты
future.result(timeout=5)  # Не жди больше 5 сек

Вывод

Многопоточность в Python полезна для:

  • I/O операций (сеть, файлы, БД) — значительное ускорение
  • Одновременной работы с несколькими ресурсами
  • Имитации асинхронности

Ключевые моменты:

  1. ThreadPoolExecutor — простой способ
  2. Lock — для синхронизации доступа
  3. as_completed() — обработка результатов по мере готовности
  4. Таймауты — для предотвращения бесконечного ожидания
  5. Обработка ошибок — обязательна

Типичное ускорение для I/O операций: 5-10x раз при использовании 5-10 потоков.