# These emotions often dominate due to the writing style of online users and do not provide an accurate picture of the sentiment.
310 lines
10 KiB
Python
310 lines
10 KiB
Python
import torch
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
from typing import Any
|
|
from transformers import pipeline
|
|
from sentence_transformers import SentenceTransformer
|
|
|
|
class NLP:
    """Enrich a text DataFrame with topic labels, emotion scores, and named
    entities, caching heavyweight models per device and backing off on CUDA
    out-of-memory errors.
    """

    # Process-wide model caches keyed by device string ("cuda" / "cpu") so
    # repeated NLP(...) instantiations reuse already-loaded models.
    _topic_models: dict[str, SentenceTransformer] = {}
    _emotion_classifiers: dict[str, Any] = {}
    _entity_recognizers: dict[str, Any] = {}
    # Topic-description embeddings keyed by the tuple of topic texts, so the
    # same topic set is only encoded once across instances.
    _topic_embedding_cache: dict[tuple[str, ...], np.ndarray] = {}
|
|
|
|
def __init__(
|
|
self, df: pd.DataFrame, title_col: str, content_col: str, topics: dict
|
|
):
|
|
self.df = df
|
|
self.title_col = title_col
|
|
self.content_col = content_col
|
|
self.use_cuda = torch.cuda.is_available()
|
|
self.device_str = "cuda" if self.use_cuda else "cpu"
|
|
self.pipeline_device = 0 if self.use_cuda else -1
|
|
|
|
if self.use_cuda:
|
|
torch.set_float32_matmul_precision("high")
|
|
|
|
try:
|
|
self.topic_model = self._get_topic_model(self.device_str)
|
|
self.emotion_classifier = self._get_emotion_classifier(
|
|
self.device_str, self.pipeline_device
|
|
)
|
|
self.entity_recognizer = self._get_entity_recognizer(
|
|
self.device_str, self.pipeline_device
|
|
)
|
|
except RuntimeError as exc:
|
|
if self.use_cuda and "out of memory" in str(exc).lower():
|
|
torch.cuda.empty_cache()
|
|
self.use_cuda = False
|
|
self.device_str = "cpu"
|
|
self.pipeline_device = -1
|
|
self.topic_model = self._get_topic_model(self.device_str)
|
|
self.emotion_classifier = self._get_emotion_classifier(
|
|
self.device_str, self.pipeline_device
|
|
)
|
|
else:
|
|
raise
|
|
|
|
self.topic_batch_size = 128 if self.use_cuda else 32
|
|
self.emotion_batch_size = 96 if self.use_cuda else 16
|
|
|
|
self.topic_labels = list(topics.keys())
|
|
self.topic_texts = list(topics.values())
|
|
|
|
cache_key = tuple(self.topic_texts)
|
|
cached_embeddings = NLP._topic_embedding_cache.get(cache_key)
|
|
|
|
if cached_embeddings is None:
|
|
cached_embeddings = self._encode_with_backoff(
|
|
self.topic_texts, self.topic_batch_size
|
|
)
|
|
NLP._topic_embedding_cache[cache_key] = cached_embeddings
|
|
|
|
self.topic_embeddings = cached_embeddings
|
|
|
|
@classmethod
|
|
def _get_topic_model(cls, device_str: str) -> SentenceTransformer:
|
|
model = cls._topic_models.get(device_str)
|
|
if model is None:
|
|
model = SentenceTransformer("all-MiniLM-L6-v2", device=device_str)
|
|
cls._topic_models[device_str] = model
|
|
return model
|
|
|
|
@classmethod
|
|
def _get_emotion_classifier(cls, device_str: str, pipeline_device: int) -> Any:
|
|
classifier = cls._emotion_classifiers.get(device_str)
|
|
if classifier is None:
|
|
pipeline_kwargs = {
|
|
"top_k": None,
|
|
"truncation": True,
|
|
"device": pipeline_device,
|
|
}
|
|
if device_str == "cuda":
|
|
pipeline_kwargs["dtype"] = torch.float16
|
|
|
|
classifier = pipeline(
|
|
"text-classification",
|
|
model="j-hartmann/emotion-english-distilroberta-base",
|
|
**pipeline_kwargs,
|
|
)
|
|
cls._emotion_classifiers[device_str] = classifier
|
|
return classifier
|
|
|
|
@classmethod
|
|
def _get_entity_recognizer(cls, device_str: str, pipeline_device: int) -> Any:
|
|
recognizer = cls._entity_recognizers.get(device_str)
|
|
if recognizer is None:
|
|
pipeline_kwargs = {
|
|
"aggregation_strategy": "simple", # merges subwords
|
|
"device": pipeline_device,
|
|
}
|
|
|
|
if device_str == "cuda":
|
|
pipeline_kwargs["dtype"] = torch.float16
|
|
|
|
recognizer = pipeline(
|
|
"token-classification",
|
|
model="dslim/bert-base-NER",
|
|
**pipeline_kwargs,
|
|
)
|
|
cls._entity_recognizers[device_str] = recognizer
|
|
|
|
return recognizer
|
|
|
|
def _encode_with_backoff(
|
|
self, texts: list[str], initial_batch_size: int
|
|
) -> np.ndarray:
|
|
batch_size = initial_batch_size
|
|
while True:
|
|
try:
|
|
return self.topic_model.encode(
|
|
texts,
|
|
normalize_embeddings=True,
|
|
show_progress_bar=False,
|
|
batch_size=batch_size,
|
|
convert_to_numpy=True,
|
|
)
|
|
except RuntimeError as exc:
|
|
if (
|
|
self.use_cuda
|
|
and "out of memory" in str(exc).lower()
|
|
and batch_size > 8
|
|
):
|
|
batch_size = max(8, batch_size // 2)
|
|
torch.cuda.empty_cache()
|
|
continue
|
|
raise
|
|
|
|
def _infer_emotions_with_backoff(
|
|
self, texts: list[str], initial_batch_size: int
|
|
) -> list[list[dict[str, Any]]]:
|
|
batch_size = initial_batch_size
|
|
while True:
|
|
try:
|
|
return self.emotion_classifier(texts, batch_size=batch_size)
|
|
except RuntimeError as exc:
|
|
if (
|
|
self.use_cuda
|
|
and "out of memory" in str(exc).lower()
|
|
and batch_size > 8
|
|
):
|
|
batch_size = max(8, batch_size // 2)
|
|
torch.cuda.empty_cache()
|
|
continue
|
|
raise
|
|
|
|
def _infer_entities_with_backoff(
|
|
self, texts: list[str], initial_batch_size: int
|
|
) -> list[list[dict[str, Any]]]:
|
|
|
|
batch_size = initial_batch_size
|
|
|
|
while True:
|
|
try:
|
|
return self.entity_recognizer(texts, batch_size=batch_size)
|
|
except RuntimeError as exc:
|
|
if (
|
|
self.use_cuda
|
|
and "out of memory" in str(exc).lower()
|
|
and batch_size > 4
|
|
):
|
|
batch_size = max(4, batch_size // 2)
|
|
torch.cuda.empty_cache()
|
|
continue
|
|
raise
|
|
|
|
def add_emotion_cols(self) -> None:
|
|
texts = self.df[self.content_col].astype(str).str.slice(0, 512).tolist()
|
|
|
|
if not texts:
|
|
return
|
|
|
|
results = self._infer_emotions_with_backoff(texts, self.emotion_batch_size)
|
|
|
|
rows: list[dict[str, float]] = []
|
|
for row in results:
|
|
score_map: dict[str, float] = {}
|
|
for item in row:
|
|
label = item.get("label")
|
|
score = item.get("score")
|
|
if isinstance(label, str) and isinstance(score, (int, float)):
|
|
score_map[label] = float(score)
|
|
rows.append(score_map)
|
|
|
|
emotion_df = pd.DataFrame(rows).fillna(0.0).add_prefix("emotion_")
|
|
for column in emotion_df.columns:
|
|
self.df[column] = emotion_df[column].values
|
|
|
|
for column in self.df.columns:
|
|
if column.startswith("emotion_") and column not in emotion_df.columns:
|
|
self.df[column] = 0.0
|
|
|
|
# drop neutral and surprise columns from df and normalize others to sum to 1
|
|
drop_cols = ["emotion_neutral", "emotion_surprise"]
|
|
|
|
existing_drop = [c for c in drop_cols if c in self.df.columns]
|
|
self.df.drop(columns=existing_drop, inplace=True)
|
|
|
|
remaining_emotion_cols = [
|
|
c for c in self.df.columns
|
|
if c.startswith("emotion_")
|
|
]
|
|
|
|
if remaining_emotion_cols:
|
|
emotion_matrix = (
|
|
self.df[remaining_emotion_cols]
|
|
.apply(pd.to_numeric, errors="coerce")
|
|
.fillna(0.0)
|
|
)
|
|
|
|
row_sums = emotion_matrix.sum(axis=1)
|
|
|
|
# Avoid division by zero
|
|
row_sums = row_sums.replace(0, 1.0)
|
|
|
|
normalized = emotion_matrix.div(row_sums, axis=0)
|
|
|
|
self.df[remaining_emotion_cols] = normalized.values
|
|
|
|
|
|
|
|
def add_topic_col(self, confidence_threshold: float = 0.3) -> None:
|
|
titles = self.df[self.title_col].fillna("").astype(str)
|
|
contents = self.df[self.content_col].fillna("").astype(str)
|
|
|
|
texts = [
|
|
f"{title}. {content}" if title else content
|
|
for title, content in zip(titles, contents)
|
|
]
|
|
|
|
if not texts:
|
|
self.df["topic"] = []
|
|
self.df["topic_confidence"] = []
|
|
return
|
|
|
|
text_embeddings = self._encode_with_backoff(texts, self.topic_batch_size)
|
|
|
|
# cosine similarity is a dot product for normalized vectors
|
|
sims = np.matmul(text_embeddings, self.topic_embeddings.T)
|
|
|
|
# Best match
|
|
best_idx = sims.argmax(axis=1)
|
|
best_score = sims[np.arange(len(sims)), best_idx]
|
|
|
|
self.df["topic"] = [self.topic_labels[i] for i in best_idx]
|
|
self.df["topic_confidence"] = best_score
|
|
self.df.loc[self.df["topic_confidence"] < confidence_threshold, "topic"] = (
|
|
"Misc"
|
|
)
|
|
|
|
def add_ner_cols(self, max_chars: int = 512) -> None:
|
|
texts = (
|
|
self.df[self.content_col]
|
|
.fillna("")
|
|
.astype(str)
|
|
.str.slice(0, max_chars)
|
|
.tolist()
|
|
)
|
|
|
|
if not texts:
|
|
self.df["entities"] = []
|
|
self.df["entity_counts"] = []
|
|
return
|
|
|
|
results = self._infer_entities_with_backoff(texts, 32 if self.use_cuda else 8)
|
|
|
|
entity_lists = []
|
|
entity_count_dicts = []
|
|
|
|
for row in results:
|
|
entities = []
|
|
counts = {}
|
|
|
|
for ent in row:
|
|
word = ent.get("word")
|
|
label = ent.get("entity_group")
|
|
|
|
if isinstance(word, str) and isinstance(label, str):
|
|
entities.append({"text": word, "label": label})
|
|
counts[label] = counts.get(label, 0) + 1
|
|
|
|
entity_lists.append(entities)
|
|
entity_count_dicts.append(counts)
|
|
|
|
self.df["entities"] = entity_lists
|
|
self.df["entity_counts"] = entity_count_dicts
|
|
|
|
# Expand label counts into columns
|
|
all_labels = set()
|
|
for d in entity_count_dicts:
|
|
all_labels.update(d.keys())
|
|
|
|
for label in all_labels:
|
|
col_name = f"entity_{label}"
|
|
self.df[col_name] = [
|
|
d.get(label, 0) for d in entity_count_dicts
|
|
]
|
|
|
|
|