← Назад к вопросам
Приведи пример работы с многопоточностью (multithreading)
1.7 Middle🔥 171 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Практический пример многопоточности (Multithreading)
Пример: параллельная загрузка данных из нескольких API и сохранение в БД. Это реальная задача, которая хорошо показывает преимущества многопоточности.
Проблема: медленная загрузка данных
import requests
import time
def fetch_user(user_id: int) -> dict:
"""Загружает данные пользователя с API"""
url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
response = requests.get(url)
return response.json()
# ❌ ПЛОХО: загружаем по одному (медленно)
print("Начинаем загрузку...")
start = time.time()
users = []
for user_id in range(1, 11): # 10 пользователей
user = fetch_user(user_id) # Ждём ответ: ~0.5 сек
users.append(user)
print(f"Загружено {len(users)} пользователей за {time.time() - start:.2f}s")
# Output: Загружено 10 пользователей за 5.23s (ждали 0.5 сек * 10)
Решение 1: Threading с ThreadPoolExecutor
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_user(user_id: int) -> dict:
"""Загружает данные пользователя с API"""
url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
print(f"Начинаю загрузку пользователя {user_id}...")
response = requests.get(url)
print(f"Закончил загрузку пользователя {user_id}")
return response.json()
# ✅ ХОРОШО: загружаем параллельно (быстро)
print("\nНачинаем загрузку с потоками...")
start = time.time()
users = []
# Создаём пул из 5 потоков
with ThreadPoolExecutor(max_workers=5) as executor:
# Отправляем все задачи в пул
futures = [
executor.submit(fetch_user, user_id)
for user_id in range(1, 11)
]
# Собираем результаты по мере готовности
for future in as_completed(futures):
try:
user = future.result()
users.append(user)
except Exception as e:
print(f"Ошибка: {e}")
print(f"Загружено {len(users)} пользователей за {time.time() - start:.2f}s")
# Output: Загружено 10 пользователей за 1.15s (параллельно, 5 потоков!)
# Output (логи):
# Начинаю загрузку пользователя 1...
# Начинаю загрузку пользователя 2...
# Начинаю загрузку пользователя 3...
# Начинаю загрузку пользователя 4...
# Начинаю загрузку пользователя 5...
# Закончил загрузку пользователя 1
# Начинаю загрузку пользователя 6...
# Закончил загрузку пользователя 2
# ...
Решение 2: Threading с map()
import requests
import time
from concurrent.futures import ThreadPoolExecutor
def fetch_user(user_id: int) -> dict:
"""Загружает данные пользователя"""
url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
response = requests.get(url)
return response.json()
# Вариант с map() (проще, но результаты в порядке)
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
# map() сохраняет порядок
users = list(executor.map(fetch_user, range(1, 11)))
print(f"Загружено {len(users)} пользователей за {time.time() - start:.2f}s")
Решение 3: Ручное управление потоками
import threading
import requests
import time
from typing import List
from threading import Lock
class UserFetcher:
"""Класс для параллельной загрузки пользователей"""
def __init__(self, num_threads: int = 5):
self.num_threads = num_threads
self.users: List[dict] = []
self.lock = Lock() # Для синхронизации доступа к users
def fetch_user(self, user_id: int):
"""Загружает одного пользователя"""
try:
url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
print(f"[Поток {threading.current_thread().name}] Загружаю пользователя {user_id}")
response = requests.get(url)
user = response.json()
# Синхронизируем доступ к общему списку
with self.lock:
self.users.append(user)
print(f"[Поток {threading.current_thread().name}] Готово: {user['name']}")
except Exception as e:
print(f"Ошибка для {user_id}: {e}")
def fetch_all(self, user_ids: List[int]) -> List[dict]:
"""Загружает всех пользователей параллельно"""
threads = []
# Создаём потоки для каждого user_id
for user_id in user_ids:
thread = threading.Thread(
target=self.fetch_user,
args=(user_id,),
name=f"FetchThread-{user_id}"
)
thread.start()
threads.append(thread)
# Ждём завершения всех потоков
for thread in threads:
thread.join()
return self.users
# Использование
fetcher = UserFetcher(num_threads=5)
start = time.time()
users = fetcher.fetch_all(range(1, 11))
print(f"\nЗагружено {len(users)} пользователей за {time.time() - start:.2f}s")
print(f"Имена: {[u['name'] for u in users]}")
# Output:
# [Поток FetchThread-1] Загружаю пользователя 1
# [Поток FetchThread-2] Загружаю пользователя 2
# [Поток FetchThread-3] Загружаю пользователя 3
# [Поток FetchThread-4] Загружаю пользователя 4
# [Поток FetchThread-5] Загружаю пользователя 5
# [Поток FetchThread-1] Готово: Leanne Graham
# [Поток FetchThread-1] Загружаю пользователя 6
# ...
# Загружено 10 пользователей за 1.18s
Синхронизация: Race Condition
import threading
from threading import Lock
import time
# ❌ БЕЗ Lock: Race Condition!
class CounterBad:
def __init__(self):
self.count = 0
def increment(self):
# Проблема: это не атомарная операция!
# 1. Читаем: count = 0
# 2. Увеличиваем: count + 1 = 1
# 3. Пишем: count = 1
# Если два потока одновременно, получим count = 1 вместо 2
self.count += 1
counter_bad = CounterBad()
threads = []
for i in range(100):
thread = threading.Thread(target=counter_bad.increment)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print(f"Без Lock: count = {counter_bad.count}") # ~50-99 вместо 100!
# ✅ С Lock: правильно!
class CounterGood:
def __init__(self):
self.count = 0
self.lock = Lock()
def increment(self):
with self.lock: # Блокируем доступ
# Теперь это атомарная операция
self.count += 1
counter_good = CounterGood()
threads = []
for i in range(100):
thread = threading.Thread(target=counter_good.increment)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print(f"С Lock: count = {counter_good.count}") # Всегда 100!
Практический пример: загрузка и сохранение в БД
import requests
import threading
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
import time
class DataFetcher:
"""Загружает данные и сохраняет в БД"""
def __init__(self, db_path: str):
self.db_path = db_path
self.db_lock = Lock() # Для синхронизации доступа к БД
self._init_db()
def _init_db(self):
"""Создаёт таблицу"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT,
city TEXT
)
""")
conn.commit()
def fetch_and_save_user(self, user_id: int):
"""Загружает пользователя и сохраняет в БД"""
try:
# Загружаем
url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
response = requests.get(url)
user = response.json()
# Сохраняем в БД (синхронизированно)
with self.db_lock:
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO users (id, name, email, city)
VALUES (?, ?, ?, ?)
""",
(
user['id'],
user['name'],
user['email'],
user['address']['city']
)
)
conn.commit()
print(f"Сохранён пользователь: {user['name']}")
except Exception as e:
print(f"Ошибка для {user_id}: {e}")
def fetch_all(self, user_ids: list, max_workers: int = 5):
"""Загружает всех пользователей параллельно"""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
list(executor.map(self.fetch_and_save_user, user_ids))
# Использование
fetcher = DataFetcher(":memory:") # В памяти для примера
start = time.time()
fetcher.fetch_all(range(1, 11))
print(f"Готово за {time.time() - start:.2f}s")
Таймауты и обработка ошибок
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
import requests
def fetch_with_timeout(user_id: int, timeout: float = 5.0) -> dict:
"""Загружает с таймаутом"""
url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
response = requests.get(url, timeout=timeout)
return response.json()
# Обработка таймаутов
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(fetch_with_timeout, user_id): user_id
for user_id in range(1, 11)
}
results = []
errors = []
for future in as_completed(futures, timeout=10): # Таймаут для всей операции
user_id = futures[future]
try:
user = future.result(timeout=5) # Таймаут для одного результата
results.append(user)
except TimeoutError:
errors.append((user_id, "Таймаут"))
except requests.RequestException as e:
errors.append((user_id, str(e)))
except Exception as e:
errors.append((user_id, f"Неизвестная ошибка: {e}"))
print(f"Успешно: {len(results)}, Ошибок: {len(errors)}")
if errors:
for user_id, error in errors:
print(f" User {user_id}: {error}")
Сравнение производительности
import time
import requests
from concurrent.futures import ThreadPoolExecutor
def benchmark():
user_ids = range(1, 21) # 20 пользователей
# 1. Последовательно (медленно)
start = time.time()
for user_id in user_ids:
url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
requests.get(url)
seq_time = time.time() - start
# 2. С потоками (быстро)
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
list(executor.map(
lambda u: requests.get(f"https://jsonplaceholder.typicode.com/users/{u}"),
user_ids
))
thread_time = time.time() - start
print(f"Последовательно: {seq_time:.2f}s")
print(f"С потоками: {thread_time:.2f}s")
print(f"Ускорение: {seq_time / thread_time:.1f}x раз")
benchmark()
# Output:
# Последовательно: 10.45s
# С потоками: 1.23s
# Ускорение: 8.5x раз
Лучшие практики
# 1. Используй ThreadPoolExecutor (проще, чем ручные потоки)
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(fetch_user, user_ids)
# 2. Синхронизируй доступ к общим ресурсам
with self.lock:
self.shared_data.append(item)
# 3. Обрабатывай исключения
try:
result = future.result(timeout=5)
except Exception as e:
logger.error(f"Task failed: {e}")
# 4. Используй as_completed() для обработки результатов по мере готовности
for future in as_completed(futures):
result = future.result()
# 5. Задавай таймауты
future.result(timeout=5) # Не жди больше 5 сек
Вывод
Многопоточность в Python полезна для:
- I/O операций (сеть, файлы, БД) — значительное ускорение
- Одновременной работы с несколькими ресурсами
- Имитации асинхронности
Ключевые моменты:
- ThreadPoolExecutor — простой способ
- Lock — для синхронизации доступа
- as_completed() — обработка результатов по мере готовности
- Таймауты — для предотвращения бесконечного ожидания
- Обработка ошибок — обязательна
Типичное ускорение для I/O операций: 5-10x раз при использовании 5-10 потоков.