style: run python linter & prettifier on backend code
@@ -15,7 +15,8 @@ class CulturalAnalysis:
 
         emotion_exclusions = {"emotion_neutral", "emotion_surprise"}
         emotion_cols = [
-            c for c in df.columns
+            c
+            for c in df.columns
            if c.startswith("emotion_") and c not in emotion_exclusions
        ]
 
@@ -40,7 +41,6 @@ class CulturalAnalysis:
             "out_group_usage": out_count,
             "in_group_ratio": round(in_count / max(total_tokens, 1), 5),
             "out_group_ratio": round(out_count / max(total_tokens, 1), 5),
-
             "in_group_posts": int(in_mask.sum()),
             "out_group_posts": int(out_mask.sum()),
             "tie_posts": int(tie_mask.sum()),
@@ -49,20 +49,34 @@ class CulturalAnalysis:
         if emotion_cols:
             emo = df[emotion_cols].apply(pd.to_numeric, errors="coerce").fillna(0.0)
 
-            in_avg = emo.loc[in_mask].mean() if in_mask.any() else pd.Series(0.0, index=emotion_cols)
-            out_avg = emo.loc[out_mask].mean() if out_mask.any() else pd.Series(0.0, index=emotion_cols)
+            in_avg = (
+                emo.loc[in_mask].mean()
+                if in_mask.any()
+                else pd.Series(0.0, index=emotion_cols)
+            )
+            out_avg = (
+                emo.loc[out_mask].mean()
+                if out_mask.any()
+                else pd.Series(0.0, index=emotion_cols)
+            )
 
             result["in_group_emotion_avg"] = in_avg.to_dict()
             result["out_group_emotion_avg"] = out_avg.to_dict()
 
         return result
 
     def get_stance_markers(self, df: pd.DataFrame) -> dict[str, Any]:
         s = df[self.content_col].fillna("").astype(str)
 
-        hedge_pattern = re.compile(r"\b(maybe|perhaps|possibly|probably|likely|seems|seem|i think|i feel|i guess|kind of|sort of|somewhat)\b")
-        certainty_pattern = re.compile(r"\b(definitely|certainly|clearly|obviously|undeniably|always|never)\b")
-        deontic_pattern = re.compile(r"\b(must|should|need|needs|have to|has to|ought|required|require)\b")
+        hedge_pattern = re.compile(
+            r"\b(maybe|perhaps|possibly|probably|likely|seems|seem|i think|i feel|i guess|kind of|sort of|somewhat)\b"
+        )
+        certainty_pattern = re.compile(
+            r"\b(definitely|certainly|clearly|obviously|undeniably|always|never)\b"
+        )
+        deontic_pattern = re.compile(
+            r"\b(must|should|need|needs|have to|has to|ought|required|require)\b"
+        )
         permission_pattern = re.compile(r"\b(can|allowed|okay|ok|permitted)\b")
 
         hedge_counts = s.str.count(hedge_pattern)
@@ -70,20 +84,32 @@ class CulturalAnalysis:
         deontic_counts = s.str.count(deontic_pattern)
         perm_counts = s.str.count(permission_pattern)
 
-        token_counts = s.apply(lambda t: len(re.findall(r"\b[a-z]{2,}\b", t))).replace(0, 1)
+        token_counts = s.apply(lambda t: len(re.findall(r"\b[a-z]{2,}\b", t))).replace(
+            0, 1
+        )
 
         return {
             "hedge_total": int(hedge_counts.sum()),
             "certainty_total": int(certainty_counts.sum()),
             "deontic_total": int(deontic_counts.sum()),
             "permission_total": int(perm_counts.sum()),
-            "hedge_per_1k_tokens": round(1000 * hedge_counts.sum() / token_counts.sum(), 3),
-            "certainty_per_1k_tokens": round(1000 * certainty_counts.sum() / token_counts.sum(), 3),
-            "deontic_per_1k_tokens": round(1000 * deontic_counts.sum() / token_counts.sum(), 3),
-            "permission_per_1k_tokens": round(1000 * perm_counts.sum() / token_counts.sum(), 3),
+            "hedge_per_1k_tokens": round(
+                1000 * hedge_counts.sum() / token_counts.sum(), 3
+            ),
+            "certainty_per_1k_tokens": round(
+                1000 * certainty_counts.sum() / token_counts.sum(), 3
+            ),
+            "deontic_per_1k_tokens": round(
+                1000 * deontic_counts.sum() / token_counts.sum(), 3
+            ),
+            "permission_per_1k_tokens": round(
+                1000 * perm_counts.sum() / token_counts.sum(), 3
+            ),
         }
 
-    def get_avg_emotions_per_entity(self, df: pd.DataFrame, top_n: int = 25, min_posts: int = 10) -> dict[str, Any]:
+    def get_avg_emotions_per_entity(
+        self, df: pd.DataFrame, top_n: int = 25, min_posts: int = 10
+    ) -> dict[str, Any]:
         if "ner_entities" not in df.columns:
             return {"entity_emotion_avg": {}}
 
@@ -92,9 +118,13 @@ class CulturalAnalysis:
         entity_df = df[["ner_entities"] + emotion_cols].explode("ner_entities")
 
         entity_df["entity_text"] = entity_df["ner_entities"].apply(
-            lambda e: e.get("text").strip()
-            if isinstance(e, dict) and isinstance(e.get("text"), str) and len(e.get("text")) >= 3
-            else None
+            lambda e: (
+                e.get("text").strip()
+                if isinstance(e, dict)
+                and isinstance(e.get("text"), str)
+                and len(e.get("text")) >= 3
+                else None
+            )
        )
 
        entity_df = entity_df.dropna(subset=["entity_text"])
@@ -114,4 +144,4 @@ class CulturalAnalysis:
                 "emotion_avg": emo_means,
             }
 
-        return {"entity_emotion_avg": entity_emotion_avg}
+        return {"entity_emotion_avg": entity_emotion_avg}
@@ -2,6 +2,7 @@ import pandas as pd
 
 from server.analysis.nlp import NLP
 
+
 class DatasetEnrichment:
     def __init__(self, df: pd.DataFrame, topics: dict):
         self.df = self._explode_comments(df)
@@ -10,7 +11,9 @@ class DatasetEnrichment:
 
     def _explode_comments(self, df) -> pd.DataFrame:
         comments_df = df[["id", "comments"]].explode("comments")
-        comments_df = comments_df[comments_df["comments"].apply(lambda x: isinstance(x, dict))]
+        comments_df = comments_df[
+            comments_df["comments"].apply(lambda x: isinstance(x, dict))
+        ]
         comments_df = pd.json_normalize(comments_df["comments"])
 
         posts_df = df.drop(columns=["comments"])
@@ -24,16 +27,16 @@ class DatasetEnrichment:
         df.drop(columns=["post_id"], inplace=True, errors="ignore")
 
         return df
 
     def enrich(self) -> pd.DataFrame:
-        self.df['timestamp'] = pd.to_numeric(self.df['timestamp'], errors='raise')
-        self.df['date'] = pd.to_datetime(self.df['timestamp'], unit='s').dt.date
+        self.df["timestamp"] = pd.to_numeric(self.df["timestamp"], errors="raise")
+        self.df["date"] = pd.to_datetime(self.df["timestamp"], unit="s").dt.date
         self.df["dt"] = pd.to_datetime(self.df["timestamp"], unit="s", utc=True)
         self.df["hour"] = self.df["dt"].dt.hour
         self.df["weekday"] = self.df["dt"].dt.day_name()
 
         self.nlp.add_emotion_cols()
         self.nlp.add_topic_col()
         self.nlp.add_ner_cols()
 
-        return self.df
+        return self.df
@@ -1,6 +1,7 @@
 import pandas as pd
 import re
 
+
 class InteractionAnalysis:
     def __init__(self, word_exclusions: set[str]):
         self.word_exclusions = word_exclusions
@@ -51,7 +52,7 @@ class InteractionAnalysis:
             return 0
 
         return round(sum(depths) / len(depths), 2)
-
+
     def top_interaction_pairs(self, df: pd.DataFrame, top_n=10):
         graph = self.interaction_graph(df)
         pairs = []
@@ -62,7 +63,7 @@ class InteractionAnalysis:
 
         pairs.sort(key=lambda x: x[1], reverse=True)
         return pairs[:top_n]
-
+
     def conversation_concentration(self, df: pd.DataFrame) -> dict:
         if "type" not in df.columns:
             return {}
@@ -76,12 +77,16 @@ class InteractionAnalysis:
         total_authors = len(author_counts)
 
         top_10_pct_n = max(1, int(total_authors * 0.1))
-        top_10_pct_share = round(author_counts.head(top_10_pct_n).sum() / total_comments, 4)
+        top_10_pct_share = round(
+            author_counts.head(top_10_pct_n).sum() / total_comments, 4
+        )
 
         return {
             "total_commenting_authors": total_authors,
             "top_10pct_author_count": top_10_pct_n,
             "top_10pct_comment_share": float(top_10_pct_share),
             "single_comment_authors": int((author_counts == 1).sum()),
-            "single_comment_author_ratio": float(round((author_counts == 1).sum() / total_authors, 4)),
-        }
+            "single_comment_author_ratio": float(
+                round((author_counts == 1).sum() / total_authors, 4)
+            ),
+        }
@@ -64,7 +64,10 @@ class LinguisticAnalysis:
 
     def lexical_diversity(self, df: pd.DataFrame) -> dict:
         tokens = (
-            df["content"].fillna("").astype(str).str.lower()
+            df["content"]
+            .fillna("")
+            .astype(str)
+            .str.lower()
             .str.findall(r"\b[a-z]{2,}\b")
             .explode()
         )
@@ -6,6 +6,7 @@ from typing import Any
 from transformers import pipeline
 from sentence_transformers import SentenceTransformer
 
+
 class NLP:
     _topic_models: dict[str, SentenceTransformer] = {}
     _emotion_classifiers: dict[str, Any] = {}
@@ -32,7 +33,7 @@ class NLP:
             )
             self.entity_recognizer = self._get_entity_recognizer(
                 self.device_str, self.pipeline_device
-                )
+            )
         except RuntimeError as exc:
             if self.use_cuda and "out of memory" in str(exc).lower():
                 torch.cuda.empty_cache()
@@ -90,7 +91,7 @@ class NLP:
         )
         cls._emotion_classifiers[device_str] = classifier
         return classifier
-
+
     @classmethod
     def _get_entity_recognizer(cls, device_str: str, pipeline_device: int) -> Any:
         recognizer = cls._entity_recognizers.get(device_str)
@@ -207,8 +208,7 @@ class NLP:
         self.df.drop(columns=existing_drop, inplace=True)
 
         remaining_emotion_cols = [
-            c for c in self.df.columns
-            if c.startswith("emotion_")
+            c for c in self.df.columns if c.startswith("emotion_")
        ]
 
        if remaining_emotion_cols:
@@ -227,8 +227,6 @@ class NLP:
 
             self.df[remaining_emotion_cols] = normalized.values
 
-
-
     def add_topic_col(self, confidence_threshold: float = 0.3) -> None:
         titles = self.df[self.title_col].fillna("").astype(str)
         contents = self.df[self.content_col].fillna("").astype(str)
@@ -257,7 +255,7 @@ class NLP:
         self.df.loc[self.df["topic_confidence"] < confidence_threshold, "topic"] = (
             "Misc"
         )
-
+
     def add_ner_cols(self, max_chars: int = 512) -> None:
         texts = (
             self.df[self.content_col]
@@ -302,8 +300,4 @@ class NLP:
 
         for label in all_labels:
             col_name = f"entity_{label}"
-            self.df[col_name] = [
-                d.get(label, 0) for d in entity_count_dicts
-            ]
-
-
+            self.df[col_name] = [d.get(label, 0) for d in entity_count_dicts]
@@ -3,6 +3,7 @@ import re
 
 from collections import Counter
 
+
 class UserAnalysis:
     def __init__(self, word_exclusions: set[str]):
         self.word_exclusions = word_exclusions
@@ -12,49 +13,49 @@ class UserAnalysis:
         return [t for t in tokens if t not in self.word_exclusions]
 
     def _vocab_richness_per_user(
-        self, df: pd.DataFrame, min_words: int = 20, top_most_used_words: int = 100
-    ) -> list:
-        df = df.copy()
-        df["content"] = df["content"].fillna("").astype(str).str.lower()
-        df["tokens"] = df["content"].apply(self._tokenize)
+        self, df: pd.DataFrame, min_words: int = 20, top_most_used_words: int = 100
+    ) -> list:
+        df = df.copy()
+        df["content"] = df["content"].fillna("").astype(str).str.lower()
+        df["tokens"] = df["content"].apply(self._tokenize)
 
-        rows = []
-        for author, group in df.groupby("author"):
-            all_tokens = [t for tokens in group["tokens"] for t in tokens]
+        rows = []
+        for author, group in df.groupby("author"):
+            all_tokens = [t for tokens in group["tokens"] for t in tokens]
 
-            total_words = len(all_tokens)
-            unique_words = len(set(all_tokens))
-            events = len(group)
+            total_words = len(all_tokens)
+            unique_words = len(set(all_tokens))
+            events = len(group)
 
-            # Min amount of words for a user, any less than this might give weird results
-            if total_words < min_words:
-                continue
+            # Min amount of words for a user, any less than this might give weird results
+            if total_words < min_words:
+                continue
 
-            # 100% = they never reused a word (excluding stop words)
-            vocab_richness = unique_words / total_words
-            avg_words = total_words / max(events, 1)
+            # 100% = they never reused a word (excluding stop words)
+            vocab_richness = unique_words / total_words
+            avg_words = total_words / max(events, 1)
 
-            counts = Counter(all_tokens)
-            top_words = [
-                {"word": w, "count": int(c)}
-                for w, c in counts.most_common(top_most_used_words)
-            ]
+            counts = Counter(all_tokens)
+            top_words = [
+                {"word": w, "count": int(c)}
+                for w, c in counts.most_common(top_most_used_words)
+            ]
 
-            rows.append(
-                {
-                    "author": author,
-                    "events": int(events),
-                    "total_words": int(total_words),
-                    "unique_words": int(unique_words),
-                    "vocab_richness": round(vocab_richness, 3),
-                    "avg_words_per_event": round(avg_words, 2),
-                    "top_words": top_words,
-                }
-            )
+            rows.append(
+                {
+                    "author": author,
+                    "events": int(events),
+                    "total_words": int(total_words),
+                    "unique_words": int(unique_words),
+                    "vocab_richness": round(vocab_richness, 3),
+                    "avg_words_per_event": round(avg_words, 2),
+                    "top_words": top_words,
+                }
+            )
 
-        rows = sorted(rows, key=lambda x: x["vocab_richness"], reverse=True)
+        rows = sorted(rows, key=lambda x: x["vocab_richness"], reverse=True)
 
-        return rows
+        return rows
 
     def top_users(self, df: pd.DataFrame) -> list:
         counts = df.groupby(["author", "source"]).size().sort_values(ascending=False)