Implement job queue for asynchronous NLP #6

Merged
dylan merged 14 commits from feat/implement-job-queue into main 2026-03-03 14:26:38 +00:00
Showing only changes of commit 9d1e8960fc - Show all commits

View File

@@ -1,7 +1,6 @@
import pandas as pd import pandas as pd
import re import re
from collections import Counter
from typing import Any from typing import Any
@@ -14,9 +13,6 @@ class CulturalAnalysis:
df = original_df.copy() df = original_df.copy()
s = df[self.content_col].fillna("").astype(str).str.lower() s = df[self.content_col].fillna("").astype(str).str.lower()
in_group_words = {"we", "us", "our", "ourselves"}
out_group_words = {"they", "them", "their", "themselves"}
emotion_exclusions = {"emotion_neutral", "emotion_surprise"} emotion_exclusions = {"emotion_neutral", "emotion_surprise"}
emotion_cols = [ emotion_cols = [
c for c in df.columns c for c in df.columns
@@ -24,11 +20,13 @@ class CulturalAnalysis:
] ]
# Tokenize per row # Tokenize per row
tokens_per_row = s.apply(lambda txt: re.findall(r"\b[a-z]{2,}\b", txt)) in_pattern = re.compile(r"\b(we|us|our|ourselves)\b")
out_pattern = re.compile(r"\b(they|them|their|themselves)\b")
token_pattern = re.compile(r"\b[a-z]{2,}\b")
total_tokens = int(tokens_per_row.map(len).sum()) in_hits = s.str.count(in_pattern)
in_hits = tokens_per_row.map(lambda toks: sum(t in in_group_words for t in toks)).astype(int) out_hits = s.str.count(out_pattern)
out_hits = tokens_per_row.map(lambda toks: sum(t in out_group_words for t in toks)).astype(int) total_tokens = s.str.count(token_pattern).sum()
in_count = int(in_hits.sum()) in_count = int(in_hits.sum())
out_count = int(out_hits.sum()) out_count = int(out_hits.sum())
@@ -62,33 +60,15 @@ class CulturalAnalysis:
def get_stance_markers(self, df: pd.DataFrame) -> dict[str, Any]: def get_stance_markers(self, df: pd.DataFrame) -> dict[str, Any]:
s = df[self.content_col].fillna("").astype(str) s = df[self.content_col].fillna("").astype(str)
hedges = { hedge_pattern = re.compile(r"\b(maybe|perhaps|possibly|probably|likely|seems|seem|i think|i feel|i guess|kind of|sort of|somewhat)\b")
"maybe", "perhaps", "possibly", "probably", "likely", "seems", "seem", certainty_pattern = re.compile(r"\b(definitely|certainly|clearly|obviously|undeniably|always|never)\b")
"i think", "i feel", "i guess", "kind of", "sort of", "somewhat" deontic_pattern = re.compile(r"\b(must|should|need|needs|have to|has to|ought|required|require)\b")
} permission_pattern = re.compile(r"\b(can|allowed|okay|ok|permitted)\b")
certainty = {
"definitely", "certainly", "clearly", "obviously", "undeniably", "always", "never"
}
deontic = { hedge_counts = s.str.count(hedge_pattern)
"must", "should", "need", "needs", "have to", "has to", "ought", "required", "require" certainty_counts = s.str.count(certainty_pattern)
} deontic_counts = s.str.count(deontic_pattern)
perm_counts = s.str.count(permission_pattern)
permission = {"can", "allowed", "okay", "ok", "permitted"}
def count_phrases(text: str, phrases: set[str]) -> int:
c = 0
for p in phrases:
if " " in p:
c += len(re.findall(r"\b" + re.escape(p) + r"\b", text))
else:
c += len(re.findall(r"\b" + re.escape(p) + r"\b", text))
return c
hedge_counts = s.apply(lambda t: count_phrases(t, hedges))
certainty_counts = s.apply(lambda t: count_phrases(t, certainty))
deontic_counts = s.apply(lambda t: count_phrases(t, deontic))
perm_counts = s.apply(lambda t: count_phrases(t, permission))
token_counts = s.apply(lambda t: len(re.findall(r"\b[a-z]{2,}\b", t))).replace(0, 1) token_counts = s.apply(lambda t: len(re.findall(r"\b[a-z]{2,}\b", t))).replace(0, 1)
@@ -108,44 +88,30 @@ class CulturalAnalysis:
return {"entity_emotion_avg": {}} return {"entity_emotion_avg": {}}
emotion_cols = [c for c in df.columns if c.startswith("emotion_")] emotion_cols = [c for c in df.columns if c.startswith("emotion_")]
entity_counter = Counter()
for row in df["entities"].dropna(): entity_df = df[["entities"] + emotion_cols].explode("entities")
if isinstance(row, list):
for ent in row:
if isinstance(ent, dict):
text = ent.get("text")
if isinstance(text, str):
text = text.strip()
if len(text) >= 3: # filter short junk
entity_counter[text] += 1
top_entities = entity_counter.most_common(top_n) entity_df["entity_text"] = entity_df["entities"].apply(
lambda e: e.get("text").strip()
entity_emotion_avg = {} if isinstance(e, dict) and isinstance(e.get("text"), str) and len(e.get("text")) >= 3
else None
for entity_text, _ in top_entities:
mask = df["entities"].apply(
lambda ents: isinstance(ents, list) and
any(isinstance(e, dict) and e.get("text") == entity_text for e in ents)
) )
post_count = int(mask.sum()) entity_df = entity_df.dropna(subset=["entity_text"])
entity_counts = entity_df["entity_text"].value_counts().head(top_n)
entity_emotion_avg = {}
if post_count >= min_posts: for entity_text, count in entity_counts.items():
if count >= min_posts:
emo_means = ( emo_means = (
df.loc[mask, emotion_cols] entity_df[entity_df["entity_text"] == entity_text][emotion_cols]
.apply(pd.to_numeric, errors="coerce")
.fillna(0.0)
.mean() .mean()
.to_dict() .to_dict()
) )
entity_emotion_avg[entity_text] = { entity_emotion_avg[entity_text] = {
"post_count": post_count, "post_count": int(count),
"emotion_avg": emo_means "emotion_avg": emo_means,
} }
return { return {"entity_emotion_avg": entity_emotion_avg}
"entity_emotion_avg": entity_emotion_avg
}