← Назад к вопросам

Какие знаешь рекомендации при использовании многопоточности?

2.0 Middle🔥 121 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Рекомендации при использовании многопоточности

Многопоточность — сложная тема, которая требует глубокого понимания проблем, которые она решает и создаёт. Вот основные рекомендации из моего опыта работы с высоконагруженными системами.

1. Понимание GIL (Global Interpreter Lock)

В CPython многопоточность не даёт истинный параллелизм из-за GIL:

import threading
import time

def cpu_bound_work(n):
    result = 0
    for i in range(n):
        result += i
    return result

def io_bound_work():
    time.sleep(1)
    return "done"

start = time.time()
threads = [threading.Thread(target=cpu_bound_work, args=(100_000_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Многопоточность: {time.time() - start:.2f}s")

from multiprocessing import Pool
with Pool(4) as p:
    results = p.map(cpu_bound_work, [100_000_000] * 4)
print(f"Multiprocessing: {time.time() - start:.2f}s")

import asyncio
async def async_io_work():
    tasks = [asyncio.sleep(1) for _ in range(4)]
    await asyncio.gather(*tasks)
print(f"Async: {asyncio.run(async_io_work())}s")

Рекомендация: Используйте многопоточность только для I/O-bound операций. Для CPU-bound используйте multiprocessing или asyncio.

2. Race Conditions и общее состояние

Одна из главных проблем — конкурентный доступ к данным:

import threading
from threading import Lock

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self._lock = Lock()
    
    def deposit(self, amount):
        with self._lock:
            self.balance += amount
    
    def withdraw(self, amount):
        with self._lock:
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False

account = BankAccount(1000)

def worker():
    for _ in range(100_000):
        account.deposit(1)
        account.withdraw(1)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(account.balance)

3. Deadlock — опасность глубоко вложенных lock'ов

Самая опасная ошибка в многопоточности:

from threading import Lock
import time

lock_a = Lock()
lock_b = Lock()

def thread1_work():
    with lock_a:
        time.sleep(0.1)
        with lock_b:
            print("Thread 1")

def thread2_work():
    with lock_b:
        time.sleep(0.1)
        with lock_a:
            print("Thread 2")

def thread1_work_fixed():
    with lock_a:
        with lock_b:
            print("Thread 1")

def thread2_work_fixed():
    with lock_a:
        with lock_b:
            print("Thread 2")

Рекомендация: Избегайте вложенных lock'ов. Если необходимо — всегда захватывайте в одинаковом порядке.

4. Thread-Local Storage

Каждый поток имеет свои данные:

import threading

thread_local = threading.local()

def worker(user_id):
    thread_local.user_id = user_id
    thread_local.connection = create_db_connection()
    
    result = thread_local.connection.query("SELECT ...")
    print(f"User {thread_local.user_id}: {result}")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

5. ThreadPool vs ProcessPool

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def io_task(n):
    time.sleep(1)
    return n * 2

def cpu_task(n):
    return sum(i*i for i in range(100_000_000))

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(io_task, range(10)))

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_task, range(4)))

6. Queue для безопасной передачи данных

import threading
from queue import Queue
import time

work_queue = Queue(maxsize=100)
result_queue = Queue()

def producer():
    for i in range(100):
        work_queue.put(f"task_{i}")
        time.sleep(0.01)

def consumer(worker_id):
    while True:
        try:
            task = work_queue.get(timeout=1)
            if task is None:
                break
            result = process_task(task)
            result_queue.put((worker_id, result))
        except:
            break

producer_thread = threading.Thread(target=producer)
consumer_threads = [threading.Thread(target=consumer, args=(i,)) for i in range(4)]

producer_thread.start()
for t in consumer_threads:
    t.start()

producer_thread.join()
for _ in range(4):
    work_queue.put(None)
for t in consumer_threads:
    t.join()

7. Asyncio — альтернатива для I/O-bound

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [f"https://api.example.com/data/{i}" for i in range(100)]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    return results

results = asyncio.run(main())

8. Condition Variables для синхронизации

from threading import Condition
import time

data_ready = Condition()
data = None

def producer():
    global data
    for i in range(10):
        time.sleep(0.1)
        with data_ready:
            data = f"data_{i}"
            data_ready.notify_all()

def consumer(consumer_id):
    global data
    while True:
        with data_ready:
            data_ready.wait(timeout=1)
            print(f"Consumer {consumer_id}: {data}")

threads = [threading.Thread(target=producer)]
threads += [threading.Thread(target=consumer, args=(i,)) for i in range(2)]

for t in threads:
    t.start()
for t in threads:
    t.join()

9. Избегайте общего состояния

from threading import Lock

class Counter:
    def __init__(self):
        self._value = 0
        self._lock = Lock()
    
    def increment(self):
        with self._lock:
            self._value += 1
    
    def get(self):
        with self._lock:
            return self._value

10. Профилирование и отладка

import threading
import cProfile

threading.current_thread().name
threading.enumerate()
threading.active_count()

cProfile.run('your_function()')

Резюме рекомендаций

  1. Избегайте многопоточности для CPU-bound — используйте multiprocessing
  2. Используйте asyncio для I/O — проще и быстрее чем многопоточность
  3. Многопоточность только для блокирующего I/O — БД, сеть, файлы
  4. Минимизируйте общее состояние — используйте Queue и передачу сообщений
  5. Lock всегда для изменения данных — race conditions очень коварны
  6. Избегайте вложенных lock'ов — высокий риск deadlock
  7. Тестируйте при высокой конкурентности — проблемы проявляются редко
  8. Профилируйте перед оптимизацией — где реальное узкое место
  9. Документируйте потокобезопасность — явно указывайте какие методы потокобезопасны
  10. Рассмотрите альтернативы — multiprocessing, asyncio, coroutines

В 90% случаев asyncio или ProcessPoolExecutor дают лучший результат, чем ручная многопоточность.

Какие знаешь рекомендации при использовании многопоточности? | PrepBro