← Назад к вопросам

Какие знаешь IPC в Linux?

1.8 Middle🔥 81 комментариев
#DevOps и инфраструктура#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# IPC (Inter-Process Communication) в Linux

IPC — механизмы взаимодействия между процессами в Linux. Они позволяют процессам обмениваться данными, синхронизироваться и координировать работу.

Основные механизмы IPC

1. Pipes (Трубы)

Простейший механизм для передачи данных между процессами.

import os
import subprocess

# Создать анонимную трубу (pipe)
read_fd, write_fd = os.pipe()

# Дочерний процесс пишет, родительский читает
pid = os.fork()

if pid == 0:  # Дочерний процесс
    os.close(read_fd)  # Закрыть конец для чтения
    message = b"Hello from child"
    os.write(write_fd, message)
    os.close(write_fd)
    os._exit(0)
else:  # Родительский процесс
    os.close(write_fd)  # Закрыть конец для записи
    data = os.read(read_fd, 1024)
    print(f"Received: {data.decode()}")
    os.close(read_fd)
    os.waitpid(pid, 0)

# Использование в shell
# ls | grep .py  # Вывод ls проходит через grep

Характеристики:

  • Однонаправленные (unidirectional)
  • Работают только между родителем и ребенком
  • Буфер в памяти ядра

2. Named Pipes (FIFO)

Пайпы с именем, доступные разным процессам.

import os
import subprocess

# Создать именованный pipe
fifo_path = "/tmp/my_fifo"
try:
    os.mkfifo(fifo_path)
except FileExistsError:
    pass

# Процесс 1: Писатель
def writer():
    with open(fifo_path, "w") as fifo:
        fifo.write("Hello from writer")
        fifo.flush()

# Процесс 2: Читатель
def reader():
    with open(fifo_path, "r") as fifo:
        data = fifo.read()
        print(f"Received: {data}")

# ВАЖНО: одновременно запустить читателя и писателя
# reader_process = subprocess.Popen(reader)
# writer_process = subprocess.Popen(writer)

Характеристики:

  • Именованные
  • Можно использовать разные процессы
  • Персистентны в файловой системе

3. Message Queues (Очереди сообщений)

Расширенный способ передачи сообщений между процессами.

import sys
sys.path.append('/usr/lib/python3/dist-packages')

try:
    from posix_ipc import MessageQueue
    
    # Создать очередь сообщений
    mq = MessageQueue("/my_queue", flags=1)
    # flags=1 означает создать если нет
    
    # Отправить сообщение
    message = b"Important message"
    mq.send(message, priority=1)
    
    # Получить сообщение
    received, priority = mq.receive()
    print(f"Message: {received.decode()}, Priority: {priority}")
    
    # Удалить очередь
    mq.unlink()
except ImportError:
    # Альтернатива через queue.Queue
    from multiprocessing import Queue
    
    q = Queue()
    q.put("Message from process 1")
    msg = q.get(timeout=1)
    print(f"Received: {msg}")

Характеристики:

  • Асинхронная доставка
  • Поддержка приоритетов
  • Буферизация сообщений

4. Shared Memory (Общая память)

Процессы получают доступ к одной области памяти.

from multiprocessing import shared_memory
import ctypes

# Создать общую память (100 целых чисел)
shm = shared_memory.SharedMemory(create=True, size=100 * ctypes.sizeof(ctypes.c_int))

# Создать array в общей памяти
shared_array = ctypes.cast(shm.buf, ctypes.POINTER(ctypes.c_int * 100)).contents

# Процесс 1: Записать данные
for i in range(100):
    shared_array[i] = i * 2

# Процесс 2: Прочитать данные (в другом процессе)
# shm2 = shared_memory.SharedMemory(name=shm.name)
# shared_array2 = ctypes.cast(shm2.buf, ctypes.POINTER(ctypes.c_int * 100)).contents
# for i in range(100):
#     print(shared_array2[i])

# Освободить память
shm.close()
shm.unlink()

Характеристики:

  • Самый быстрый способ (прямой доступ к памяти)
  • Требует синхронизации (мьютексы, семафоры)
  • Опасен — race conditions

5. Semaphores (Семафоры)

Механизм синхронизации для контроля доступа к ресурсам.

from multiprocessing import Semaphore, Process
import time

# Семафор с начальным значением 2 (до 2 процессов могут войти)
semaphore = Semaphore(2)

def worker(name):
    with semaphore:  # Уменьшить счетчик или ждать
        print(f"{name} вошел в критическую секцию")
        time.sleep(1)
        print(f"{name} покидает критическую секцию")
        # Увеличить счетчик при выходе

# Запустить 5 процессов, но только 2 будут одновременно
processes = []
for i in range(5):
    p = Process(target=worker, args=(f"Worker {i}",))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

print("Все процессы завершены")

Характеристики:

  • Счетчик для контроля доступа
  • Может быть бинарный (0/1) или многозначный
  • Использует wait() и post()

6. Mutexes (Мьютексы)

Бинарные семафоры для взаимного исключения.

from multiprocessing import Lock
import time

# Мьютекс (взаимное исключение)
mutex = Lock()

shared_counter = 0  # Глобальная переменная

def increment_counter():
    global shared_counter
    with mutex:  # Заблокировать доступ
        # Критическая секция — только один процесс может быть здесь
        temp = shared_counter
        time.sleep(0.001)  # Имитировать работу
        shared_counter = temp + 1
        # Автоматическое разблокирование при выходе

# Без мьютекса было бы race condition
# Со временем shared_counter < 1000 (потери обновлений)

Характеристики:

  • Всегда 0 (заблокирован) или 1 (разблокирован)
  • Может быть владельцем только один процесс
  • Рекурсивные мьютексы позволяют одному процессу блокировать дважды

7. Condition Variables (Условные переменные)

Позволяют процессам ждать определенного условия.

from multiprocessing import Condition, Process
import time

condition = Condition()
data_ready = False
data = None

def producer():
    global data_ready, data
    with condition:
        print("Producer: создание данных")
        time.sleep(1)
        data = "Important data"
        data_ready = True
        print("Producer: уведомляю потребителя")
        condition.notify()  # Разбудить ожидающие процессы

def consumer():
    global data_ready, data
    with condition:
        print("Consumer: ожидаю данные")
        condition.wait()  # Ждать уведомления
        print(f"Consumer: получил {data}")

if __name__ == "__main__":
    p = Process(target=producer)
    c = Process(target=consumer)
    
    c.start()
    time.sleep(0.1)  # Убедиться что consumer готов
    p.start()
    
    p.join()
    c.join()

Характеристики:

  • Комбинирует мьютекс и сигнал
  • wait() освобождает мьютекс и ждет
  • notify() пробуждает ожидающие процессы

8. Sockets (Сокеты)

Для межпроцессного взаимодействия через сеть (локальной или удаленной).

import socket
import os

# Unix domain socket (локальный)
socket_path = "/tmp/my_socket"

# Сервер
if os.path.exists(socket_path):
    os.remove(socket_path)

server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_socket.bind(socket_path)
server_socket.listen(1)

print("Сервер слушает на", socket_path)

connection, client_address = server_socket.accept()
try:
    data = connection.recv(1024)
    print(f"Получено: {data.decode()}")
finally:
    connection.close()
    server_socket.close()

# Клиент
client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client_socket.connect(socket_path)
client_socket.sendall(b"Hello from client")
client_socket.close()

Характеристики:

  • Unix domain sockets для локального IPC
  • TCP/UDP для сетевого IPC
  • Наиболее гибкий способ

Сравнение механизмов IPC

Механизм      | Скорость | Сложность | Потокобезопасность | Использование
-------------|----------|-----------|-------------------|-----------------------
Pipes        | Быстро   | Низкая    | Да                | Shell pipeline
Named Pipes  | Быстро   | Средняя   | Да                | Несвязанные процессы
Msg Queues   | Средняя  | Средняя   | Да                | Асинхронный обмен
Shared Mem   | Очень    | Высокая   | Нет (нужны mutex) | Высокопроизводительный
Semaphores   | Очень    | Средняя   | Да                | Синхронизация
Mutexes      | Очень    | Низкая    | Да                | Критические секции
CondVars     | Очень    | Средняя   | Да                | Ожидание условий
Sockets      | Медленно  | Средняя   | Да                | Локальная и сетевая

Практический пример: многопроцессный сервер

from multiprocessing import Process, Queue, Lock
import time
import os

# Очередь для задач
task_queue = Queue()
# Замок для логирования
log_lock = Lock()

def worker(worker_id):
    while True:
        try:
            task = task_queue.get(timeout=1)
            if task is None:  # Сигнал завершить
                break
            
            with log_lock:
                print(f"Worker {worker_id} выполняет {task}")
            
            time.sleep(1)  # Имитировать работу
            
            with log_lock:
                print(f"Worker {worker_id} завершил {task}")
        except:
            pass

if __name__ == "__main__":
    # Запустить 4 рабочих
    workers = []
    for i in range(4):
        p = Process(target=worker, args=(i,))
        p.start()
        workers.append(p)
    
    # Добавить задачи
    for task in range(10):
        task_queue.put(f"Task {task}")
    
    # Сигнализировать завершение
    for _ in range(4):
        task_queue.put(None)
    
    # Ждать завершения
    for w in workers:
        w.join()
    
    print("Все процессы завершены")

Заключение

Основные механизмы IPC в Linux:

  1. Pipes — простые, однонаправленные
  2. Named Pipes (FIFO) — для несвязанных процессов
  3. Message Queues — асинхронный обмен с приоритетами
  4. Shared Memory — самый быстрый, требует синхронизации
  5. Semaphores — счетчик для контроля доступа
  6. Mutexes — бинарные семафоры для взаимного исключения
  7. Condition Variables — ожидание определенных условий
  8. Sockets — универсальное решение для локального и сетевого IPC

Выбор механизма зависит от требований к скорости, сложности синхронизации и типа обмена данными.

Какие знаешь IPC в Linux? | PrepBro