refactor: extract interaction and linguistic analysis into dedicated classes

This commit is contained in:
2026-02-17 18:00:16 +00:00
parent 83010aee55
commit d27ba3fca4
3 changed files with 173 additions and 143 deletions

View File

@@ -1,13 +1,13 @@
import pandas as pd
import re
import nltk
import datetime
import nltk
from nltk.corpus import stopwords
from collections import Counter
from server.nlp import NLP
from server.analysis.temporal import TemporalAnalysis
from server.analysis.emotional import EmotionalAnalysis
from server.analysis.interactional import InteractionAnalysis
from server.analysis.linguistic import LinguisticAnalysis
DOMAIN_STOPWORDS = {
"www", "https", "http",
@@ -43,6 +43,8 @@ class StatGen:
self.temporal_analysis = TemporalAnalysis(self.df)
self.emotional_analysis = EmotionalAnalysis(self.df)
self.interaction_analysis = InteractionAnalysis(self.df, EXCLUDE_WORDS)
self.linguistic_analysis = LinguisticAnalysis(self.df, EXCLUDE_WORDS)
self.original_df = self.df.copy(deep=True)
@@ -56,73 +58,7 @@ class StatGen:
self.nlp.add_emotion_cols()
self.nlp.add_topic_col()
def _tokenize(self, text: str):
tokens = re.findall(r"\b[a-z]{3,}\b", text)
return [t for t in tokens if t not in EXCLUDE_WORDS]
def _vocab_richness_per_user(self, min_words: int = 20, top_most_used_words: int = 100) -> list:
df = self.df.copy()
df["content"] = df["content"].fillna("").astype(str).str.lower()
df["tokens"] = df["content"].apply(self._tokenize)
rows = []
for author, group in df.groupby("author"):
all_tokens = [t for tokens in group["tokens"] for t in tokens]
total_words = len(all_tokens)
unique_words = len(set(all_tokens))
events = len(group)
# Min amount of words for a user, any less than this might give weird results
if total_words < min_words:
continue
# 100% = they never reused a word (excluding stop words)
vocab_richness = unique_words / total_words
avg_words = total_words / max(events, 1)
counts = Counter(all_tokens)
top_words = [
{"word": w, "count": int(c)}
for w, c in counts.most_common(top_most_used_words)
]
rows.append({
"author": author,
"events": int(events),
"total_words": int(total_words),
"unique_words": int(unique_words),
"vocab_richness": round(vocab_richness, 3),
"avg_words_per_event": round(avg_words, 2),
"top_words": top_words
})
rows = sorted(rows, key=lambda x: x["vocab_richness"], reverse=True)
return rows
def _interaction_graph(self):
interactions = {a: {} for a in self.df["author"].dropna().unique()}
# reply_to refers to the comment id, this allows us to map comment ids to usernames
id_to_author = self.df.set_index("id")["author"].to_dict()
for _, row in self.df.iterrows():
a = row["author"]
reply_id = row["reply_to"]
if pd.isna(a) or pd.isna(reply_id) or reply_id == "":
continue
b = id_to_author.get(reply_id)
if b is None or a == b:
continue
interactions[a][b] = interactions[a].get(b, 0) + 1
return interactions
## Public
def time_analysis(self) -> pd.DataFrame:
return {
@@ -150,87 +86,18 @@ class StatGen:
"sources": self.df["source"].dropna().unique().tolist()
}
def content_analysis(self, limit: int = 100) -> dict:
texts = (
self.df["content"]
.dropna()
.astype(str)
.str.lower()
)
words = []
for text in texts:
tokens = re.findall(r"\b[a-z]{3,}\b", text)
words.extend(
w for w in tokens
if w not in EXCLUDE_WORDS
)
counts = Counter(words)
word_frequencies = (
pd.DataFrame(counts.items(), columns=["word", "count"])
.sort_values("count", ascending=False)
.head(limit)
.reset_index(drop=True)
)
def content_analysis(self) -> dict:
return {
"word_frequencies": word_frequencies.to_dict(orient='records'),
"word_frequencies": self.linguistic_analysis.word_frequencies(),
"average_emotion_by_topic": self.emotional_analysis.avg_emotion_by_topic(),
"reply_time_by_emotion": self.temporal_analysis.avg_reply_time_per_emotion()
}
def user_analysis(self) -> dict:
counts = (
self.df.groupby(["author", "source"])
.size()
.sort_values(ascending=False)
)
top_users = [
{"author": author, "source": source, "count": int(count)}
for (author, source), count in counts.items()
]
per_user = (
self.df.groupby(["author", "type"])
.size()
.unstack(fill_value=0)
)
# ensure columns always exist
for col in ("post", "comment"):
if col not in per_user.columns:
per_user[col] = 0
per_user["comment_post_ratio"] = per_user["comment"] / per_user["post"].replace(0, 1)
per_user["comment_share"] = per_user["comment"] / (per_user["post"] + per_user["comment"]).replace(0, 1)
per_user = per_user.sort_values("comment_post_ratio", ascending=True)
per_user_records = per_user.reset_index().to_dict(orient="records")
vocab_rows = self._vocab_richness_per_user()
vocab_by_author = {row["author"]: row for row in vocab_rows}
# merge vocab richness + per_user information
merged_users = []
for row in per_user_records:
author = row["author"]
merged_users.append({
"author": author,
"post": int(row.get("post", 0)),
"comment": int(row.get("comment", 0)),
"comment_post_ratio": float(row.get("comment_post_ratio", 0)),
"comment_share": float(row.get("comment_share", 0)),
"vocab": vocab_by_author.get(author)
})
merged_users.sort(key=lambda u: u["comment_post_ratio"])
return {
"top_users": top_users,
"users": merged_users,
"interaction_graph": self._interaction_graph()
"top_users": self.interaction_analysis.top_users(),
"users": self.interaction_analysis.per_user_analysis(),
"interaction_graph": self.interaction_analysis.interaction_graph()
}
def search(self, search_query: str) -> dict: