Какие знаешь параметры переключения между потоками?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Параметры переключения между потоками в Python
В Python работаю как с потоками (threading), так и с асинхронностью. Расскажу о механизмах переключения.
Global Interpreter Lock (GIL)
Важнейший момент — в CPython только один поток выполняет Python код одновременно:
import threading
import time
def worker(name):
for i in range(3):
print(f"{name}: iteration {i}")
time.sleep(0.1) # Отпускает GIL
threads = [threading.Thread(target=worker, args=(f"T{i}",)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
Итерпретатор переключается между потоками по времени (tick-based switching), а не параллельно выполняет их.
Параметры потока (Thread)
import threading
thread = threading.Thread(
target=worker_function,
args=(arg1, arg2), # позиционные аргументы
kwargs={"key": "value"}, # именованные аргументы
daemon=False, # daemon=True завершается с основной программой
name="WorkerThread" # имя для логирования
)
thread.start()
thread.join(timeout=5) # Ждать завершения до 5 сек
print(thread.is_alive()) # Проверить запущен ли
print(threading.active_count()) # Кол-во активных потоков
Синхронизация потоков
Lock — взаимное исключение:
import threading
lock = threading.Lock()
shared_data = 0
def increment():
global shared_data
with lock: # Критическая секция
temp = shared_data
temp += 1
shared_data = temp
for i in range(10):
t = threading.Thread(target=increment)
t.start()
RLock — рекурсивная блокировка (один поток может захватить несколько раз):
rlock = threading.RLock()
def recursive_function():
with rlock:
print("First lock")
recursive_function_inner()
def recursive_function_inner():
with rlock: # Один и тот же поток может захватить снова
print("Nested lock")
Semaphore — ограничение доступа к ресурсам:
semaphore = threading.Semaphore(3) # Максимум 3 потока одновременно
def limited_resource():
with semaphore:
print("Working...")
time.sleep(1)
Event — сигнал между потоками:
event = threading.Event()
def waiter():
print("Waiting for signal...")
event.wait() # Блокируется до signal()
print("Signal received!")
def signaler():
time.sleep(2)
event.set() # Отправить сигнал
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=signaler)
t1.start()
t2.start()
t1.join()
t2.join()
Condition — условная переменная:
condition = threading.Condition()
data = []
def producer():
with condition:
data.append(42)
condition.notify_all() # Разбудить ожидающие потоки
def consumer():
with condition:
while not data: # Проверка условия
condition.wait() # Ждать уведомления
print(f"Got: {data.pop()}")
ThreadLocal Storage
Данные, уникальные для каждого потока:
local = threading.local()
local.user_id = 123 # Для текущего потока
def thread_func():
print(local.user_id) # AttributeError в другом потоке
Context Switching механизм
Переключение происходит:
- По времени — каждые N инструкций Python (sys.setswitchinterval())
- При блокировке — I/O операции, time.sleep(), threading примитивы
- Явно — через time.sleep(0)
import sys
print(sys.getswitchinterval()) # По умолчанию 0.005 сек (5мс)
sys.setswitchinterval(0.001) # Переключаться чаще
asyncio альтернатива
Для I/O-bound задач лучше asyncio (кооперативная многозадачность):
import asyncio
async def fetch_data(url):
await asyncio.sleep(1) # Имитация сетевого запроса
return f"Data from {url}"
async def main():
urls = ["url1", "url2", "url3"]
results = await asyncio.gather(*[fetch_data(url) for url in urls])
print(results)
asyncio.run(main())
Asyncio быстрее потоков для I/O, потому что нет overhead переключения контекста.
Когда использовать что
- Потоки (threading) — I/O-bound + нужна простота, но помни про GIL
- asyncio — I/O-bound + много операций одновременно
- multiprocessing — CPU-bound задачи (обходит GIL)
- Executor — встроенные потоки/процессы для простых сценариев
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(worker, i) for i in range(10)]
for future in futures:
result = future.result()