Какие трудности возникали при обработке больших данных в реальных проектах?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Фреймворки для работы с NoSQL в Python
Современные NoSQL базы требуют специальных инструментов. Разберу самые популярные и мощные фреймворки.
1. PyMongo (MongoDB)
Оффициальный драйвер MongoDB. Низкоуровневый, но мощный.
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
# Подключение
client = MongoClient("mongodb://localhost:27017/")
db = client["myapp"]
collection = db["users"]
# CRUD операции
# Create
user = {"name": "John", "email": "john@example.com", "age": 30}
result = collection.insert_one(user)
print(result.inserted_id)
# Read
user = collection.find_one({"email": "john@example.com"})
users = list(collection.find({"age": {"$gt": 18}}))
# Update
collection.update_one(
{"email": "john@example.com"},
{"$set": {"age": 31}}
)
# Delete
collection.delete_one({"email": "john@example.com"})
# Aggregation pipeline
results = collection.aggregate([
{"$match": {"age": {"$gt": 18}}},
{"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
])
Преимущества:
- Полный контроль
- Оффициальная поддержка
- Работает с большинством MongoDB features
Недостатки:
- Многословный код
- Нет типизации
- Нужно следить за эффективностью запросов
2. Motor (Async MongoDB)
Асинхронный драйвер MongoDB на базе PyMongo.
import motor.motor_asyncio
client = motor.motor_asyncio.AsyncMongoClient("mongodb://localhost:27017/")
db = client["myapp"]
collection = db["users"]
async def create_user(name, email):
result = await collection.insert_one({"name": name, "email": email})
return result.inserted_id
async def find_user(email):
user = await collection.find_one({"email": email})
return user
# Использование с FastAPI
from fastapi import FastAPI
app = FastAPI()
@app.post("/users")
async def add_user(name: str, email: str):
user_id = await create_user(name, email)
return {"id": str(user_id)}
Идеально для:
- Web приложений (FastAPI, Starlette)
- Высоконагруженных систем
- Асинхронных операций
3. Beanie (MongoDB + Pydantic)
Modern Pythonic ORM для MongoDB с типизацией Pydantic.
from beanie import Document, Indexed, PydanticObjectId
from pydantic import Field, EmailStr
from datetime import datetime
class User(Document):
name: str
email: Indexed(str, unique=True) # Индекс
age: int
created_at: datetime = Field(default_factory=datetime.now)
class Settings:
name = "users" # Имя коллекции
# CRUD с типизацией
async def create_user():
user = User(name="John", email="john@example.com", age=30)
await user.insert()
return user
async def find_users():
# Все операции типизированы!
users = await User.find({"age": {"$gt": 18}}).to_list()
return users
async def update_user(user_id):
user = await User.get(user_id)
user.age = 31
await user.save()
# Запросы
users = await User.find(User.age > 18).to_list()
adults = await User.find_many(User.age >= 18, limit=10).to_list()
Преимущества:
- Полная типизация
- Интеграция с Pydantic
- Асинхронность
- FastAPI compatible
4. Redis-py (Redis)
Для работы с Redis кэшем и структурами данных.
import redis
import json
# Подключение
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
# Строковые значения
r.set("user:1:name", "John")
name = r.get("user:1:name")
# Хэши (Hash)
r.hset("user:1", mapping={
"name": "John",
"email": "john@example.com",
"age": 30
})
user_data = r.hgetall("user:1")
# Списки (List - очередь)
r.lpush("tasks:queue", "task1", "task2", "task3")
task = r.rpop("tasks:queue")
# Множества (Set)
r.sadd("user:1:skills", "Python", "JavaScript", "Docker")
skills = r.smembers("user:1:skills")
# Сортированные множества (Sorted Set)
r.zadd("leaderboard", {"user1": 100, "user2": 150, "user3": 120})
top_users = r.zrange("leaderboard", 0, 9, withscores=True)
# TTL (время жизни)
r.setex("session:abc123", 3600, "user_data") # 1 час
r.expire("cache_key", 300) # 5 минут
# Pipeline (батчинг операций)
pipe = r.pipeline()
pipe.set("key1", "value1")
pipe.set("key2", "value2")
pipe.execute()
5. Aioredis (Async Redis)
Асинхронный Redis для asyncio.
import aioredis
async def main():
redis = await aioredis.create_redis_pool("redis://localhost:6379")
# Операции
await redis.set("key", "value")
value = await redis.get("key")
# С FastAPI
await redis.lpush("queue", "item1", "item2")
redis.close()
await redis.wait_closed()
# Кэширование для FastAPI
from fastapi import FastAPI
from functools import wraps
app = FastAPI()
def cached(expire: int):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
key = f"{func.__name__}:{args}:{kwargs}"
# Проверить кэш
result = await redis.get(key)
if result:
return json.loads(result)
# Вычислить
result = await func(*args, **kwargs)
await redis.setex(key, expire, json.dumps(result))
return result
return wrapper
return decorator
6. Elasticsearch-py (Elasticsearch)
Для полнотекстового поиска.
from elasticsearch import Elasticsearch
es = Elasticsearch([{"host": "localhost", "port": 9200}])
# Индексирование документа
es.index(index="articles", id=1, body={
"title": "Python Best Practices",
"content": "...",
"author": "John",
"published": "2024-01-01"
})
# Полнотекстовый поиск
results = es.search(index="articles", body={
"query": {
"multi_match": {
"query": "Python",
"fields": ["title", "content"]
}
}
})
# Фильтры и агрегации
results = es.search(index="articles", body={
"query": {
"bool": {
"must": [{"match": {"title": "Python"}}],
"filter": [{"range": {"published": {"gte": "2024-01-01"}}}]
}
},
"aggs": {
"authors": {"terms": {"field": "author"}}
}
})
7. Cassandra-driver (Apache Cassandra)
Для масштабируемых систем с высокой доступностью.
from cassandra.cluster import Cluster
cluster = Cluster(["127.0.0.1"])
session = cluster.connect("myapp")
# Создание таблицы
session.execute("""
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY,
name TEXT,
email TEXT,
created_at TIMESTAMP
)
""")
# Insert
from cassandra.util import uuid4
user_id = uuid4()
session.execute(
"INSERT INTO users (id, name, email) VALUES (%s, %s, %s)",
[user_id, "John", "john@example.com"]
)
# Select
rows = session.execute("SELECT * FROM users WHERE id = %s", [user_id])
for row in rows:
print(row.name)
Сравнение фреймворков
| Фреймворк | БД | Async | Типизация | Сложность |
|---|---|---|---|---|
| PyMongo | MongoDB | Нет | Нет | Средняя |
| Motor | MongoDB | Да | Нет | Средняя |
| Beanie | MongoDB | Да | Да | Низкая |
| Redis-py | Redis | Нет | Нет | Низкая |
| Aioredis | Redis | Да | Нет | Низкая |
| Elasticsearch-py | Elasticsearch | Нет | Нет | Средняя |
| Cassandra-driver | Cassandra | Нет | Нет | Высокая |
Рекомендации
Для MongoDB: Используй Beanie, это лучший выбор для современных приложений с типизацией.
Для Redis: Redis-py для sync, Aioredis для async.
Для Elasticsearch: Elasticsearch-py с DSL wrapper для сложных запросов.
Для Cassandra: Cassandra-driver для высоконагруженных систем.
Вывод: выбирай фреймворк в зависимости от требований к async и типизации. Для современных проектов всегда приоритет async и типизация.