← Назад к вопросам

Как реализовывал параллелизм в Python?

1.3 Junior🔥 201 комментариев
#Python

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как реализовывал параллелизм в Python?

Параллелизм в Python — это выполнение нескольких задач одновременно. Однако Python имеет Global Interpreter Lock (GIL), который ограничивает одновременное выполнение байт-кода. Существует несколько подходов для реального параллелизма в зависимости от типа задач.

1. Threading (многопоточность) — для I/O-bound задач

Идеален для операций с блокировкой (сетевые запросы, файловые операции):

import threading
import time
import requests

def fetch_url(url, results, index):
    try:
        response = requests.get(url, timeout=5)
        results[index] = response.status_code
    except Exception as e:
        results[index] = f"Error: {e}"

def threaded_requests(urls):
    threads = []
    results = [None] * len(urls)
    
    for i, url in enumerate(urls):
        thread = threading.Thread(target=fetch_url, args=(url, results, i))
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()  # Ждём завершения всех потоков
    
    return results

urls = ['https://api.example.com/users', 'https://api.example.com/posts']
start = time.time()
results = threaded_requests(urls)
print(f"Threading: {time.time() - start:.2f}s")

2. ThreadPoolExecutor — более удобная обёртка

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def fetch_url(url):
    response = requests.get(url)
    return response.status_code

def fetch_urls_pool(urls, max_workers=5):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Запускаем задачи
        futures = {executor.submit(fetch_url, url): url for url in urls}
        
        results = {}
        for future in as_completed(futures):
            url = futures[future]
            try:
                results[url] = future.result(timeout=10)
            except Exception as e:
                results[url] = f"Error: {e}"
    
    return results

urls = ['https://api.example.com/1', 'https://api.example.com/2']
results = fetch_urls_pool(urls, max_workers=10)
for url, status in results.items():
    print(f"{url}: {status}")

3. Multiprocessing — для CPU-bound задач

Обходит GIL, используя отдельные процессы Python:

from multiprocessing import Pool, Process
import math

def cpu_intensive_task(n):
    # Вычисляем факториал — CPU-bound операция
    return math.factorial(n)

def multiprocessing_example(numbers):
    # Используем пул процессов
    with Pool(processes=4) as pool:
        results = pool.map(cpu_intensive_task, numbers)
    return results

numbers = [100, 200, 300, 400]
results = multiprocessing_example(numbers)
print(results)

# Альтернатива: явное управление процессами
if __name__ == '__main__':
    processes = []
    for n in numbers:
        p = Process(target=cpu_intensive_task, args=(n,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

4. asyncio — асинхронное программирование

Лучший выбор для I/O-bound операций в современном Python:

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return response.status

async def fetch_all_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

# Запуск
urls = ['https://api.example.com/1', 'https://api.example.com/2']
results = asyncio.run(fetch_all_urls(urls))
print(results)

Преимущество asyncio: одиночный поток, но может обрабатывать тысячи I/O операций одновременно.

5. Практический пример: параллельная обработка данных из БД

from concurrent.futures import ThreadPoolExecutor
import psycopg2
from psycopg2.pool import SimpleConnectionPool

class ParallelDatabaseProcessor:
    def __init__(self, connection_string, num_workers=5):
        self.pool = SimpleConnectionPool(1, num_workers, connection_string)
        self.executor = ThreadPoolExecutor(max_workers=num_workers)
    
    def process_batch(self, query, batch_params):
        """Обрабатываем пакет параметров параллельно"""
        futures = []
        
        for params in batch_params:
            future = self.executor.submit(self._execute_query, query, params)
            futures.append(future)
        
        results = [f.result() for f in futures]
        return results
    
    def _execute_query(self, query, params):
        conn = self.pool.getconn()
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            result = cursor.fetchall()
            cursor.close()
            return result
        finally:
            self.pool.putconn(conn)

# Использование
processor = ParallelDatabaseProcessor(
    "postgres://user:password@localhost/db",
    num_workers=10
)

batch_params = [(user_id,) for user_id in range(1000, 1010)]
query = "SELECT * FROM users WHERE id = %s"
results = processor.process_batch(query, batch_params)

6. Ray — высокоуровневый фреймворк для параллельных вычислений

Лучше всего для data engineering и ML pipeline:

import ray

@ray.remote
def process_data(data_chunk):
    # Обрабатываем данные в отдельном процессе
    return sum(data_chunk)

ray.init()  # Инициализируем Ray

# Создаём задачи
data_chunks = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]
futures = [process_data.remote(chunk) for chunk in data_chunks]

# Получаем результаты
results = ray.get(futures)
print(results)  # [10, 26, 42]

ray.shutdown()

7. Сравнение подходов

ПодходТипЛучше дляGILСложность
ThreadingI/OСетевые запросы✓ ОбходитНизкая
MultiprocessingCPUВычисления✗ НетСредняя
asyncioI/O1000+ одновременных операций✓ ОбходитСредняя
RayGPU/CPUBig Data, ML✗ НетВысокая
joblibCPUscikit-learn парралель✗ НетНизкая

8. Практический выбор для Data Engineer

# Загрузка данных из API (I/O-bound)
from concurrent.futures import ThreadPoolExecutor
# → Используй ThreadPoolExecutor

# Обработка CSV файлов (I/O-bound)
import asyncio
# → Используй asyncio или ThreadPoolExecutor

# Обработка данных с ML моделью (CPU-bound)
from multiprocessing import Pool
# → Используй multiprocessing или Ray

# Big Data pipeline с Spark
from pyspark.sql import SparkSession
# → Используй Spark (встроенный параллелизм)

Выбор правильного подхода к параллелизму — ключ к масштабируемым и эффективным data pipeline'ам.