← Назад к вопросам
Как посмотреть количество текущих конектов в БД?
1.8 Middle🔥 151 комментариев
#DevOps и инфраструктура#Базы данных (SQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Проверка количества текущих соединений с БД
Мониторинг текущих соединений — критичная задача для диагностики проблем с производительностью и утечек соединений. Подходы зависят от типа БД.
1. PostgreSQL — SQL запросы
-- Посмотреть все активные соединения
SELECT
datname as database,
usename as username,
count(*) as connections
FROM pg_stat_activity
GROUP BY datname, usename
ORDER BY connections DESC;
-- Посмотреть подробно все соединения
SELECT
pid,
datname,
usename,
application_name,
state,
query,
backend_start,
query_start
FROM pg_stat_activity
WHERE datname = 'mydb'
ORDER BY backend_start DESC;
-- Количество соединений для конкретной БД
SELECT count(*) as connections FROM pg_stat_activity WHERE datname = 'mydb';
-- Максимально допустимое количество соединений
SELECT setting FROM pg_settings WHERE name = 'max_connections';
-- Процент используемых соединений
SELECT
count(*) * 100.0 / (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') as percent_used
FROM pg_stat_activity;
2. MySQL — SQL запросы
-- Посмотреть все активные соединения
SHOW PROCESSLIST;
-- Подробнее
SHOW FULL PROCESSLIST;
-- Количество соединений
SHOW STATUS LIKE 'Threads_connected';
-- Максимально допустимое количество
SHOW VARIABLES LIKE 'max_connections';
-- Соединения по пользователям
SELECT user, count(*) as connections FROM information_schema.processlist GROUP BY user;
-- Процент используемых соединений
SELECT
(SELECT count(*) FROM information_schema.processlist) * 100.0 /
(SELECT @@max_connections) as percent_used;
-- Убить конкретное соединение
KILL <process_id>;
3. Python — Проверка из приложения
import psycopg2
from psycopg2 import sql
def check_postgresql_connections(dbname, user, password, host):
"""Проверить количество соединений в PostgreSQL"""
try:
connection = psycopg2.connect(
dbname=dbname,
user=user,
password=password,
host=host,
)
cursor = connection.cursor()
# Общее количество соединений
cursor.execute("""
SELECT count(*) FROM pg_stat_activity WHERE datname = %s
""", (dbname,))
current_connections = cursor.fetchone()[0]
# Максимум соединений
cursor.execute("""
SELECT setting::int FROM pg_settings WHERE name = 'max_connections'
""")
max_connections = cursor.fetchone()[0]
# Детальная информация
cursor.execute("""
SELECT
usename,
application_name,
state,
count(*) as count
FROM pg_stat_activity
WHERE datname = %s
GROUP BY usename, application_name, state
""", (dbname,))
details = cursor.fetchall()
cursor.close()
connection.close()
result = {
'current_connections': current_connections,
'max_connections': max_connections,
'percent_used': (current_connections / max_connections) * 100,
'details': details,
}
return result
except psycopg2.Error as e:
print(f"Error: {e}")
return None
# Использование
info = check_postgresql_connections(
dbname='mydb',
user='postgres',
password='password',
host='localhost',
)
if info:
print(f"Current connections: {info['current_connections']}/{info['max_connections']}")
print(f"Usage: {info['percent_used']:.1f}%")
4. SQLAlchemy — Проверка пула соединений
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
# Создать engine с пулом соединений
engine = create_engine(
'postgresql://user:password@localhost/mydb',
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
echo_pool=True, # Логировать события пула
)
def check_sqlalchemy_pool():
"""Информация о пуле соединений SQLAlchemy"""
pool = engine.pool
print(f"Pool size: {pool.size()}")
print(f"Checked out connections: {pool.checkedout()}")
print(f"Queue size: {pool.queue.qsize()}")
# Также можно из БД
with engine.connect() as connection:
result = connection.execute(text("""
SELECT count(*) FROM pg_stat_activity WHERE datname = current_database()
"""))
count = result.scalar()
print(f"DB connections: {count}")
check_sqlalchemy_pool()
5. Django — Проверка соединений
# Django ORM с использованием inspect
from django.db import connection, connections
from django.test.utils import override_settings
def check_django_connections():
"""Информация о соединениях Django"""
# Текущее соединение
with connection.cursor() as cursor:
if connection.vendor == 'postgresql':
cursor.execute("""
SELECT count(*) FROM pg_stat_activity
WHERE datname = current_database()
""")
count = cursor.fetchone()[0]
print(f"PostgreSQL connections: {count}")
elif connection.vendor == 'mysql':
cursor.execute("SHOW STATUS LIKE 'Threads_connected'")
result = cursor.fetchone()
if result:
print(f"MySQL connections: {result[1]}")
check_django_connections()
6. Класс для мониторинга
import time
from typing import Dict, List
import psycopg2
class ConnectionMonitor:
def __init__(self, dbname, user, password, host, check_interval=60):
self.dbname = dbname
self.user = user
self.password = password
self.host = host
self.check_interval = check_interval
self.history: List[Dict] = []
self.alerts = []
def get_current_connections(self) -> int:
"""Получить текущее количество соединений"""
try:
conn = psycopg2.connect(
dbname=self.dbname,
user=self.user,
password=self.password,
host=self.host,
)
cursor = conn.cursor()
cursor.execute(
"SELECT count(*) FROM pg_stat_activity WHERE datname = %s",
(self.dbname,)
)
count = cursor.fetchone()[0]
cursor.close()
conn.close()
return count
except Exception as e:
print(f"Error: {e}")
return -1
def check(self) -> Dict:
"""Проверить и записать в историю"""
current = self.get_current_connections()
max_conn = self.get_max_connections()
percent = (current / max_conn * 100) if max_conn > 0 else 0
record = {
'timestamp': time.time(),
'connections': current,
'max_connections': max_conn,
'percent_used': percent,
}
self.history.append(record)
# Проверить пороги
if percent > 90:
self.alerts.append(f"WARNING: {percent:.1f}% connections used")
elif percent > 95:
self.alerts.append(f"CRITICAL: {percent:.1f}% connections used")
return record
def get_max_connections(self) -> int:
"""Получить максимум соединений"""
try:
conn = psycopg2.connect(
dbname=self.dbname,
user=self.user,
password=self.password,
host=self.host,
)
cursor = conn.cursor()
cursor.execute(
"SELECT setting::int FROM pg_settings WHERE name = 'max_connections'"
)
max_conn = cursor.fetchone()[0]
cursor.close()
conn.close()
return max_conn
except Exception as e:
print(f"Error: {e}")
return -1
def get_idle_connections(self) -> int:
"""Получить количество неиспользуемых соединений"""
try:
conn = psycopg2.connect(
dbname=self.dbname,
user=self.user,
password=self.password,
host=self.host,
)
cursor = conn.cursor()
cursor.execute("""
SELECT count(*) FROM pg_stat_activity
WHERE datname = %s AND state = 'idle'
""", (self.dbname,))
idle = cursor.fetchone()[0]
cursor.close()
conn.close()
return idle
except Exception as e:
print(f"Error: {e}")
return -1
# Использование
monitor = ConnectionMonitor(
dbname='mydb',
user='postgres',
password='password',
host='localhost',
)
for i in range(5):
status = monitor.check()
print(f"Check {i+1}: {status['connections']}/{status['max_connections']} "
f"({status['percent_used']:.1f}%)")
if monitor.alerts:
print(f"Alerts: {monitor.alerts}")
monitor.alerts.clear()
time.sleep(5)
7. Мониторинг через Prometheus/Grafana
from prometheus_client import Gauge, start_http_server
import psycopg2
import threading
import time
# Метрики
db_connections = Gauge('db_connections', 'Current database connections')
db_max_connections = Gauge('db_max_connections', 'Max database connections')
db_idle_connections = Gauge('db_idle_connections', 'Idle database connections')
def collect_metrics():
while True:
try:
conn = psycopg2.connect(...)
cursor = conn.cursor()
# Текущие соединения
cursor.execute("SELECT count(*) FROM pg_stat_activity WHERE datname = current_database()")
db_connections.set(cursor.fetchone()[0])
# Максимум
cursor.execute("SELECT setting::int FROM pg_settings WHERE name = 'max_connections'")
db_max_connections.set(cursor.fetchone()[0])
# Idle
cursor.execute("SELECT count(*) FROM pg_stat_activity WHERE state = 'idle'")
db_idle_connections.set(cursor.fetchone()[0])
cursor.close()
conn.close()
except Exception as e:
print(f"Error: {e}")
time.sleep(30)
# Запустить сборку метрик в отдельном потоке
threading.Thread(target=collect_metrics, daemon=True).start()
# Запустить HTTP сервер для Prometheus
start_http_server(8000)
Best Practices
Проверяй регулярно:
- Текущее количество соединений
- Процент использования от максимума
- Idle соединения (признак утечки)
- Long-running queries
Установи пороги:
- Warning: 75% от max_connections
- Critical: 90% от max_connections
Профилактика:
- Правильная конфигурация пула соединений
- Регулярный мониторинг
- Анализ long-running queries
- Закрытие неиспользуемых соединений
Регулярный мониторинг соединений — залог стабильной работы приложения.