← Назад к вопросам

Какие знаешь механизмы конкурентности в Python?

2.3 Middle🔥 221 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Механизмы конкурентности в Python

Python предоставляет несколько способов реализации конкурентных программ для выполнения нескольких операций одновременно. Выбор механизма зависит от типа задач: I/O-bound или CPU-bound.

1. Threading (многопоточность)

Несколько потоков выполняются в одном процессе. Идеальна для I/O-bound задач (сетевые запросы, файловые операции).

import threading
import requests
from concurrent.futures import ThreadPoolExecutor
import time

# Простой пример с Thread
def fetch_url(url):
    response = requests.get(url)
    print(f"Fetched {url}: {len(response.content)} bytes")
    return response.content

# Способ 1: Создание потоков вручную
threads = []
for url in ['http://example.com', 'http://google.com', 'http://github.com']:
    thread = threading.Thread(target=fetch_url, args=(url,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

# Способ 2: ThreadPoolExecutor (рекомендуется)
with ThreadPoolExecutor(max_workers=5) as executor:
    urls = [
        'http://example.com',
        'http://google.com',
        'http://github.com'
    ]
    results = executor.map(fetch_url, urls)

# Способ 3: Синхронизация потоков
lock = threading.Lock()
counter = 0

def increment_counter():
    global counter
    with lock:  # Гарантирует, что только один поток может выполнять
        for _ in range(1000000):
            counter += 1

threads = [threading.Thread(target=increment_counter) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"Counter: {counter}")  # Безопасно равен 5000000

Проблема: Global Interpreter Lock (GIL) в CPython ограничивает выполнение байткода Python одним потоком одновременно. Threading НЕ помогает для CPU-bound задач.

2. Multiprocessing (многопроцессность)

Несколько отдельных процессов. Обходит GIL и подходит для CPU-bound задач.

from multiprocessing import Process, Pool, Queue
import time

# CPU-bound задача
def calculate_fibonacci(n):
    if n < 2:
        return n
    return calculate_fibonacci(n-1) + calculate_fibonacci(n-2)

# Способ 1: Использование Pool
with Pool(processes=4) as pool:
    numbers = [30, 31, 32, 33]
    results = pool.map(calculate_fibonacci, numbers)
    print(results)  # [832040, 1346269, 2178309, 3524578]

# Способ 2: Процессы с Queue для обмена данными
def worker(queue, task_id):
    result = calculate_fibonacci(30 + task_id)
    queue.put((task_id, result))

queue = Queue()
processes = []
for i in range(4):
    p = Process(target=worker, args=(queue, i))
    p.start()
    processes.append(p)

results = {}
for _ in range(4):
    task_id, result = queue.get()
    results[task_id] = result

for p in processes:
    p.join()

print(results)

# Способ 3: ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_task(n):
    return sum(i*i for i in range(n))

with ProcessPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(cpu_bound_task, 10**7) for _ in range(4)]
    results = [future.result() for future in futures]

Минусы: Overhead создания процессов, сложность обмена данными, больше памяти.

3. Asyncio (асинхронное программирование)

Легковесная конкурентность на основе coroutines (сопрограмм). Идеальна для I/O-bound задач.

import asyncio
import aiohttp
import time

# Асинхронная функция
async def fetch_url_async(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_multiple_urls():
    urls = [
        'http://example.com',
        'http://google.com',
        'http://github.com'
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Запуск асинхронного кода
start = time.time()
results = asyncio.run(fetch_multiple_urls())
print(f"Completed in {time.time() - start:.2f}s")
print(f"Fetched {len(results)} URLs")

# Работа с таймаутами
async def fetch_with_timeout():
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get('http://example.com', timeout=aiohttp.ClientTimeout(total=5)) as response:
                return await response.text()
    except asyncio.TimeoutError:
        print("Request timed out")

# Управление задачами
async def main():
    # Создание задач
    task1 = asyncio.create_task(fetch_url_async(session, 'http://example.com'))
    task2 = asyncio.create_task(fetch_url_async(session, 'http://google.com'))
    
    # Ожидание первого завершения
    done, pending = await asyncio.wait(
        [task1, task2],
        return_when=asyncio.FIRST_COMPLETED
    )
    
    # Отмена оставшихся задач
    for task in pending:
        task.cancel()

Преимущества: Низкий overhead, простой синтаксис, отлично для I/O.

Недостаток: Все должно быть асинхронным; невозможна настоящая параллелизм.

4. Gevent (микропотоки)

Легковесные потоки с автоматическим переключением контекста.

from gevent import spawn, joinall
import gevent
import requests

def fetch_url_gevent(url):
    response = requests.get(url)
    print(f"Fetched {url}: {len(response.content)} bytes")

# Создание greenlets
urls = ['http://example.com', 'http://google.com', 'http://github.com']
greenlets = [spawn(fetch_url_gevent, url) for url in urls]

# Ожидание завершения всех
joinall(greenlets)

# Монкипатчинг для использования стандартных библиотек асинхронно
from gevent import monkey
monkey.patch_all()  # Патчит socket, threading и т.д.

# Теперь стандартный код работает асинхронно
threads = []
for url in urls:
    thread = threading.Thread(target=fetch_url, args=(url,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()  # Работает асинхронно благодаря монкипатчингу

5. Ray (распределённые вычисления)

Для масштабирования на несколько машин.

import ray

# Инициализация Ray
ray.init()

# Определение удалённых функций
@ray.remote
def slow_function(x):
    import time
    time.sleep(1)
    return x ** 2

# Выполнение параллельно
futures = [slow_function.remote(x) for x in range(10)]
results = ray.get(futures)
print(results)

# Определение класса актора
@ray.remote
class Counter:
    def __init__(self):
        self.count = 0
    
    def increment(self):
        self.count += 1
        return self.count

counter = Counter.remote()
for _ in range(5):
    count = ray.get(counter.increment.remote())
    print(f"Count: {count}")

ray.shutdown()

6. Concurrent.futures (высокоуровневый API)

Унифицированный интерфейс для Threading и Multiprocessing.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

def io_bound_task(n):
    import requests
    return len(requests.get(f'http://httpbin.org/delay/{n}').content)

def cpu_bound_task(n):
    return sum(i*i for i in range(n))

# ThreadPoolExecutor для I/O
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(io_bound_task, i): i for i in range(1, 4)}
    
    for future in as_completed(futures):
        result = future.result()
        print(f"Task {futures[future]} result: {result}")

# ProcessPoolExecutor для CPU
with ProcessPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(cpu_bound_task, 10**6): i for i in range(4)}
    
    for future in as_completed(futures):
        result = future.result()
        print(f"CPU Task {futures[future]} result: {result}")

Сравнение механизмов

МеханизмТипI/O-boundCPU-boundСложностьGIL
ThreadingМногопоточностьОтличноПлохоНизкаяДа
MultiprocessingМногопроцессностьХорошоОтличноВысокаяНет
AsyncioАсинхронностьОтличноНетСредняяДа
GeventМикропотокиОтличноПлохоСредняяДа
RayРаспределённыеОтличноОтличноОчень высокаяНет

Практические примеры

# I/O-bound: API запросы
# Используй: Asyncio или Threading

async def get_users_async():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_user(session, i) for i in range(100)]
        return await asyncio.gather(*tasks)

# CPU-bound: Обработка данных
# Используй: Multiprocessing или Ray

from multiprocessing import Pool

def process_data(data):
    return sum(calculate_complex(item) for item in data)

with Pool() as p:
    results = p.map(process_data, chunks)

# Смешанное: API + обработка
# Используй: asyncio + ProcessPoolExecutor

async def hybrid():
    with ProcessPoolExecutor() as pool:
        loop = asyncio.get_event_loop()
        async with aiohttp.ClientSession() as session:
            # Fetch API асинхронно
            data = await fetch_api_data(session)
            # Обработай в процессах
            result = await loop.run_in_executor(pool, process_data, data)
            return result

Выводы

  • I/O-bound (сеть, файлы): Asyncio > Threading > Gevent
  • CPU-bound (вычисления): Multiprocessing > Ray
  • Простота: Threading > Asyncio > Multiprocessing
  • Масштабируемость: Ray > Multiprocessing > Asyncio

Лучший выбор зависит от конкретной задачи, требований к производительности и поддерживаемости кода.

Какие знаешь механизмы конкурентности в Python? | PrepBro