← Назад к вопросам
Какие фреймворки использовал для работы с NoSQL БД?
2.0 Middle🔥 191 комментариев
#Базы данных (NoSQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Фреймворки и библиотеки для работы с NoSQL БД в Python
MongoDB
1. PyMongo — официальный синхронный драйвер
# pip install pymongo
from pymongo import MongoClient
from bson.objectid import ObjectId
import datetime
# Подключение
client = MongoClient('mongodb://localhost:27017/')
db = client['myapp_db']
collection = db['users']
# CRUD операции
# Create
user_doc = {
'name': 'John Doe',
'email': 'john@example.com',
'age': 30,
'created_at': datetime.datetime.now(),
'tags': ['python', 'web'],
'metadata': {'city': 'New York', 'country': 'USA'}
}
result = collection.insert_one(user_doc)
user_id = result.inserted_id
# Read
user = collection.find_one({'_id': user_id})
users_list = collection.find({'age': {'$gte': 25}}).limit(10)
# Update
collection.update_one(
{'_id': user_id},
{'$set': {'age': 31, 'updated_at': datetime.datetime.now()}}
)
# Delete
collection.delete_one({'_id': user_id})
# Агрегация (мощная фишка MongoDB)
pipeline = [
{'$match': {'age': {'$gte': 25}}},
{'$group': {
'_id': None,
'average_age': {'$avg': '$age'},
'count': {'$sum': 1}
}}
]
results = collection.aggregate(pipeline)
for result in results:
print(result)
# Индексы
collection.create_index('email', unique=True)
collection.create_index([('name', 1), ('age', -1)])
Плюсы:
- ✅ Официальный драйвер MongoDB
- ✅ Полная поддержка MongoDB features
- ✅ Отлично задокументирован
- ✅ Встроенная поддержка connection pooling
Минусы:
- ❌ Синхронный (блокирует)
- ❌ Низкоуровневый — много boilerplate кода
2. Motor — асинхронный драйвер для MongoDB
# pip install motor
from motor.motor_asyncio import AsyncIOMotorClient
import asyncio
async def main():
client = AsyncIOMotorClient('mongodb://localhost:27017/')
db = client['myapp_db']
collection = db['users']
# Асинхронные операции
user_doc = {'name': 'Alice', 'email': 'alice@example.com'}
result = await collection.insert_one(user_doc)
user = await collection.find_one({'_id': result.inserted_id})
print(user)
# Асинхронный поиск
async for user in collection.find({'age': {'$gte': 25}}):
print(user['name'])
client.close()
asyncio.run(main())
Плюсы:
- ✅ Асинхронный (non-blocking)
- ✅ Совместим с asyncio и FastAPI
- ✅ Отлично для high-load приложений
Минусы:
- ❌ Всё ещё low-level
- ❌ Нет type hints
3. MongoEngine — ODM (Object Document Mapper)
# pip install mongoengine
from mongoengine import Document, StringField, IntField, ListField, DictField
from mongoengine import connect
from datetime import datetime
# Подключение
connect('myapp_db', host='mongodb://localhost:27017')
# Определение модели (похоже на SQLAlchemy)
class User(Document):
name = StringField(required=True, unique=True)
email = StringField(required=True, unique=True)
age = IntField(min_value=0, max_value=150)
tags = ListField(StringField())
metadata = DictField()
created_at = datetime.now()
meta = {'collection': 'users'}
def __str__(self):
return f'{self.name} ({self.email})'
# Создание
user = User(
name='Bob Smith',
email='bob@example.com',
age=28,
tags=['python', 'django'],
metadata={'city': 'Boston'}
)
user.save()
# Чтение
user = User.objects.get(email='bob@example.com')
users_list = User.objects(age__gte=25).limit(10)
# Обновление
user.update(age=29)
user.save()
# Удаление
user.delete()
# Запросы
active_users = User.objects(age__gte=18, age__lte=65)
old_users = User.objects(age__gt=60)
by_tag = User.objects(tags='python')
Плюсы:
- ✅ ORM-like experience (знакомо разработчикам SQL)
- ✅ Валидация данных встроена
- ✅ Queries как в Django ORM
Минусы:
- ❌ Синхронный
- ❌ Медленнее raw PyMongo
- ❌ Менее гибкий
4. Beanie — асинхронный ODM
# pip install beanie
from beanie import Document, Indexed
from pymongo import AsyncIOMotorClient
from pydantic import BaseModel, Field
from typing import List, Optional
import asyncio
class User(Document):
name: str = Indexed(unique=True)
email: str = Indexed(unique=True)
age: int
tags: List[str] = []
metadata: Optional[dict] = None
class Settings:
name = 'users' # Имя коллекции
async def main():
# Инициализация
client = AsyncIOMotorClient('mongodb://localhost:27017')
await client.drop_database('myapp_db')
await Beanie.init_beanie(
database=client['myapp_db'],
models=[User]
)
# Create
user = User(
name='Charlie',
email='charlie@example.com',
age=32,
tags=['python', 'fastapi']
)
await user.insert()
# Read
user = await User.find_one(User.email == 'charlie@example.com')
users = await User.find(User.age > 25).to_list()
# Update
await user.set({User.age: 33})
# Delete
await user.delete()
client.close()
asyncio.run(main())
Плюсы:
- ✅ Асинхронный ODM
- ✅ Использует Pydantic (type hints)
- ✅ Идеален для FastAPI
- ✅ Современный подход
Минусы:
- ❌ Более молодой проект, может быть менее стабилен
- ❌ Меньше документации
Redis
1. Redis-py — синхронный клиент
# pip install redis
import redis
import json
# Подключение
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Строковые операции
r.set('user:1:name', 'John') # SET
value = r.get('user:1:name') # GET
# Счётчики
r.incr('page_views') # Инкрементировать на 1
r.incrby('score', 10) # Инкрементировать на 10
# Expires (TTL)
r.setex('session:abc123', 3600, 'session_data') # Set with expire
r.expire('key', 3600) # Установить время жизни
# Списки
r.rpush('queue', 'job1', 'job2') # Добавить в конец
r.lpop('queue') # Получить с начала
r.lrange('queue', 0, -1) # Получить все
# Множества
r.sadd('tags', 'python', 'web', 'api')
r.smembers('tags')
r.scard('tags') # Количество
# Хеши
r.hset('user:1', mapping={
'name': 'Alice',
'email': 'alice@example.com',
'age': 28
})
r.hgetall('user:1')
r.hget('user:1', 'name')
# JSON (Redis 7.0+)
user_data = {'name': 'Bob', 'age': 30}
r.json().set('user:2', '$', user_data)
user = r.json().get('user:2')
# Pub/Sub
pubsub = r.pubsub()
pubsub.subscribe('events')
for message in pubsub.listen():
print(message['data'])
# Pipeline (batch операции)
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.execute()
Плюсы:
- ✅ Простой и быстрый
- ✅ Полная поддержка Redis features
- ✅ Встроенная Pub/Sub
Минусы:
- ❌ Синхронный
- ❌ Требует careful connection pooling
2. aioredis / redis-py async — асинхронный клиент
# pip install redis[asyncio]
import asyncio
import redis.asyncio as redis
async def main():
r = await redis.from_url('redis://localhost')
# Асинхронные операции
await r.set('key', 'value')
value = await r.get('key')
# Pipeline
async with r.pipeline(transaction=True) as pipe:
await pipe.set('key1', 'value1')
await pipe.set('key2', 'value2')
await pipe.execute()
await r.close()
asyncio.run(main())
Плюсы:
- ✅ Асинхронный
- ✅ Совместим с FastAPI
3. Walrus — ORM для Redis
# pip install walrus
from walrus import Database
db = Database(host='localhost', port=6379)
# Определение моделей
class User(db.Model):
username = db.TextField()
email = db.TextField()
created = db.DateTimeField()
# Использование
user = User.create(username='john', email='john@example.com')
users = User.all()
user_by_id = User[1]
DynamoDB (Amazon)
Boto3 — официальный SDK
# pip install boto3
import boto3
from boto3.dynamodb.conditions import Key, Attr
# Клиент
dynamodb = boto3.resource(
'dynamodb',
region_name='us-east-1',
endpoint_url='http://localhost:8000' # Local DynamoDB
)
table = dynamodb.Table('users')
# Put item
table.put_item(
Item={
'user_id': 'user123',
'name': 'John',
'email': 'john@example.com',
'age': 30
}
)
# Get item
response = table.get_item(Key={'user_id': 'user123'})
item = response['Item']
# Query
response = table.query(
KeyConditionExpression=Key('user_id').eq('user123')
)
# Scan
response = table.scan(
FilterExpression=Attr('age').gte(25)
)
Pynamodb — ODM для DynamoDB
# pip install pynamodb
from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute
class User(Model):
class Meta:
table_name = 'users'
region = 'us-east-1'
user_id = UnicodeAttribute(hash_key=True)
name = UnicodeAttribute()
email = UnicodeAttribute()
age = NumberAttribute()
# Create
user = User('user123')
user.name = 'Alice'
user.email = 'alice@example.com'
user.save()
# Get
user = User.get('user123')
# Query
users = User.query('user123')
Elasticsearch
# pip install elasticsearch
from elasticsearch import Elasticsearch
from datetime import datetime
es = Elasticsearch(['localhost:9200'])
# Index document
doc = {
'author': 'John',
'text': 'Python is awesome',
'timestamp': datetime.now(),
'tags': ['python', 'programming']
}
res = es.index(index='blog', id=1, document=doc)
# Search
res = es.search(
index='blog',
query={
'match': {
'text': 'python'
}
}
)
for hit in res['hits']['hits']:
print(hit['_source'])
# Aggregation
res = es.search(
index='blog',
aggs={
'tags_count': {
'terms': {'field': 'tags'}
}
}
)
Cassandra
# pip install cassandra-driver
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('mykeyspace')
# Insert
session.execute(
"INSERT INTO users (id, name, email) VALUES (%s, %s, %s)",
('user1', 'John', 'john@example.com')
)
# Select
rows = session.execute(
"SELECT * FROM users WHERE id = %s",
('user1',)
)
for row in rows:
print(row.name, row.email)
Сравнение подходов
| БД | Синхронный | Асинхронный | ODM | Когда использовать |
|---|---|---|---|---|
| MongoDB | PyMongo | Motor | MongoEngine/Beanie | Document-oriented data |
| Redis | redis-py | aioredis | Walrus | Caching, sessions, queues |
| DynamoDB | Boto3 | aioboto3 | PynamoDB | AWS serverless |
| Elasticsearch | elasticsearch-py | async | - | Full-text search |
| Cassandra | cassandra-driver | - | - | High-scale time series |
Рекомендации
Для новых проектов:
- MongoDB → Beanie (асинхронный ODM)
- Redis → aioredis (асинхронный)
- DynamoDB → Pynamodb или Boto3
Для production:
- Используй асинхронные драйверы
- Настрой connection pooling
- Добавь мониторинг и логирование
- Обработай исключения и retry логику