perf: improved NLP loading times

On cold run, takes less than 30s to run.
This commit is contained in:
2026-02-08 19:49:19 +00:00
parent dd917cc39e
commit 0d54e46ba3

View File

@@ -1,52 +1,160 @@
import torch
import pandas as pd
import numpy as np
from typing import Any
from transformers import pipeline
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
class NLP:
def __init__(self, df: pd.DataFrame, title_col: str, content_col: str, topics: dict):
_topic_models: dict[str, SentenceTransformer] = {}
_emotion_classifiers: dict[str, Any] = {}
_topic_embedding_cache: dict[tuple[str, ...], np.ndarray] = {}
def __init__(
self, df: pd.DataFrame, title_col: str, content_col: str, topics: dict
):
self.df = df
self.title_col = title_col
self.content_col = content_col
self.device = 0 if torch.cuda.is_available() else 1
self.use_cuda = torch.cuda.is_available()
self.device_str = "cuda" if self.use_cuda else "cpu"
self.pipeline_device = 0 if self.use_cuda else -1
# Topic model
self.topic_model = SentenceTransformer("all-mpnet-base-v2", device=self.device)
if self.use_cuda:
torch.set_float32_matmul_precision("high")
try:
self.topic_model = self._get_topic_model(self.device_str)
self.emotion_classifier = self._get_emotion_classifier(
self.device_str, self.pipeline_device
)
except RuntimeError as exc:
if self.use_cuda and "out of memory" in str(exc).lower():
torch.cuda.empty_cache()
self.use_cuda = False
self.device_str = "cpu"
self.pipeline_device = -1
self.topic_model = self._get_topic_model(self.device_str)
self.emotion_classifier = self._get_emotion_classifier(
self.device_str, self.pipeline_device
)
else:
raise
self.topic_batch_size = 128 if self.use_cuda else 32
self.emotion_batch_size = 96 if self.use_cuda else 16
self.topic_labels = list(topics.keys())
self.topic_texts = list(topics.values())
self.topic_embeddings = self.topic_model.encode(
self.topic_texts,
normalize_embeddings=True,
)
cache_key = tuple(self.topic_texts)
cached_embeddings = NLP._topic_embedding_cache.get(cache_key)
# emotion model
self.emotion_classifier = pipeline(
if cached_embeddings is None:
cached_embeddings = self._encode_with_backoff(
self.topic_texts, self.topic_batch_size
)
NLP._topic_embedding_cache[cache_key] = cached_embeddings
self.topic_embeddings = cached_embeddings
@classmethod
def _get_topic_model(cls, device_str: str) -> SentenceTransformer:
model = cls._topic_models.get(device_str)
if model is None:
model = SentenceTransformer("all-mpnet-base-v2", device=device_str)
cls._topic_models[device_str] = model
return model
@classmethod
def _get_emotion_classifier(cls, device_str: str, pipeline_device: int) -> Any:
classifier = cls._emotion_classifiers.get(device_str)
if classifier is None:
pipeline_kwargs = {
"top_k": None,
"truncation": True,
"device": pipeline_device,
}
if device_str == "cuda":
pipeline_kwargs["dtype"] = torch.float16
classifier = pipeline(
"text-classification",
model="j-hartmann/emotion-english-distilroberta-base",
top_k=None,
truncation=True,
device=self.device
**pipeline_kwargs,
)
cls._emotion_classifiers[device_str] = classifier
return classifier
def _encode_with_backoff(
self, texts: list[str], initial_batch_size: int
) -> np.ndarray:
batch_size = initial_batch_size
while True:
try:
return self.topic_model.encode(
texts,
normalize_embeddings=True,
show_progress_bar=False,
batch_size=batch_size,
convert_to_numpy=True,
)
except RuntimeError as exc:
if (
self.use_cuda
and "out of memory" in str(exc).lower()
and batch_size > 8
):
batch_size = max(8, batch_size // 2)
torch.cuda.empty_cache()
continue
raise
def _infer_emotions_with_backoff(
self, texts: list[str], initial_batch_size: int
) -> list[list[dict[str, Any]]]:
batch_size = initial_batch_size
while True:
try:
return self.emotion_classifier(texts, batch_size=batch_size)
except RuntimeError as exc:
if (
self.use_cuda
and "out of memory" in str(exc).lower()
and batch_size > 8
):
batch_size = max(8, batch_size // 2)
torch.cuda.empty_cache()
continue
raise
def add_emotion_cols(self) -> None:
texts = self.df[self.content_col].astype(str).str.slice(0, 512).tolist()
results = self.emotion_classifier(
texts,
batch_size=64
)
if not texts:
return
labels = [r["label"] for r in results[0]]
results = self._infer_emotions_with_backoff(texts, self.emotion_batch_size)
for label in labels:
self.df[f"emotion_{label}"] = [
next(item["score"] for item in row if item["label"] == label)
for row in results
]
rows: list[dict[str, float]] = []
for row in results:
score_map: dict[str, float] = {}
for item in row:
label = item.get("label")
score = item.get("score")
if isinstance(label, str) and isinstance(score, (int, float)):
score_map[label] = float(score)
rows.append(score_map)
emotion_df = pd.DataFrame(rows).fillna(0.0).add_prefix("emotion_")
for column in emotion_df.columns:
self.df[column] = emotion_df[column].values
for column in self.df.columns:
if column.startswith("emotion_") and column not in emotion_df.columns:
self.df[column] = 0.0
def add_topic_col(self, confidence_threshold: float = 0.3) -> None:
titles = self.df[self.title_col].fillna("").astype(str)
@@ -57,18 +165,22 @@ class NLP:
for title, content in zip(titles, contents)
]
text_embeddings = self.topic_model.encode(
texts,
normalize_embeddings=True,
)
if not texts:
self.df["topic"] = []
self.df["topic_confidence"] = []
return
# Similarity
sims = cosine_similarity(text_embeddings, self.topic_embeddings)
text_embeddings = self._encode_with_backoff(texts, self.topic_batch_size)
# cosine similarity is a dot product for normalized vectors
sims = np.matmul(text_embeddings, self.topic_embeddings.T)
# Best match
best_idx = sims.argmax(axis=1)
best_score = sims.max(axis=1)
best_score = sims[np.arange(len(sims)), best_idx]
self.df["topic"] = [self.topic_labels[i] for i in best_idx]
self.df["topic_confidence"] = best_score
self.df.loc[self.df["topic_confidence"] < confidence_threshold, "topic"] = "Misc"
self.df.loc[self.df["topic_confidence"] < confidence_threshold, "topic"] = (
"Misc"
)