Files
crosspost/server/stat_gen.py

308 lines
9.9 KiB
Python

import pandas as pd
import re
import nltk
import datetime
from nltk.corpus import stopwords
from collections import Counter
from server.nlp import NLP
from server.analysis.temporal import TemporalAnalysis
DOMAIN_STOPWORDS = {
"www", "https", "http",
"boards", "boardsie",
"comment", "comments",
"discussion", "thread",
"post", "posts",
"would", "could", "should",
"like", "get", "one"
}
nltk.download('stopwords')
EXCLUDE_WORDS = set(stopwords.words('english')) | DOMAIN_STOPWORDS
class StatGen:
def __init__(self, df: pd.DataFrame, domain_topics: dict) -> None:
comments_df = df[["id", "comments"]].explode("comments")
comments_df = comments_df[comments_df["comments"].apply(lambda x: isinstance(x, dict))]
comments_df = pd.json_normalize(comments_df["comments"])
posts_df = df.drop(columns=["comments"])
posts_df["type"] = "post"
posts_df["parent_id"] = None
comments_df["type"] = "comment"
comments_df["parent_id"] = comments_df.get("post_id")
self.domain_topics = domain_topics
self.df = pd.concat([posts_df, comments_df])
self.df.drop(columns=["post_id"], inplace=True, errors="ignore")
self.nlp = NLP(self.df, "title", "content", domain_topics)
self._add_extra_cols(self.df)
self.temporal_analysis = TemporalAnalysis(self.df)
self.original_df = self.df.copy(deep=True)
## Private Methods
def _add_extra_cols(self, df: pd.DataFrame) -> None:
df['timestamp'] = pd.to_numeric(self.df['timestamp'], errors='coerce')
df['date'] = pd.to_datetime(df['timestamp'], unit='s').dt.date
df["dt"] = pd.to_datetime(df["timestamp"], unit="s", utc=True)
df["hour"] = df["dt"].dt.hour
df["weekday"] = df["dt"].dt.day_name()
self.nlp.add_emotion_cols()
self.nlp.add_topic_col()
def _tokenize(self, text: str):
tokens = re.findall(r"\b[a-z]{3,}\b", text)
return [t for t in tokens if t not in EXCLUDE_WORDS]
def _vocab_richness_per_user(self, min_words: int = 20, top_most_used_words: int = 100) -> list:
df = self.df.copy()
df["content"] = df["content"].fillna("").astype(str).str.lower()
df["tokens"] = df["content"].apply(self._tokenize)
rows = []
for author, group in df.groupby("author"):
all_tokens = [t for tokens in group["tokens"] for t in tokens]
total_words = len(all_tokens)
unique_words = len(set(all_tokens))
events = len(group)
# Min amount of words for a user, any less than this might give weird results
if total_words < min_words:
continue
# 100% = they never reused a word (excluding stop words)
vocab_richness = unique_words / total_words
avg_words = total_words / max(events, 1)
counts = Counter(all_tokens)
top_words = [
{"word": w, "count": int(c)}
for w, c in counts.most_common(top_most_used_words)
]
rows.append({
"author": author,
"events": int(events),
"total_words": int(total_words),
"unique_words": int(unique_words),
"vocab_richness": round(vocab_richness, 3),
"avg_words_per_event": round(avg_words, 2),
"top_words": top_words
})
rows = sorted(rows, key=lambda x: x["vocab_richness"], reverse=True)
return rows
def _interaction_graph(self):
interactions = {a: {} for a in self.df["author"].dropna().unique()}
# reply_to refers to the comment id, this allows us to map comment ids to usernames
id_to_author = self.df.set_index("id")["author"].to_dict()
for _, row in self.df.iterrows():
a = row["author"]
reply_id = row["reply_to"]
if pd.isna(a) or pd.isna(reply_id) or reply_id == "":
continue
b = id_to_author.get(reply_id)
if b is None or a == b:
continue
interactions[a][b] = interactions[a].get(b, 0) + 1
return interactions
## Public
def time_analysis(self) -> pd.DataFrame:
return {
"events_per_day": self.temporal_analysis.posts_per_day(),
"weekday_hour_heatmap": self.temporal_analysis.heatmap()
}
def summary(self) -> dict:
total_posts = (self.df["type"] == "post").sum()
total_comments = (self.df["type"] == "comment").sum()
events_per_user = self.df.groupby("author").size()
return {
"total_events": int(len(self.df)),
"total_posts": int(total_posts),
"total_comments": int(total_comments),
"unique_users": int(events_per_user.count()),
"comments_per_post": round(total_comments / max(total_posts, 1), 2),
"lurker_ratio": round((events_per_user == 1).mean(), 2),
"time_range": {
"start": int(self.df["dt"].min().timestamp()),
"end": int(self.df["dt"].max().timestamp())
},
"sources": self.df["source"].dropna().unique().tolist()
}
def content_analysis(self, limit: int = 100) -> dict:
texts = (
self.df["content"]
.dropna()
.astype(str)
.str.lower()
)
words = []
for text in texts:
tokens = re.findall(r"\b[a-z]{3,}\b", text)
words.extend(
w for w in tokens
if w not in EXCLUDE_WORDS
)
counts = Counter(words)
word_frequencies = (
pd.DataFrame(counts.items(), columns=["word", "count"])
.sort_values("count", ascending=False)
.head(limit)
.reset_index(drop=True)
)
emotion_exclusions = [
"emotion_neutral",
"emotion_surprise"
]
emotion_cols = [
col for col in self.df.columns
if col.startswith("emotion_") and col not in emotion_exclusions
]
counts = (
self.df[
(self.df["topic"] != "Misc")
]
.groupby("topic")
.size()
.rename("n")
)
avg_emotion_by_topic = (
self.df[
(self.df["topic"] != "Misc")
]
.groupby("topic")[emotion_cols]
.mean()
.reset_index()
)
avg_emotion_by_topic = avg_emotion_by_topic.merge(
counts,
on="topic"
)
return {
"word_frequencies": word_frequencies.to_dict(orient='records'),
"average_emotion_by_topic": avg_emotion_by_topic.to_dict(orient='records'),
"reply_time_by_emotion": self.temporal_analysis.avg_reply_time_per_emotion()
}
def user_analysis(self) -> dict:
counts = (
self.df.groupby(["author", "source"])
.size()
.sort_values(ascending=False)
)
top_users = [
{"author": author, "source": source, "count": int(count)}
for (author, source), count in counts.items()
]
per_user = (
self.df.groupby(["author", "type"])
.size()
.unstack(fill_value=0)
)
# ensure columns always exist
for col in ("post", "comment"):
if col not in per_user.columns:
per_user[col] = 0
per_user["comment_post_ratio"] = per_user["comment"] / per_user["post"].replace(0, 1)
per_user["comment_share"] = per_user["comment"] / (per_user["post"] + per_user["comment"]).replace(0, 1)
per_user = per_user.sort_values("comment_post_ratio", ascending=True)
per_user_records = per_user.reset_index().to_dict(orient="records")
vocab_rows = self._vocab_richness_per_user()
vocab_by_author = {row["author"]: row for row in vocab_rows}
# merge vocab richness + per_user information
merged_users = []
for row in per_user_records:
author = row["author"]
merged_users.append({
"author": author,
"post": int(row.get("post", 0)),
"comment": int(row.get("comment", 0)),
"comment_post_ratio": float(row.get("comment_post_ratio", 0)),
"comment_share": float(row.get("comment_share", 0)),
"vocab": vocab_by_author.get(author)
})
merged_users.sort(key=lambda u: u["comment_post_ratio"])
return {
"top_users": top_users,
"users": merged_users,
"interaction_graph": self._interaction_graph()
}
def search(self, search_query: str) -> dict:
self.df = self.df[
self.df["content"].str.contains(search_query)
]
return {
"rows": len(self.df),
"data": self.df.to_dict(orient="records")
}
def set_time_range(self, start: datetime.datetime, end: datetime.datetime) -> dict:
self.df = self.df[
(self.df["dt"] >= start) &
(self.df["dt"] <= end)
]
return {
"rows": len(self.df),
"data": self.df.to_dict(orient="records")
}
"""
Input is a hash map (source_name: str -> enabled: bool)
"""
def filter_data_sources(self, data_sources: dict) -> dict:
enabled_sources = [src for src, enabled in data_sources.items() if enabled]
if not enabled_sources:
raise ValueError("Please choose at least one data source")
self.df = self.df[self.df["source"].isin(enabled_sources)]
return {
"rows": len(self.df),
"data": self.df.to_dict(orient="records")
}
def reset_dataset(self) -> None:
self.df = self.original_df.copy(deep=True)