← Назад к вопросам

Какие фреймворки использовал для работы с NoSQL БД?

2.0 Middle🔥 191 комментариев
#Базы данных (NoSQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Фреймворки и библиотеки для работы с NoSQL БД в Python

MongoDB

1. PyMongo — официальный синхронный драйвер

# pip install pymongo

from pymongo import MongoClient
from bson.objectid import ObjectId
import datetime

# Подключение
client = MongoClient('mongodb://localhost:27017/')
db = client['myapp_db']
collection = db['users']

# CRUD операции

# Create
user_doc = {
    'name': 'John Doe',
    'email': 'john@example.com',
    'age': 30,
    'created_at': datetime.datetime.now(),
    'tags': ['python', 'web'],
    'metadata': {'city': 'New York', 'country': 'USA'}
}
result = collection.insert_one(user_doc)
user_id = result.inserted_id

# Read
user = collection.find_one({'_id': user_id})
users_list = collection.find({'age': {'$gte': 25}}).limit(10)

# Update
collection.update_one(
    {'_id': user_id},
    {'$set': {'age': 31, 'updated_at': datetime.datetime.now()}}
)

# Delete
collection.delete_one({'_id': user_id})

# Агрегация (мощная фишка MongoDB)
pipeline = [
    {'$match': {'age': {'$gte': 25}}},
    {'$group': {
        '_id': None,
        'average_age': {'$avg': '$age'},
        'count': {'$sum': 1}
    }}
]
results = collection.aggregate(pipeline)
for result in results:
    print(result)

# Индексы
collection.create_index('email', unique=True)
collection.create_index([('name', 1), ('age', -1)])

Плюсы:

  • ✅ Официальный драйвер MongoDB
  • ✅ Полная поддержка MongoDB features
  • ✅ Отлично задокументирован
  • ✅ Встроенная поддержка connection pooling

Минусы:

  • ❌ Синхронный (блокирует)
  • ❌ Низкоуровневый — много boilerplate кода

2. Motor — асинхронный драйвер для MongoDB

# pip install motor

from motor.motor_asyncio import AsyncIOMotorClient
import asyncio

async def main():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['myapp_db']
    collection = db['users']
    
    # Асинхронные операции
    user_doc = {'name': 'Alice', 'email': 'alice@example.com'}
    result = await collection.insert_one(user_doc)
    
    user = await collection.find_one({'_id': result.inserted_id})
    print(user)
    
    # Асинхронный поиск
    async for user in collection.find({'age': {'$gte': 25}}):
        print(user['name'])
    
    client.close()

asyncio.run(main())

Плюсы:

  • ✅ Асинхронный (non-blocking)
  • ✅ Совместим с asyncio и FastAPI
  • ✅ Отлично для high-load приложений

Минусы:

  • ❌ Всё ещё low-level
  • ❌ Нет type hints

3. MongoEngine — ODM (Object Document Mapper)

# pip install mongoengine

from mongoengine import Document, StringField, IntField, ListField, DictField
from mongoengine import connect
from datetime import datetime

# Подключение
connect('myapp_db', host='mongodb://localhost:27017')

# Определение модели (похоже на SQLAlchemy)
class User(Document):
    name = StringField(required=True, unique=True)
    email = StringField(required=True, unique=True)
    age = IntField(min_value=0, max_value=150)
    tags = ListField(StringField())
    metadata = DictField()
    created_at = datetime.now()
    
    meta = {'collection': 'users'}
    
    def __str__(self):
        return f'{self.name} ({self.email})'

# Создание
user = User(
    name='Bob Smith',
    email='bob@example.com',
    age=28,
    tags=['python', 'django'],
    metadata={'city': 'Boston'}
)
user.save()

# Чтение
user = User.objects.get(email='bob@example.com')
users_list = User.objects(age__gte=25).limit(10)

# Обновление
user.update(age=29)
user.save()

# Удаление
user.delete()

# Запросы
active_users = User.objects(age__gte=18, age__lte=65)
old_users = User.objects(age__gt=60)
by_tag = User.objects(tags='python')

Плюсы:

  • ✅ ORM-like experience (знакомо разработчикам SQL)
  • ✅ Валидация данных встроена
  • ✅ Queries как в Django ORM

Минусы:

  • ❌ Синхронный
  • ❌ Медленнее raw PyMongo
  • ❌ Менее гибкий

4. Beanie — асинхронный ODM

# pip install beanie

from beanie import Document, Indexed
from pymongo import AsyncIOMotorClient
from pydantic import BaseModel, Field
from typing import List, Optional
import asyncio

class User(Document):
    name: str = Indexed(unique=True)
    email: str = Indexed(unique=True)
    age: int
    tags: List[str] = []
    metadata: Optional[dict] = None
    
    class Settings:
        name = 'users'  # Имя коллекции

async def main():
    # Инициализация
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    await client.drop_database('myapp_db')
    await Beanie.init_beanie(
        database=client['myapp_db'],
        models=[User]
    )
    
    # Create
    user = User(
        name='Charlie',
        email='charlie@example.com',
        age=32,
        tags=['python', 'fastapi']
    )
    await user.insert()
    
    # Read
    user = await User.find_one(User.email == 'charlie@example.com')
    users = await User.find(User.age > 25).to_list()
    
    # Update
    await user.set({User.age: 33})
    
    # Delete
    await user.delete()
    
    client.close()

asyncio.run(main())

Плюсы:

  • ✅ Асинхронный ODM
  • ✅ Использует Pydantic (type hints)
  • ✅ Идеален для FastAPI
  • ✅ Современный подход

Минусы:

  • ❌ Более молодой проект, может быть менее стабилен
  • ❌ Меньше документации

Redis

1. Redis-py — синхронный клиент

# pip install redis

import redis
import json

# Подключение
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Строковые операции
r.set('user:1:name', 'John')  # SET
value = r.get('user:1:name')  # GET

# Счётчики
r.incr('page_views')  # Инкрементировать на 1
r.incrby('score', 10)  # Инкрементировать на 10

# Expires (TTL)
r.setex('session:abc123', 3600, 'session_data')  # Set with expire
r.expire('key', 3600)  # Установить время жизни

# Списки
r.rpush('queue', 'job1', 'job2')  # Добавить в конец
r.lpop('queue')  # Получить с начала
r.lrange('queue', 0, -1)  # Получить все

# Множества
r.sadd('tags', 'python', 'web', 'api')
r.smembers('tags')
r.scard('tags')  # Количество

# Хеши
r.hset('user:1', mapping={
    'name': 'Alice',
    'email': 'alice@example.com',
    'age': 28
})
r.hgetall('user:1')
r.hget('user:1', 'name')

# JSON (Redis 7.0+)
user_data = {'name': 'Bob', 'age': 30}
r.json().set('user:2', '$', user_data)
user = r.json().get('user:2')

# Pub/Sub
pubsub = r.pubsub()
pubsub.subscribe('events')
for message in pubsub.listen():
    print(message['data'])

# Pipeline (batch операции)
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.execute()

Плюсы:

  • ✅ Простой и быстрый
  • ✅ Полная поддержка Redis features
  • ✅ Встроенная Pub/Sub

Минусы:

  • ❌ Синхронный
  • ❌ Требует careful connection pooling

2. aioredis / redis-py async — асинхронный клиент

# pip install redis[asyncio]

import asyncio
import redis.asyncio as redis

async def main():
    r = await redis.from_url('redis://localhost')
    
    # Асинхронные операции
    await r.set('key', 'value')
    value = await r.get('key')
    
    # Pipeline
    async with r.pipeline(transaction=True) as pipe:
        await pipe.set('key1', 'value1')
        await pipe.set('key2', 'value2')
        await pipe.execute()
    
    await r.close()

asyncio.run(main())

Плюсы:

  • ✅ Асинхронный
  • ✅ Совместим с FastAPI

3. Walrus — ORM для Redis

# pip install walrus

from walrus import Database

db = Database(host='localhost', port=6379)

# Определение моделей
class User(db.Model):
    username = db.TextField()
    email = db.TextField()
    created = db.DateTimeField()

# Использование
user = User.create(username='john', email='john@example.com')
users = User.all()
user_by_id = User[1]

DynamoDB (Amazon)

Boto3 — официальный SDK

# pip install boto3

import boto3
from boto3.dynamodb.conditions import Key, Attr

# Клиент
dynamodb = boto3.resource(
    'dynamodb',
    region_name='us-east-1',
    endpoint_url='http://localhost:8000'  # Local DynamoDB
)

table = dynamodb.Table('users')

# Put item
table.put_item(
    Item={
        'user_id': 'user123',
        'name': 'John',
        'email': 'john@example.com',
        'age': 30
    }
)

# Get item
response = table.get_item(Key={'user_id': 'user123'})
item = response['Item']

# Query
response = table.query(
    KeyConditionExpression=Key('user_id').eq('user123')
)

# Scan
response = table.scan(
    FilterExpression=Attr('age').gte(25)
)

Pynamodb — ODM для DynamoDB

# pip install pynamodb

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute

class User(Model):
    class Meta:
        table_name = 'users'
        region = 'us-east-1'
    
    user_id = UnicodeAttribute(hash_key=True)
    name = UnicodeAttribute()
    email = UnicodeAttribute()
    age = NumberAttribute()

# Create
user = User('user123')
user.name = 'Alice'
user.email = 'alice@example.com'
user.save()

# Get
user = User.get('user123')

# Query
users = User.query('user123')

Elasticsearch

# pip install elasticsearch

from elasticsearch import Elasticsearch
from datetime import datetime

es = Elasticsearch(['localhost:9200'])

# Index document
doc = {
    'author': 'John',
    'text': 'Python is awesome',
    'timestamp': datetime.now(),
    'tags': ['python', 'programming']
}
res = es.index(index='blog', id=1, document=doc)

# Search
res = es.search(
    index='blog',
    query={
        'match': {
            'text': 'python'
        }
    }
)
for hit in res['hits']['hits']:
    print(hit['_source'])

# Aggregation
res = es.search(
    index='blog',
    aggs={
        'tags_count': {
            'terms': {'field': 'tags'}
        }
    }
)

Cassandra

# pip install cassandra-driver

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('mykeyspace')

# Insert
session.execute(
    "INSERT INTO users (id, name, email) VALUES (%s, %s, %s)",
    ('user1', 'John', 'john@example.com')
)

# Select
rows = session.execute(
    "SELECT * FROM users WHERE id = %s",
    ('user1',)
)
for row in rows:
    print(row.name, row.email)

Сравнение подходов

БДСинхронныйАсинхронныйODMКогда использовать
MongoDBPyMongoMotorMongoEngine/BeanieDocument-oriented data
Redisredis-pyaioredisWalrusCaching, sessions, queues
DynamoDBBoto3aioboto3PynamoDBAWS serverless
Elasticsearchelasticsearch-pyasync-Full-text search
Cassandracassandra-driver--High-scale time series

Рекомендации

Для новых проектов:

  1. MongoDB → Beanie (асинхронный ODM)
  2. Redis → aioredis (асинхронный)
  3. DynamoDB → Pynamodb или Boto3

Для production:

  • Используй асинхронные драйверы
  • Настрой connection pooling
  • Добавь мониторинг и логирование
  • Обработай исключения и retry логику
Какие фреймворки использовал для работы с NoSQL БД? | PrepBro