Как посчитаешь количество вхождений слов в текстовом файле?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Подсчет вхождений слов в текстовом файле
Этот вопрос актуален при анализе отзывов, комментариев, feedback'а пользователей или любых текстовых данных в аналитике. Давайте разберемся в разных подходах от простых до продвинутых.
1. Простейший подход: bash команды
Для быстрого анализа маленьких файлов можно использовать стандартные Unix утилиты.
# Подсчет конкретного слова (с учетом регистра)
grep -o 'good' reviews.txt | wc -l
# Результат: 1542 вхождений слова 'good'
# Без учета регистра
grep -io 'good' reviews.txt | wc -l
# Подсчет частоты всех слов
tr ' ' '\n' < reviews.txt | sort | uniq -c | sort -rn | head -20
# Результат:
# 5432 the
# 3214 and
# 2891 is
# ...
# Удаляем пунктуацию и подсчитываем
tr -cs 'A-Za-z' '\n' < reviews.txt | sort | uniq -c | sort -rn | head -20
2. Python: базовый подход
def count_words_simple(filename: str) -> dict:
"""
Простой подсчет частоты слов
"""
word_counts = {}
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
# Преобразуем в нижний регистр и разбиваем на слова
words = line.lower().split()
for word in words:
# Удаляем пунктуацию
word = word.strip('.,!?;:"()[]{}\"')
if word: # Пропускаем пустые строки
word_counts[word] = word_counts.get(word, 0) + 1
# Сортируем по частоте
return dict(sorted(word_counts.items(), key=lambda x: x[1], reverse=True))
# Использование
word_freq = count_words_simple('reviews.txt')
for word, count in list(word_freq.items())[:20]:
print(f"{word}: {count}")
3. Python: с использованием встроенных инструментов
from collections import Counter
import re
from typing import Dict, List
def count_words_advanced(filename: str, min_length: int = 1) -> Dict[str, int]:
"""
Продвинутый подсчет с фильтрацией
"""
# Список стоп-слов (частые слова, которые не имеют смысла)
stop_words = {
'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
'of', 'is', 'are', 'was', 'were', 'be', 'been', 'being'
}
all_words = []
with open(filename, 'r', encoding='utf-8') as f:
text = f.read().lower()
# Удаляем пунктуацию и разбиваем на слова
words = re.findall(r'\b[a-z]+\b', text)
# Фильтруем: удаляем стоп-слова и короткие слова
for word in words:
if word not in stop_words and len(word) >= min_length:
all_words.append(word)
# Используем Counter для подсчета
word_counts = Counter(all_words)
return dict(word_counts.most_common(100))
# Использование
word_freq = count_words_advanced('reviews.txt')
for word, count in word_freq.items():
print(f"{word}: {count}")
4. Для работы с большими файлами: чтение по частям
def count_words_large_file(filename: str, chunk_size: int = 1024*1024) -> dict:
"""
Подсчет слов в больших файлах (не загружаем в памяти целиком)
"""
from collections import defaultdict
word_counts = defaultdict(int)
buffer = ""
with open(filename, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
# Добавляем к буферу
buffer += chunk
# Обрабатываем полные слова
lines = buffer.split('\n')
buffer = lines[-1] # Последняя неполная линия остается в буфере
for line in lines[:-1]:
words = line.lower().split()
for word in words:
word = word.strip('.,!?;:"()[]{}\"')
if word:
word_counts[word] += 1
# Обрабатываем оставшийся буфер
if buffer:
for word in buffer.lower().split():
word = word.strip('.,!?;:"()[]{}\"')
if word:
word_counts[word] += 1
return dict(sorted(word_counts.items(), key=lambda x: x[1], reverse=True))
5. SQL подход: анализ текстовых данных в БД
Если текстовые данные уже в базе данных (как в большинстве аналитических проектов):
-- Таблица с отзывами
CREATE TABLE reviews (
review_id UUID,
user_id UUID,
product_id UUID,
review_text TEXT,
created_at TIMESTAMPTZ
);
-- Подсчет частоты слов в PostgreSQL
WITH word_list AS (
SELECT
review_id,
regexp_split_to_table(lower(review_text), '\s+') as word
FROM reviews
WHERE review_text IS NOT NULL
),
clean_words AS (
SELECT
-- Удаляем пунктуацию
regexp_replace(word, '[^a-z0-9]', '', 'g') as clean_word
FROM word_list
WHERE word ~ '[a-z]' -- Только слова с буквами
)
SELECT
clean_word,
COUNT(*) as frequency,
ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage
FROM clean_words
WHERE LENGTH(clean_word) > 2 -- Только слова длиной > 2 букв
GROUP BY clean_word
HAVING COUNT(*) > 5 -- Только слова, которые встречаются > 5 раз
ORDER BY frequency DESC
LIMIT 100;
6. Анализ с использованием NATURAL LANGUAGE PROCESSING (NLP)
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from collections import Counter
# Загружаем необходимые ресурсы
nltk.download('stopwords')
nltk.download('punkt')
def count_words_nlp(filename: str) -> dict:
"""
Подсчет слов с использованием NLP
- Токенизация
- Удаление стоп-слов
- Стемминг (приведение к корню слова)
"""
stop_words = set(stopwords.words('english'))
stemmer = PorterStemmer()
with open(filename, 'r', encoding='utf-8') as f:
text = f.read()
# Токенизация
tokens = word_tokenize(text.lower())
# Фильтрация и стемминг
filtered_tokens = []
for token in tokens:
# Пропускаем пунктуацию и стоп-слова
if token.isalpha() and token not in stop_words:
# Применяем стемминг
stem = stemmer.stem(token)
filtered_tokens.append(stem)
# Подсчет
word_counts = Counter(filtered_tokens)
return dict(word_counts.most_common(100))
# Пример
results = count_words_nlp('reviews.txt')
for word, count in results.items():
print(f"{word}: {count}")
# С NLP:
# - 'running' и 'runs' считаются как одно слово 'run'
# - 'the', 'a', 'and' автоматически удаляются
# - Результаты более чистые и интерпретируемые
7. Анализ для разных язык (например, русский)
import pymorphy2
from collections import Counter
import re
def count_words_russian(filename: str) -> dict:
"""
Подсчет частоты русских слов с морфологическим анализом
"""
morph = pymorphy2.MorphAnalyzer()
with open(filename, 'r', encoding='utf-8') as f:
text = f.read().lower()
# Извлекаем слова (кириллица)
words = re.findall(r'[а-яё]+', text)
# Лемматизация (приведение к начальной форме)
lemmas = []
for word in words:
parsed = morph.parse(word)[0]
normal_form = parsed.normal_form
lemmas.append(normal_form)
# Подсчет
word_counts = Counter(lemmas)
return dict(word_counts.most_common(100))
# Пример
# 'красивый', 'красивого', 'красивому' -> 'красивый'
results = count_words_russian('reviews_ru.txt')
8. Практический пример: анализ отзывов в аналитике
class ReviewAnalyzer:
def __init__(self, db_connection):
self.conn = db_connection
def analyze_word_frequency(
self,
product_id: str,
date_from: str,
date_to: str,
top_n: int = 50
) -> dict:
"""
Анализ частоты слов в отзывах за период
"""
query = """
WITH words AS (
SELECT
regexp_split_to_table(
lower(review_text),
'\\s+'
) as word
FROM reviews
WHERE product_id = @product_id
AND created_at >= @date_from::TIMESTAMPTZ
AND created_at < @date_to::TIMESTAMPTZ
AND review_text IS NOT NULL
)
SELECT
regexp_replace(word, '[^a-z0-9]', '', 'g') as word,
COUNT(*) as count,
ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as pct
FROM words
WHERE LENGTH(word) > 2
GROUP BY word
HAVING COUNT(*) > 3
ORDER BY count DESC
LIMIT @top_n
"""
cursor = self.conn.cursor()
cursor.execute(query, {
'product_id': product_id,
'date_from': date_from,
'date_to': date_to,
'top_n': top_n
})
results = {}
for word, count, pct in cursor.fetchall():
results[word] = {'count': count, 'percentage': pct}
return results
def sentiment_word_analysis(self, review_ids: List[str]) -> dict:
"""
Анализ позитивных и негативных слов в отзывах
"""
positive_words = {'good', 'great', 'excellent', 'amazing', 'best',
'love', 'perfect', 'wonderful', 'awesome'}
negative_words = {'bad', 'terrible', 'awful', 'hate', 'worst',
'poor', 'broken', 'useless', 'disappointing'}
query = """
SELECT
review_id,
review_text,
rating
FROM reviews
WHERE review_id = ANY(@review_ids)
"""
cursor = self.conn.cursor()
cursor.execute(query, {'review_ids': review_ids})
analysis = {
'positive': {},
'negative': {},
'neutral': {}
}
for review_id, text, rating in cursor.fetchall():
words = text.lower().split()
for word in words:
clean_word = word.strip('.,!?;:')
if clean_word in positive_words:
analysis['positive'][clean_word] = analysis['positive'].get(clean_word, 0) + 1
elif clean_word in negative_words:
analysis['negative'][clean_word] = analysis['negative'].get(clean_word, 0) + 1
return analysis
# Использование
analyzer = ReviewAnalyzer(db_connection)
word_freq = analyzer.analyze_word_frequency(
product_id='product_123',
date_from='2024-01-01',
date_to='2024-03-31',
top_n=50
)
for word, data in word_freq.items():
print(f"{word}: {data['count']} ({data['percentage']}%)")
9. Оптимизация для больших объемов
from multiprocessing import Pool
import os
def count_words_parallel(filename: str, num_processes: int = 4) -> dict:
"""
Параллельный подсчет слов для очень больших файлов
"""
file_size = os.path.getsize(filename)
chunk_size = file_size // num_processes
def process_chunk(args):
filename, start, end = args
chunk_counts = {}
with open(filename, 'rb') as f:
f.seek(start)
# Пропускаем первый неполный байт
if start > 0:
f.readline()
while f.tell() < end:
line = f.readline().decode('utf-8', errors='ignore')
words = line.lower().split()
for word in words:
word = word.strip('.,!?;:"()[]{}\"')
if word:
chunk_counts[word] = chunk_counts.get(word, 0) + 1
return chunk_counts
# Подготавливаем chunks
chunks = [
(filename, i * chunk_size, (i + 1) * chunk_size)
for i in range(num_processes)
]
# Обрабатываем параллельно
with Pool(num_processes) as pool:
chunk_results = pool.map(process_chunk, chunks)
# Объединяем результаты
final_counts = {}
for chunk_counts in chunk_results:
for word, count in chunk_counts.items():
final_counts[word] = final_counts.get(word, 0) + count
return dict(sorted(final_counts.items(), key=lambda x: x[1], reverse=True))
10. Полный пример для продакшена
import logging
from datetime import datetime
from typing import Dict, Optional
logger = logging.getLogger(__name__)
class ProductionWordCounter:
def __init__(self, db_connection, cache=None):
self.conn = db_connection
self.cache = cache # Redis или другой кеш
def count_and_cache(
self,
product_id: str,
cache_ttl: int = 3600 # 1 час
) -> Dict[str, int]:
"""
Подсчет слов с кешированием результатов
"""
cache_key = f"word_freq:{product_id}"
# Проверяем кеш
if self.cache:
cached = self.cache.get(cache_key)
if cached:
logger.info(f"Cache hit for {product_id}")
return cached
# Если в кеше нет, считаем
results = self._count_from_db(product_id)
# Сохраняем в кеш
if self.cache:
self.cache.set(cache_key, results, ex=cache_ttl)
logger.info(f"Cached results for {product_id}")
return results
def _count_from_db(self, product_id: str) -> Dict[str, int]:
query = """
WITH words AS (
SELECT regexp_split_to_table(lower(review_text), '\\s+') as word
FROM reviews
WHERE product_id = @product_id
)
SELECT word, COUNT(*) as count
FROM words
WHERE LENGTH(word) > 2
GROUP BY word
ORDER BY count DESC
LIMIT 1000
"""
cursor = self.conn.cursor()
cursor.execute(query, {'product_id': product_id})
return {word: count for word, count in cursor.fetchall()}
Рекомендация: какой метод выбрать?
Для быстрого анализа маленького файла (< 100 MB):
- Bash команды или Python simple approach
- Время: < 1 сек
Для аналитики в базе данных:
- SQL запрос с window functions
- Масштабируется до миллиардов строк
- Можно добавить фильтры (дата, продукт, ос и т.д.)
Для качественного анализа текста:
- NLP подход с NLTK или spacy
- Стемминг/лемматизация
- Удаление стоп-слов
- Результаты более релевантные
Для production систем:
- SQL в БД + кеширование
- Параллельная обработка для очень больших объемов
- Мониторинг и логирование
Заключение
Подсчет слов — базовая операция, которая имеет множество вариаций в зависимости от контекста. Для быстрого анализа используй Python или bash, для production аналитики используй SQL, а для качественного NLP анализа используй специализированные библиотеки.