← Назад к вопросам

Где оптимальнее применять потоки в Python?

1.7 Middle🔥 111 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Где оптимальнее применять потоки в Python

Потоки (threads) в Python имеют очень специфическую область применения из-за GIL (Global Interpreter Lock). Неправильное использование потоков может даже замедлить программу. Давайте разберёмся, когда они действительно полезны.

Проблема: GIL (Global Interpreter Lock)

GIL — это механизм в CPython, который позволяет одному потоку одновременно выполнять bytecode. Это означает, что потоки не работают параллельно для CPU-bound задач, а работают по очереди.

import threading
import time

def cpu_bound_task(n):
    """Вычислительная задача"""
    count = 0
    for i in range(n):
        count += i
    return count

# Однопоточное выполнение
start = time.time()
cpu_bound_task(100_000_000)
cpu_bound_task(100_000_000)
print(f"Single thread: {time.time() - start:.2f}s")

# Многопоточное выполнение (медленнее из-за GIL!)
start = time.time()
threads = [
    threading.Thread(target=cpu_bound_task, args=(100_000_000,)),
    threading.Thread(target=cpu_bound_task, args=(100_000_000,))
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Multi-thread: {time.time() - start:.2f}s")  # Медленнее!

Оптимальные области применения потоков

1. I/O-Bound операции (ИДЕАЛЬНО)

Потоки превосходны для операций, где программа ждёт внешних ресурсов (сеть, диск, БД). Пока один поток ждёт ответ от сервера, другой может выполнять работу.

import threading
import requests
import time

def fetch_url(url):
    response = requests.get(url)
    print(f"Fetched {url}: {response.status_code}")
    return response

# Одинарный запрос
start = time.time()
for url in ["http://example.com", "http://google.com", "http://github.com"]:
    fetch_url(url)
print(f"Sequential: {time.time() - start:.2f}s")  # ~10 сек

# Многопоточный запрос
start = time.time()
threads = []
for url in ["http://example.com", "http://google.com", "http://github.com"]:
    t = threading.Thread(target=fetch_url, args=(url,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"Threaded: {time.time() - start:.2f}s")  # ~3 сек (3x быстрее!)

Оптимально для:

  • Сетевые запросы (API, HTTP)
  • Чтение/запись файлов
  • Запросы к БД
  • Любые операции с blocking I/O

2. GUI приложения

Потоки предотвращают замораживание интерфейса при выполнении долгих операций.

import tkinter as tk
import threading
import time

class GUIApp:
    def __init__(self, root):
        self.root = root
        self.button = tk.Button(
            root, 
            text="Fetch Data", 
            command=self.on_button_click
        )
        self.button.pack()
        self.label = tk.Label(root, text="Ready")
        self.label.pack()
    
    def on_button_click(self):
        # Запускаем длинную операцию в отдельном потоке
        thread = threading.Thread(target=self.long_operation)
        thread.start()
    
    def long_operation(self):
        self.label.config(text="Loading...")
        time.sleep(3)  # Долгая операция
        self.label.config(text="Done!")

root = tk.Tk()
app = GUIApp(root)
root.mainloop()

3. Фоновые задачи

Потоки отличны для периодических или фоновых операций, которые не требуют вычислений.

import threading
import time
import queue

def background_logger(log_queue):
    """Фоновый logger, пишет логи в файл"""
    with open("app.log", "a") as f:
        while True:
            try:
                message = log_queue.get(timeout=1)
                if message is None:  # Signal to stop
                    break
                f.write(f"{message}\n")
                f.flush()
            except queue.Empty:
                continue

# Запуск логгера в отдельном потоке
log_queue = queue.Queue()
logger_thread = threading.Thread(target=background_logger, args=(log_queue,), daemon=True)
logger_thread.start()

# Основная программа добавляет логи
for i in range(100):
    log_queue.put(f"Event {i}")
    time.sleep(0.1)

4. Многопользовательские серверы

Вебсерверы используют потоки для обработки параллельных клиентских запросов.

from flask import Flask
from threading import Thread
import time

app = Flask(__name__)

@app.route("/api/slow")
def slow_endpoint():
    # Обработчик работает в потоке, не блокируя другие запросы
    time.sleep(5)
    return {"status": "done"}

# Flask автоматически использует потоки для каждого запроса
if __name__ == "__main__":
    app.run(threaded=True)  # Включаем многопоточность

НЕОПТИМАЛЬНЫЕ области применения потоков

1. CPU-bound операции (ПЛОХО)

Для вычислений используйте multiprocessing или asyncio.

from multiprocessing import Pool
import time

def cpu_bound(n):
    count = 0
    for i in range(n):
        count += i
    return count

# Правильно: multiprocessing обходит GIL
if __name__ == "__main__":
    start = time.time()
    with Pool(2) as p:
        results = p.map(cpu_bound, [100_000_000, 100_000_000])
    print(f"Multiprocessing: {time.time() - start:.2f}s")  # Реально параллельно

2. Асинхронные операции (используй asyncio)

import asyncio
import aiohttp

async def fetch_url_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ["http://example.com", "http://google.com"]
    # asyncio работает эффективнее потоков для I/O
    tasks = [fetch_url_async(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

asyncio.run(main())

Таблица выбора инструмента

Тип задачиthreadingmultiprocessingasyncioПримечание
I/O-bound✓ Хорошо✗ Плохо✓✓ ОтличноПотоки или asyncio
CPU-bound✗ Плохо✓✓ Отлично✗ ПлохоТолько multiprocessing
GUI✓ Хорошо✗ Сложно✗ СложноПотоки для UI-блокировки
Webserver✓ Хорошо✓ Можно✓✓ Отличноasyncio для высоконагруженности
Простота✓✓ Легко✗ Сложно✓ СреднеПотоки просто, но ограничены

Лучшие практики использования потоков

import threading
from queue import Queue
import time

class ThreadPool:
    """Простой пул потоков для I/O операций"""
    def __init__(self, num_threads=4):
        self.queue = Queue()
        self.threads = []
        
        for _ in range(num_threads):
            t = threading.Thread(target=self.worker, daemon=True)
            t.start()
            self.threads.append(t)
    
    def worker(self):
        while True:
            func, args = self.queue.get()
            try:
                func(*args)
            finally:
                self.queue.task_done()
    
    def submit(self, func, *args):
        self.queue.put((func, args))
    
    def wait(self):
        self.queue.join()

# Использование
pool = ThreadPool(4)
for url in urls:
    pool.submit(fetch_url, url)
pool.wait()

Вывод

Потоки оптимальны для I/O-bound операций: сетевые запросы, работа с файлами, базами данных, GUI.

Потоки плохи для CPU-bound операций: используйте multiprocessing или asyncio.

Общее правило: если программа ждёт внешних ресурсов — потоки. Если вычисляет — multiprocessing или asyncio.