Какие знаешь механизмы конкурентности в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
# Механизмы конкурентности в Python
Python предоставляет несколько способов реализации конкурентных программ для выполнения нескольких операций одновременно. Выбор механизма зависит от типа задач: I/O-bound или CPU-bound.
1. Threading (многопоточность)
Несколько потоков выполняются в одном процессе. Идеальна для I/O-bound задач (сетевые запросы, файловые операции).
import threading
import requests
from concurrent.futures import ThreadPoolExecutor
import time
# Простой пример с Thread
def fetch_url(url):
response = requests.get(url)
print(f"Fetched {url}: {len(response.content)} bytes")
return response.content
# Способ 1: Создание потоков вручную
threads = []
for url in ['http://example.com', 'http://google.com', 'http://github.com']:
thread = threading.Thread(target=fetch_url, args=(url,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
# Способ 2: ThreadPoolExecutor (рекомендуется)
with ThreadPoolExecutor(max_workers=5) as executor:
urls = [
'http://example.com',
'http://google.com',
'http://github.com'
]
results = executor.map(fetch_url, urls)
# Способ 3: Синхронизация потоков
lock = threading.Lock()
counter = 0
def increment_counter():
global counter
with lock: # Гарантирует, что только один поток может выполнять
for _ in range(1000000):
counter += 1
threads = [threading.Thread(target=increment_counter) for _ in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(f"Counter: {counter}") # Безопасно равен 5000000
Проблема: Global Interpreter Lock (GIL) в CPython ограничивает выполнение байткода Python одним потоком одновременно. Threading НЕ помогает для CPU-bound задач.
2. Multiprocessing (многопроцессность)
Несколько отдельных процессов. Обходит GIL и подходит для CPU-bound задач.
from multiprocessing import Process, Pool, Queue
import time
# CPU-bound задача
def calculate_fibonacci(n):
if n < 2:
return n
return calculate_fibonacci(n-1) + calculate_fibonacci(n-2)
# Способ 1: Использование Pool
with Pool(processes=4) as pool:
numbers = [30, 31, 32, 33]
results = pool.map(calculate_fibonacci, numbers)
print(results) # [832040, 1346269, 2178309, 3524578]
# Способ 2: Процессы с Queue для обмена данными
def worker(queue, task_id):
result = calculate_fibonacci(30 + task_id)
queue.put((task_id, result))
queue = Queue()
processes = []
for i in range(4):
p = Process(target=worker, args=(queue, i))
p.start()
processes.append(p)
results = {}
for _ in range(4):
task_id, result = queue.get()
results[task_id] = result
for p in processes:
p.join()
print(results)
# Способ 3: ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
def cpu_bound_task(n):
return sum(i*i for i in range(n))
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_bound_task, 10**7) for _ in range(4)]
results = [future.result() for future in futures]
Минусы: Overhead создания процессов, сложность обмена данными, больше памяти.
3. Asyncio (асинхронное программирование)
Легковесная конкурентность на основе coroutines (сопрограмм). Идеальна для I/O-bound задач.
import asyncio
import aiohttp
import time
# Асинхронная функция
async def fetch_url_async(session, url):
async with session.get(url) as response:
return await response.text()
async def fetch_multiple_urls():
urls = [
'http://example.com',
'http://google.com',
'http://github.com'
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Запуск асинхронного кода
start = time.time()
results = asyncio.run(fetch_multiple_urls())
print(f"Completed in {time.time() - start:.2f}s")
print(f"Fetched {len(results)} URLs")
# Работа с таймаутами
async def fetch_with_timeout():
try:
async with aiohttp.ClientSession() as session:
async with session.get('http://example.com', timeout=aiohttp.ClientTimeout(total=5)) as response:
return await response.text()
except asyncio.TimeoutError:
print("Request timed out")
# Управление задачами
async def main():
# Создание задач
task1 = asyncio.create_task(fetch_url_async(session, 'http://example.com'))
task2 = asyncio.create_task(fetch_url_async(session, 'http://google.com'))
# Ожидание первого завершения
done, pending = await asyncio.wait(
[task1, task2],
return_when=asyncio.FIRST_COMPLETED
)
# Отмена оставшихся задач
for task in pending:
task.cancel()
Преимущества: Низкий overhead, простой синтаксис, отлично для I/O.
Недостаток: Все должно быть асинхронным; невозможна настоящая параллелизм.
4. Gevent (микропотоки)
Легковесные потоки с автоматическим переключением контекста.
from gevent import spawn, joinall
import gevent
import requests
def fetch_url_gevent(url):
response = requests.get(url)
print(f"Fetched {url}: {len(response.content)} bytes")
# Создание greenlets
urls = ['http://example.com', 'http://google.com', 'http://github.com']
greenlets = [spawn(fetch_url_gevent, url) for url in urls]
# Ожидание завершения всех
joinall(greenlets)
# Монкипатчинг для использования стандартных библиотек асинхронно
from gevent import monkey
monkey.patch_all() # Патчит socket, threading и т.д.
# Теперь стандартный код работает асинхронно
threads = []
for url in urls:
thread = threading.Thread(target=fetch_url, args=(url,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Работает асинхронно благодаря монкипатчингу
5. Ray (распределённые вычисления)
Для масштабирования на несколько машин.
import ray
# Инициализация Ray
ray.init()
# Определение удалённых функций
@ray.remote
def slow_function(x):
import time
time.sleep(1)
return x ** 2
# Выполнение параллельно
futures = [slow_function.remote(x) for x in range(10)]
results = ray.get(futures)
print(results)
# Определение класса актора
@ray.remote
class Counter:
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
return self.count
counter = Counter.remote()
for _ in range(5):
count = ray.get(counter.increment.remote())
print(f"Count: {count}")
ray.shutdown()
6. Concurrent.futures (высокоуровневый API)
Унифицированный интерфейс для Threading и Multiprocessing.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
def io_bound_task(n):
import requests
return len(requests.get(f'http://httpbin.org/delay/{n}').content)
def cpu_bound_task(n):
return sum(i*i for i in range(n))
# ThreadPoolExecutor для I/O
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(io_bound_task, i): i for i in range(1, 4)}
for future in as_completed(futures):
result = future.result()
print(f"Task {futures[future]} result: {result}")
# ProcessPoolExecutor для CPU
with ProcessPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(cpu_bound_task, 10**6): i for i in range(4)}
for future in as_completed(futures):
result = future.result()
print(f"CPU Task {futures[future]} result: {result}")
Сравнение механизмов
| Механизм | Тип | I/O-bound | CPU-bound | Сложность | GIL |
|---|---|---|---|---|---|
| Threading | Многопоточность | Отлично | Плохо | Низкая | Да |
| Multiprocessing | Многопроцессность | Хорошо | Отлично | Высокая | Нет |
| Asyncio | Асинхронность | Отлично | Нет | Средняя | Да |
| Gevent | Микропотоки | Отлично | Плохо | Средняя | Да |
| Ray | Распределённые | Отлично | Отлично | Очень высокая | Нет |
Практические примеры
# I/O-bound: API запросы
# Используй: Asyncio или Threading
async def get_users_async():
async with aiohttp.ClientSession() as session:
tasks = [fetch_user(session, i) for i in range(100)]
return await asyncio.gather(*tasks)
# CPU-bound: Обработка данных
# Используй: Multiprocessing или Ray
from multiprocessing import Pool
def process_data(data):
return sum(calculate_complex(item) for item in data)
with Pool() as p:
results = p.map(process_data, chunks)
# Смешанное: API + обработка
# Используй: asyncio + ProcessPoolExecutor
async def hybrid():
with ProcessPoolExecutor() as pool:
loop = asyncio.get_event_loop()
async with aiohttp.ClientSession() as session:
# Fetch API асинхронно
data = await fetch_api_data(session)
# Обработай в процессах
result = await loop.run_in_executor(pool, process_data, data)
return result
Выводы
- I/O-bound (сеть, файлы): Asyncio > Threading > Gevent
- CPU-bound (вычисления): Multiprocessing > Ray
- Простота: Threading > Asyncio > Multiprocessing
- Масштабируемость: Ray > Multiprocessing > Asyncio
Лучший выбор зависит от конкретной задачи, требований к производительности и поддерживаемости кода.