279 lines
9.4 KiB
Python
279 lines
9.4 KiB
Python
import torch
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
from typing import Any
|
|
from transformers import pipeline
|
|
from sentence_transformers import SentenceTransformer
|
|
|
|
class NLP:
|
|
_topic_models: dict[str, SentenceTransformer] = {}
|
|
_emotion_classifiers: dict[str, Any] = {}
|
|
_entity_recognizers: dict[str, Any] = {}
|
|
_topic_embedding_cache: dict[tuple[str, ...], np.ndarray] = {}
|
|
|
|
def __init__(
|
|
self, df: pd.DataFrame, title_col: str, content_col: str, topics: dict
|
|
):
|
|
self.df = df
|
|
self.title_col = title_col
|
|
self.content_col = content_col
|
|
self.use_cuda = torch.cuda.is_available()
|
|
self.device_str = "cuda" if self.use_cuda else "cpu"
|
|
self.pipeline_device = 0 if self.use_cuda else -1
|
|
|
|
if self.use_cuda:
|
|
torch.set_float32_matmul_precision("high")
|
|
|
|
try:
|
|
self.topic_model = self._get_topic_model(self.device_str)
|
|
self.emotion_classifier = self._get_emotion_classifier(
|
|
self.device_str, self.pipeline_device
|
|
)
|
|
self.entity_recognizer = self._get_entity_recognizer(
|
|
self.device_str, self.pipeline_device
|
|
)
|
|
except RuntimeError as exc:
|
|
if self.use_cuda and "out of memory" in str(exc).lower():
|
|
torch.cuda.empty_cache()
|
|
self.use_cuda = False
|
|
self.device_str = "cpu"
|
|
self.pipeline_device = -1
|
|
self.topic_model = self._get_topic_model(self.device_str)
|
|
self.emotion_classifier = self._get_emotion_classifier(
|
|
self.device_str, self.pipeline_device
|
|
)
|
|
else:
|
|
raise
|
|
|
|
self.topic_batch_size = 128 if self.use_cuda else 32
|
|
self.emotion_batch_size = 96 if self.use_cuda else 16
|
|
|
|
self.topic_labels = list(topics.keys())
|
|
self.topic_texts = list(topics.values())
|
|
|
|
cache_key = tuple(self.topic_texts)
|
|
cached_embeddings = NLP._topic_embedding_cache.get(cache_key)
|
|
|
|
if cached_embeddings is None:
|
|
cached_embeddings = self._encode_with_backoff(
|
|
self.topic_texts, self.topic_batch_size
|
|
)
|
|
NLP._topic_embedding_cache[cache_key] = cached_embeddings
|
|
|
|
self.topic_embeddings = cached_embeddings
|
|
|
|
@classmethod
|
|
def _get_topic_model(cls, device_str: str) -> SentenceTransformer:
|
|
model = cls._topic_models.get(device_str)
|
|
if model is None:
|
|
model = SentenceTransformer("all-MiniLM-L6-v2", device=device_str)
|
|
cls._topic_models[device_str] = model
|
|
return model
|
|
|
|
@classmethod
|
|
def _get_emotion_classifier(cls, device_str: str, pipeline_device: int) -> Any:
|
|
classifier = cls._emotion_classifiers.get(device_str)
|
|
if classifier is None:
|
|
pipeline_kwargs = {
|
|
"top_k": None,
|
|
"truncation": True,
|
|
"device": pipeline_device,
|
|
}
|
|
if device_str == "cuda":
|
|
pipeline_kwargs["dtype"] = torch.float16
|
|
|
|
classifier = pipeline(
|
|
"text-classification",
|
|
model="j-hartmann/emotion-english-distilroberta-base",
|
|
**pipeline_kwargs,
|
|
)
|
|
cls._emotion_classifiers[device_str] = classifier
|
|
return classifier
|
|
|
|
@classmethod
|
|
def _get_entity_recognizer(cls, device_str: str, pipeline_device: int) -> Any:
|
|
recognizer = cls._entity_recognizers.get(device_str)
|
|
if recognizer is None:
|
|
pipeline_kwargs = {
|
|
"aggregation_strategy": "simple", # merges subwords
|
|
"device": pipeline_device,
|
|
}
|
|
|
|
if device_str == "cuda":
|
|
pipeline_kwargs["dtype"] = torch.float16
|
|
|
|
recognizer = pipeline(
|
|
"token-classification",
|
|
model="dslim/bert-base-NER",
|
|
**pipeline_kwargs,
|
|
)
|
|
cls._entity_recognizers[device_str] = recognizer
|
|
|
|
return recognizer
|
|
|
|
def _encode_with_backoff(
|
|
self, texts: list[str], initial_batch_size: int
|
|
) -> np.ndarray:
|
|
batch_size = initial_batch_size
|
|
while True:
|
|
try:
|
|
return self.topic_model.encode(
|
|
texts,
|
|
normalize_embeddings=True,
|
|
show_progress_bar=False,
|
|
batch_size=batch_size,
|
|
convert_to_numpy=True,
|
|
)
|
|
except RuntimeError as exc:
|
|
if (
|
|
self.use_cuda
|
|
and "out of memory" in str(exc).lower()
|
|
and batch_size > 8
|
|
):
|
|
batch_size = max(8, batch_size // 2)
|
|
torch.cuda.empty_cache()
|
|
continue
|
|
raise
|
|
|
|
def _infer_emotions_with_backoff(
|
|
self, texts: list[str], initial_batch_size: int
|
|
) -> list[list[dict[str, Any]]]:
|
|
batch_size = initial_batch_size
|
|
while True:
|
|
try:
|
|
return self.emotion_classifier(texts, batch_size=batch_size)
|
|
except RuntimeError as exc:
|
|
if (
|
|
self.use_cuda
|
|
and "out of memory" in str(exc).lower()
|
|
and batch_size > 8
|
|
):
|
|
batch_size = max(8, batch_size // 2)
|
|
torch.cuda.empty_cache()
|
|
continue
|
|
raise
|
|
|
|
def _infer_entities_with_backoff(
|
|
self, texts: list[str], initial_batch_size: int
|
|
) -> list[list[dict[str, Any]]]:
|
|
|
|
batch_size = initial_batch_size
|
|
|
|
while True:
|
|
try:
|
|
return self.entity_recognizer(texts, batch_size=batch_size)
|
|
except RuntimeError as exc:
|
|
if (
|
|
self.use_cuda
|
|
and "out of memory" in str(exc).lower()
|
|
and batch_size > 4
|
|
):
|
|
batch_size = max(4, batch_size // 2)
|
|
torch.cuda.empty_cache()
|
|
continue
|
|
raise
|
|
|
|
def add_emotion_cols(self) -> None:
|
|
texts = self.df[self.content_col].astype(str).str.slice(0, 512).tolist()
|
|
|
|
if not texts:
|
|
return
|
|
|
|
results = self._infer_emotions_with_backoff(texts, self.emotion_batch_size)
|
|
|
|
rows: list[dict[str, float]] = []
|
|
for row in results:
|
|
score_map: dict[str, float] = {}
|
|
for item in row:
|
|
label = item.get("label")
|
|
score = item.get("score")
|
|
if isinstance(label, str) and isinstance(score, (int, float)):
|
|
score_map[label] = float(score)
|
|
rows.append(score_map)
|
|
|
|
emotion_df = pd.DataFrame(rows).fillna(0.0).add_prefix("emotion_")
|
|
for column in emotion_df.columns:
|
|
self.df[column] = emotion_df[column].values
|
|
|
|
for column in self.df.columns:
|
|
if column.startswith("emotion_") and column not in emotion_df.columns:
|
|
self.df[column] = 0.0
|
|
|
|
def add_topic_col(self, confidence_threshold: float = 0.3) -> None:
|
|
titles = self.df[self.title_col].fillna("").astype(str)
|
|
contents = self.df[self.content_col].fillna("").astype(str)
|
|
|
|
texts = [
|
|
f"{title}. {content}" if title else content
|
|
for title, content in zip(titles, contents)
|
|
]
|
|
|
|
if not texts:
|
|
self.df["topic"] = []
|
|
self.df["topic_confidence"] = []
|
|
return
|
|
|
|
text_embeddings = self._encode_with_backoff(texts, self.topic_batch_size)
|
|
|
|
# cosine similarity is a dot product for normalized vectors
|
|
sims = np.matmul(text_embeddings, self.topic_embeddings.T)
|
|
|
|
# Best match
|
|
best_idx = sims.argmax(axis=1)
|
|
best_score = sims[np.arange(len(sims)), best_idx]
|
|
|
|
self.df["topic"] = [self.topic_labels[i] for i in best_idx]
|
|
self.df["topic_confidence"] = best_score
|
|
self.df.loc[self.df["topic_confidence"] < confidence_threshold, "topic"] = (
|
|
"Misc"
|
|
)
|
|
|
|
def add_ner_cols(self, max_chars: int = 512) -> None:
|
|
texts = (
|
|
self.df[self.content_col]
|
|
.fillna("")
|
|
.astype(str)
|
|
.str.slice(0, max_chars)
|
|
.tolist()
|
|
)
|
|
|
|
if not texts:
|
|
self.df["entities"] = []
|
|
self.df["entity_counts"] = []
|
|
return
|
|
|
|
results = self._infer_entities_with_backoff(texts, 32 if self.use_cuda else 8)
|
|
|
|
entity_lists = []
|
|
entity_count_dicts = []
|
|
|
|
for row in results:
|
|
entities = []
|
|
counts = {}
|
|
|
|
for ent in row:
|
|
word = ent.get("word")
|
|
label = ent.get("entity_group")
|
|
|
|
if isinstance(word, str) and isinstance(label, str):
|
|
entities.append({"text": word, "label": label})
|
|
counts[label] = counts.get(label, 0) + 1
|
|
|
|
entity_lists.append(entities)
|
|
entity_count_dicts.append(counts)
|
|
|
|
self.df["entities"] = entity_lists
|
|
self.df["entity_counts"] = entity_count_dicts
|
|
|
|
# Expand label counts into columns
|
|
all_labels = set()
|
|
for d in entity_count_dicts:
|
|
all_labels.update(d.keys())
|
|
|
|
for label in all_labels:
|
|
col_name = f"entity_{label}"
|
|
self.df[col_name] = [
|
|
d.get(label, 0) for d in entity_count_dicts
|
|
]
|