Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как реализовывал параллелизм в Python?
Параллелизм в Python — это выполнение нескольких задач одновременно. Однако Python имеет Global Interpreter Lock (GIL), который ограничивает одновременное выполнение байт-кода. Существует несколько подходов для реального параллелизма в зависимости от типа задач.
1. Threading (многопоточность) — для I/O-bound задач
Идеален для операций с блокировкой (сетевые запросы, файловые операции):
import threading
import time
import requests
def fetch_url(url, results, index):
try:
response = requests.get(url, timeout=5)
results[index] = response.status_code
except Exception as e:
results[index] = f"Error: {e}"
def threaded_requests(urls):
threads = []
results = [None] * len(urls)
for i, url in enumerate(urls):
thread = threading.Thread(target=fetch_url, args=(url, results, i))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Ждём завершения всех потоков
return results
urls = ['https://api.example.com/users', 'https://api.example.com/posts']
start = time.time()
results = threaded_requests(urls)
print(f"Threading: {time.time() - start:.2f}s")
2. ThreadPoolExecutor — более удобная обёртка
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def fetch_url(url):
response = requests.get(url)
return response.status_code
def fetch_urls_pool(urls, max_workers=5):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Запускаем задачи
futures = {executor.submit(fetch_url, url): url for url in urls}
results = {}
for future in as_completed(futures):
url = futures[future]
try:
results[url] = future.result(timeout=10)
except Exception as e:
results[url] = f"Error: {e}"
return results
urls = ['https://api.example.com/1', 'https://api.example.com/2']
results = fetch_urls_pool(urls, max_workers=10)
for url, status in results.items():
print(f"{url}: {status}")
3. Multiprocessing — для CPU-bound задач
Обходит GIL, используя отдельные процессы Python:
from multiprocessing import Pool, Process
import math
def cpu_intensive_task(n):
# Вычисляем факториал — CPU-bound операция
return math.factorial(n)
def multiprocessing_example(numbers):
# Используем пул процессов
with Pool(processes=4) as pool:
results = pool.map(cpu_intensive_task, numbers)
return results
numbers = [100, 200, 300, 400]
results = multiprocessing_example(numbers)
print(results)
# Альтернатива: явное управление процессами
if __name__ == '__main__':
processes = []
for n in numbers:
p = Process(target=cpu_intensive_task, args=(n,))
processes.append(p)
p.start()
for p in processes:
p.join()
4. asyncio — асинхронное программирование
Лучший выбор для I/O-bound операций в современном Python:
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return response.status
async def fetch_all_urls(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Запуск
urls = ['https://api.example.com/1', 'https://api.example.com/2']
results = asyncio.run(fetch_all_urls(urls))
print(results)
Преимущество asyncio: одиночный поток, но может обрабатывать тысячи I/O операций одновременно.
5. Практический пример: параллельная обработка данных из БД
from concurrent.futures import ThreadPoolExecutor
import psycopg2
from psycopg2.pool import SimpleConnectionPool
class ParallelDatabaseProcessor:
def __init__(self, connection_string, num_workers=5):
self.pool = SimpleConnectionPool(1, num_workers, connection_string)
self.executor = ThreadPoolExecutor(max_workers=num_workers)
def process_batch(self, query, batch_params):
"""Обрабатываем пакет параметров параллельно"""
futures = []
for params in batch_params:
future = self.executor.submit(self._execute_query, query, params)
futures.append(future)
results = [f.result() for f in futures]
return results
def _execute_query(self, query, params):
conn = self.pool.getconn()
try:
cursor = conn.cursor()
cursor.execute(query, params)
result = cursor.fetchall()
cursor.close()
return result
finally:
self.pool.putconn(conn)
# Использование
processor = ParallelDatabaseProcessor(
"postgres://user:password@localhost/db",
num_workers=10
)
batch_params = [(user_id,) for user_id in range(1000, 1010)]
query = "SELECT * FROM users WHERE id = %s"
results = processor.process_batch(query, batch_params)
6. Ray — высокоуровневый фреймворк для параллельных вычислений
Лучше всего для data engineering и ML pipeline:
import ray
@ray.remote
def process_data(data_chunk):
# Обрабатываем данные в отдельном процессе
return sum(data_chunk)
ray.init() # Инициализируем Ray
# Создаём задачи
data_chunks = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]
futures = [process_data.remote(chunk) for chunk in data_chunks]
# Получаем результаты
results = ray.get(futures)
print(results) # [10, 26, 42]
ray.shutdown()
7. Сравнение подходов
| Подход | Тип | Лучше для | GIL | Сложность |
|---|---|---|---|---|
| Threading | I/O | Сетевые запросы | ✓ Обходит | Низкая |
| Multiprocessing | CPU | Вычисления | ✗ Нет | Средняя |
| asyncio | I/O | 1000+ одновременных операций | ✓ Обходит | Средняя |
| Ray | GPU/CPU | Big Data, ML | ✗ Нет | Высокая |
| joblib | CPU | scikit-learn парралель | ✗ Нет | Низкая |
8. Практический выбор для Data Engineer
# Загрузка данных из API (I/O-bound)
from concurrent.futures import ThreadPoolExecutor
# → Используй ThreadPoolExecutor
# Обработка CSV файлов (I/O-bound)
import asyncio
# → Используй asyncio или ThreadPoolExecutor
# Обработка данных с ML моделью (CPU-bound)
from multiprocessing import Pool
# → Используй multiprocessing или Ray
# Big Data pipeline с Spark
from pyspark.sql import SparkSession
# → Используй Spark (встроенный параллелизм)
Выбор правильного подхода к параллелизму — ключ к масштабируемым и эффективным data pipeline'ам.