From 376773a0cc1a769d772a36b2948d9f93b3712afe Mon Sep 17 00:00:00 2001 From: Dylan De Faoite Date: Wed, 25 Mar 2026 19:34:43 +0000 Subject: [PATCH] style: run python linter & prettifier on backend code --- server/analysis/cultural.py | 68 ++++++++++++----- server/analysis/enrichment.py | 15 ++-- server/analysis/interactional.py | 15 ++-- server/analysis/linguistic.py | 5 +- server/analysis/nlp.py | 18 ++--- server/analysis/user.py | 71 ++++++++--------- server/app.py | 100 ++++++++++++++---------- server/connectors/base.py | 18 ++--- server/connectors/boards_api.py | 72 ++++++++++-------- server/connectors/reddit_api.py | 127 ++++++++++++++++--------------- server/connectors/registry.py | 19 +++-- server/connectors/youtube_api.py | 104 ++++++++++++------------- server/core/auth.py | 17 +++-- server/core/datasets.py | 33 ++++---- server/db/database.py | 8 +- server/queue/celery_app.py | 4 +- server/queue/tasks.py | 29 ++++--- 17 files changed, 408 insertions(+), 315 deletions(-) diff --git a/server/analysis/cultural.py b/server/analysis/cultural.py index 6aef58a..af64091 100644 --- a/server/analysis/cultural.py +++ b/server/analysis/cultural.py @@ -15,7 +15,8 @@ class CulturalAnalysis: emotion_exclusions = {"emotion_neutral", "emotion_surprise"} emotion_cols = [ - c for c in df.columns + c + for c in df.columns if c.startswith("emotion_") and c not in emotion_exclusions ] @@ -40,7 +41,6 @@ class CulturalAnalysis: "out_group_usage": out_count, "in_group_ratio": round(in_count / max(total_tokens, 1), 5), "out_group_ratio": round(out_count / max(total_tokens, 1), 5), - "in_group_posts": int(in_mask.sum()), "out_group_posts": int(out_mask.sum()), "tie_posts": int(tie_mask.sum()), @@ -49,20 +49,34 @@ class CulturalAnalysis: if emotion_cols: emo = df[emotion_cols].apply(pd.to_numeric, errors="coerce").fillna(0.0) - in_avg = emo.loc[in_mask].mean() if in_mask.any() else pd.Series(0.0, index=emotion_cols) - out_avg = emo.loc[out_mask].mean() if out_mask.any() else pd.Series(0.0, index=emotion_cols) + in_avg = ( + emo.loc[in_mask].mean() + if in_mask.any() + else pd.Series(0.0, index=emotion_cols) + ) + out_avg = ( + emo.loc[out_mask].mean() + if out_mask.any() + else pd.Series(0.0, index=emotion_cols) + ) result["in_group_emotion_avg"] = in_avg.to_dict() result["out_group_emotion_avg"] = out_avg.to_dict() return result - + def get_stance_markers(self, df: pd.DataFrame) -> dict[str, Any]: s = df[self.content_col].fillna("").astype(str) - hedge_pattern = re.compile(r"\b(maybe|perhaps|possibly|probably|likely|seems|seem|i think|i feel|i guess|kind of|sort of|somewhat)\b") - certainty_pattern = re.compile(r"\b(definitely|certainly|clearly|obviously|undeniably|always|never)\b") - deontic_pattern = re.compile(r"\b(must|should|need|needs|have to|has to|ought|required|require)\b") + hedge_pattern = re.compile( + r"\b(maybe|perhaps|possibly|probably|likely|seems|seem|i think|i feel|i guess|kind of|sort of|somewhat)\b" + ) + certainty_pattern = re.compile( + r"\b(definitely|certainly|clearly|obviously|undeniably|always|never)\b" + ) + deontic_pattern = re.compile( + r"\b(must|should|need|needs|have to|has to|ought|required|require)\b" + ) permission_pattern = re.compile(r"\b(can|allowed|okay|ok|permitted)\b") hedge_counts = s.str.count(hedge_pattern) @@ -70,20 +84,32 @@ class CulturalAnalysis: deontic_counts = s.str.count(deontic_pattern) perm_counts = s.str.count(permission_pattern) - token_counts = s.apply(lambda t: len(re.findall(r"\b[a-z]{2,}\b", t))).replace(0, 1) + token_counts = s.apply(lambda t: len(re.findall(r"\b[a-z]{2,}\b", t))).replace( + 0, 1 + ) return { "hedge_total": int(hedge_counts.sum()), "certainty_total": int(certainty_counts.sum()), "deontic_total": int(deontic_counts.sum()), "permission_total": int(perm_counts.sum()), - "hedge_per_1k_tokens": round(1000 * hedge_counts.sum() / token_counts.sum(), 3), - "certainty_per_1k_tokens": round(1000 * certainty_counts.sum() / token_counts.sum(), 3), - "deontic_per_1k_tokens": round(1000 * deontic_counts.sum() / token_counts.sum(), 3), - "permission_per_1k_tokens": round(1000 * perm_counts.sum() / token_counts.sum(), 3), + "hedge_per_1k_tokens": round( + 1000 * hedge_counts.sum() / token_counts.sum(), 3 + ), + "certainty_per_1k_tokens": round( + 1000 * certainty_counts.sum() / token_counts.sum(), 3 + ), + "deontic_per_1k_tokens": round( + 1000 * deontic_counts.sum() / token_counts.sum(), 3 + ), + "permission_per_1k_tokens": round( + 1000 * perm_counts.sum() / token_counts.sum(), 3 + ), } - - def get_avg_emotions_per_entity(self, df: pd.DataFrame, top_n: int = 25, min_posts: int = 10) -> dict[str, Any]: + + def get_avg_emotions_per_entity( + self, df: pd.DataFrame, top_n: int = 25, min_posts: int = 10 + ) -> dict[str, Any]: if "ner_entities" not in df.columns: return {"entity_emotion_avg": {}} @@ -92,9 +118,13 @@ class CulturalAnalysis: entity_df = df[["ner_entities"] + emotion_cols].explode("ner_entities") entity_df["entity_text"] = entity_df["ner_entities"].apply( - lambda e: e.get("text").strip() - if isinstance(e, dict) and isinstance(e.get("text"), str) and len(e.get("text")) >= 3 - else None + lambda e: ( + e.get("text").strip() + if isinstance(e, dict) + and isinstance(e.get("text"), str) + and len(e.get("text")) >= 3 + else None + ) ) entity_df = entity_df.dropna(subset=["entity_text"]) @@ -114,4 +144,4 @@ class CulturalAnalysis: "emotion_avg": emo_means, } - return {"entity_emotion_avg": entity_emotion_avg} \ No newline at end of file + return {"entity_emotion_avg": entity_emotion_avg} diff --git a/server/analysis/enrichment.py b/server/analysis/enrichment.py index cc9694c..52202e9 100644 --- a/server/analysis/enrichment.py +++ b/server/analysis/enrichment.py @@ -2,6 +2,7 @@ import pandas as pd from server.analysis.nlp import NLP + class DatasetEnrichment: def __init__(self, df: pd.DataFrame, topics: dict): self.df = self._explode_comments(df) @@ -10,7 +11,9 @@ class DatasetEnrichment: def _explode_comments(self, df) -> pd.DataFrame: comments_df = df[["id", "comments"]].explode("comments") - comments_df = comments_df[comments_df["comments"].apply(lambda x: isinstance(x, dict))] + comments_df = comments_df[ + comments_df["comments"].apply(lambda x: isinstance(x, dict)) + ] comments_df = pd.json_normalize(comments_df["comments"]) posts_df = df.drop(columns=["comments"]) @@ -24,16 +27,16 @@ class DatasetEnrichment: df.drop(columns=["post_id"], inplace=True, errors="ignore") return df - + def enrich(self) -> pd.DataFrame: - self.df['timestamp'] = pd.to_numeric(self.df['timestamp'], errors='raise') - self.df['date'] = pd.to_datetime(self.df['timestamp'], unit='s').dt.date + self.df["timestamp"] = pd.to_numeric(self.df["timestamp"], errors="raise") + self.df["date"] = pd.to_datetime(self.df["timestamp"], unit="s").dt.date self.df["dt"] = pd.to_datetime(self.df["timestamp"], unit="s", utc=True) self.df["hour"] = self.df["dt"].dt.hour self.df["weekday"] = self.df["dt"].dt.day_name() - + self.nlp.add_emotion_cols() self.nlp.add_topic_col() self.nlp.add_ner_cols() - return self.df \ No newline at end of file + return self.df diff --git a/server/analysis/interactional.py b/server/analysis/interactional.py index e15940e..8d75a3c 100644 --- a/server/analysis/interactional.py +++ b/server/analysis/interactional.py @@ -1,6 +1,7 @@ import pandas as pd import re + class InteractionAnalysis: def __init__(self, word_exclusions: set[str]): self.word_exclusions = word_exclusions @@ -51,7 +52,7 @@ class InteractionAnalysis: return 0 return round(sum(depths) / len(depths), 2) - + def top_interaction_pairs(self, df: pd.DataFrame, top_n=10): graph = self.interaction_graph(df) pairs = [] @@ -62,7 +63,7 @@ class InteractionAnalysis: pairs.sort(key=lambda x: x[1], reverse=True) return pairs[:top_n] - + def conversation_concentration(self, df: pd.DataFrame) -> dict: if "type" not in df.columns: return {} @@ -76,12 +77,16 @@ class InteractionAnalysis: total_authors = len(author_counts) top_10_pct_n = max(1, int(total_authors * 0.1)) - top_10_pct_share = round(author_counts.head(top_10_pct_n).sum() / total_comments, 4) + top_10_pct_share = round( + author_counts.head(top_10_pct_n).sum() / total_comments, 4 + ) return { "total_commenting_authors": total_authors, "top_10pct_author_count": top_10_pct_n, "top_10pct_comment_share": float(top_10_pct_share), "single_comment_authors": int((author_counts == 1).sum()), - "single_comment_author_ratio": float(round((author_counts == 1).sum() / total_authors, 4)), - } \ No newline at end of file + "single_comment_author_ratio": float( + round((author_counts == 1).sum() / total_authors, 4) + ), + } diff --git a/server/analysis/linguistic.py b/server/analysis/linguistic.py index 7546bbf..a242739 100644 --- a/server/analysis/linguistic.py +++ b/server/analysis/linguistic.py @@ -64,7 +64,10 @@ class LinguisticAnalysis: def lexical_diversity(self, df: pd.DataFrame) -> dict: tokens = ( - df["content"].fillna("").astype(str).str.lower() + df["content"] + .fillna("") + .astype(str) + .str.lower() .str.findall(r"\b[a-z]{2,}\b") .explode() ) diff --git a/server/analysis/nlp.py b/server/analysis/nlp.py index 4459851..f02605b 100644 --- a/server/analysis/nlp.py +++ b/server/analysis/nlp.py @@ -6,6 +6,7 @@ from typing import Any from transformers import pipeline from sentence_transformers import SentenceTransformer + class NLP: _topic_models: dict[str, SentenceTransformer] = {} _emotion_classifiers: dict[str, Any] = {} @@ -32,7 +33,7 @@ class NLP: ) self.entity_recognizer = self._get_entity_recognizer( self.device_str, self.pipeline_device - ) + ) except RuntimeError as exc: if self.use_cuda and "out of memory" in str(exc).lower(): torch.cuda.empty_cache() @@ -90,7 +91,7 @@ class NLP: ) cls._emotion_classifiers[device_str] = classifier return classifier - + @classmethod def _get_entity_recognizer(cls, device_str: str, pipeline_device: int) -> Any: recognizer = cls._entity_recognizers.get(device_str) @@ -207,8 +208,7 @@ class NLP: self.df.drop(columns=existing_drop, inplace=True) remaining_emotion_cols = [ - c for c in self.df.columns - if c.startswith("emotion_") + c for c in self.df.columns if c.startswith("emotion_") ] if remaining_emotion_cols: @@ -227,8 +227,6 @@ class NLP: self.df[remaining_emotion_cols] = normalized.values - - def add_topic_col(self, confidence_threshold: float = 0.3) -> None: titles = self.df[self.title_col].fillna("").astype(str) contents = self.df[self.content_col].fillna("").astype(str) @@ -257,7 +255,7 @@ class NLP: self.df.loc[self.df["topic_confidence"] < confidence_threshold, "topic"] = ( "Misc" ) - + def add_ner_cols(self, max_chars: int = 512) -> None: texts = ( self.df[self.content_col] @@ -302,8 +300,4 @@ class NLP: for label in all_labels: col_name = f"entity_{label}" - self.df[col_name] = [ - d.get(label, 0) for d in entity_count_dicts - ] - - + self.df[col_name] = [d.get(label, 0) for d in entity_count_dicts] diff --git a/server/analysis/user.py b/server/analysis/user.py index fc8e618..f4837d3 100644 --- a/server/analysis/user.py +++ b/server/analysis/user.py @@ -3,6 +3,7 @@ import re from collections import Counter + class UserAnalysis: def __init__(self, word_exclusions: set[str]): self.word_exclusions = word_exclusions @@ -12,49 +13,49 @@ class UserAnalysis: return [t for t in tokens if t not in self.word_exclusions] def _vocab_richness_per_user( - self, df: pd.DataFrame, min_words: int = 20, top_most_used_words: int = 100 - ) -> list: - df = df.copy() - df["content"] = df["content"].fillna("").astype(str).str.lower() - df["tokens"] = df["content"].apply(self._tokenize) + self, df: pd.DataFrame, min_words: int = 20, top_most_used_words: int = 100 + ) -> list: + df = df.copy() + df["content"] = df["content"].fillna("").astype(str).str.lower() + df["tokens"] = df["content"].apply(self._tokenize) - rows = [] - for author, group in df.groupby("author"): - all_tokens = [t for tokens in group["tokens"] for t in tokens] + rows = [] + for author, group in df.groupby("author"): + all_tokens = [t for tokens in group["tokens"] for t in tokens] - total_words = len(all_tokens) - unique_words = len(set(all_tokens)) - events = len(group) + total_words = len(all_tokens) + unique_words = len(set(all_tokens)) + events = len(group) - # Min amount of words for a user, any less than this might give weird results - if total_words < min_words: - continue + # Min amount of words for a user, any less than this might give weird results + if total_words < min_words: + continue - # 100% = they never reused a word (excluding stop words) - vocab_richness = unique_words / total_words - avg_words = total_words / max(events, 1) + # 100% = they never reused a word (excluding stop words) + vocab_richness = unique_words / total_words + avg_words = total_words / max(events, 1) - counts = Counter(all_tokens) - top_words = [ - {"word": w, "count": int(c)} - for w, c in counts.most_common(top_most_used_words) - ] + counts = Counter(all_tokens) + top_words = [ + {"word": w, "count": int(c)} + for w, c in counts.most_common(top_most_used_words) + ] - rows.append( - { - "author": author, - "events": int(events), - "total_words": int(total_words), - "unique_words": int(unique_words), - "vocab_richness": round(vocab_richness, 3), - "avg_words_per_event": round(avg_words, 2), - "top_words": top_words, - } - ) + rows.append( + { + "author": author, + "events": int(events), + "total_words": int(total_words), + "unique_words": int(unique_words), + "vocab_richness": round(vocab_richness, 3), + "avg_words_per_event": round(avg_words, 2), + "top_words": top_words, + } + ) - rows = sorted(rows, key=lambda x: x["vocab_richness"], reverse=True) + rows = sorted(rows, key=lambda x: x["vocab_richness"], reverse=True) - return rows + return rows def top_users(self, df: pd.DataFrame) -> list: counts = df.groupby(["author", "source"]).size().sort_values(ascending=False) diff --git a/server/app.py b/server/app.py index 7a5dea0..1f61032 100644 --- a/server/app.py +++ b/server/app.py @@ -30,7 +30,9 @@ load_dotenv() max_fetch_limit = int(get_env("MAX_FETCH_LIMIT")) frontend_url = get_env("FRONTEND_URL") jwt_secret_key = get_env("JWT_SECRET_KEY") -jwt_access_token_expires = int(os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200)) # Default to 20 minutes +jwt_access_token_expires = int( + os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200) +) # Default to 20 minutes # Flask Configuration CORS(app, resources={r"/*": {"origins": frontend_url}}) @@ -52,6 +54,7 @@ connectors = get_available_connectors() with open("server/topics.json") as f: default_topic_list = json.load(f) + @app.route("/register", methods=["POST"]) def register_user(): data = request.get_json() @@ -107,9 +110,13 @@ def login_user(): def profile(): current_user = get_jwt_identity() - return jsonify( - message="Access granted", user=auth_manager.get_user_by_id(current_user) - ), 200 + return ( + jsonify( + message="Access granted", user=auth_manager.get_user_by_id(current_user) + ), + 200, + ) + @app.route("/user/datasets") @jwt_required() @@ -117,11 +124,13 @@ def get_user_datasets(): current_user = int(get_jwt_identity()) return jsonify(dataset_manager.get_user_datasets(current_user)), 200 + @app.route("/datasets/sources", methods=["GET"]) def get_dataset_sources(): list_metadata = list(get_connector_metadata().values()) return jsonify(list_metadata) + @app.route("/datasets/scrape", methods=["POST"]) @jwt_required() def scrape_data(): @@ -160,7 +169,7 @@ def scrape_data(): limit = int(limit) except (ValueError, TypeError): return jsonify({"error": "Limit must be an integer"}), 400 - + if limit > 1000: limit = 1000 @@ -172,15 +181,13 @@ def scrape_data(): if category and not connector_metadata[name]["categories_enabled"]: return jsonify({"error": f"Source {name} does not support categories"}), 400 - + if category and not connectors[name]().category_exists(category): return jsonify({"error": f"Category does not exist for {name}"}), 400 try: dataset_id = dataset_manager.save_dataset_info( - user_id, - dataset_name, - default_topic_list + user_id, dataset_name, default_topic_list ) dataset_manager.set_dataset_status( @@ -189,22 +196,21 @@ def scrape_data(): f"Data is being fetched from {', '.join(source['name'] for source in source_configs)}", ) - fetch_and_process_dataset.delay( - dataset_id, - source_configs, - default_topic_list - ) + fetch_and_process_dataset.delay(dataset_id, source_configs, default_topic_list) except Exception: print(traceback.format_exc()) return jsonify({"error": "Failed to queue dataset processing"}), 500 - return jsonify( - { - "message": "Dataset queued for processing", - "dataset_id": dataset_id, - "status": "processing", - } - ), 202 + return ( + jsonify( + { + "message": "Dataset queued for processing", + "dataset_id": dataset_id, + "status": "processing", + } + ), + 202, + ) @app.route("/datasets/upload", methods=["POST"]) @@ -226,9 +232,12 @@ def upload_data(): if not post_file.filename.endswith(".jsonl") or not topic_file.filename.endswith( ".json" ): - return jsonify( - {"error": "Invalid file type. Only .jsonl and .json files are allowed."} - ), 400 + return ( + jsonify( + {"error": "Invalid file type. Only .jsonl and .json files are allowed."} + ), + 400, + ) try: current_user = int(get_jwt_identity()) @@ -241,13 +250,16 @@ def upload_data(): process_dataset.delay(dataset_id, posts_df.to_dict(orient="records"), topics) - return jsonify( - { - "message": "Dataset queued for processing", - "dataset_id": dataset_id, - "status": "processing", - } - ), 202 + return ( + jsonify( + { + "message": "Dataset queued for processing", + "dataset_id": dataset_id, + "status": "processing", + } + ), + 202, + ) except ValueError as e: return jsonify({"error": f"Failed to read JSONL file"}), 400 except Exception as e: @@ -296,9 +308,12 @@ def update_dataset(dataset_id): return jsonify({"error": "A valid name must be provided"}), 400 dataset_manager.update_dataset_name(dataset_id, new_name.strip()) - return jsonify( - {"message": f"Dataset {dataset_id} renamed to '{new_name.strip()}'"} - ), 200 + return ( + jsonify( + {"message": f"Dataset {dataset_id} renamed to '{new_name.strip()}'"} + ), + 200, + ) except NotAuthorisedException: return jsonify({"error": "User is not authorised to access this content"}), 403 except NonExistentDatasetException: @@ -321,11 +336,14 @@ def delete_dataset(dataset_id): dataset_manager.delete_dataset_info(dataset_id) dataset_manager.delete_dataset_content(dataset_id) - return jsonify( - { - "message": f"Dataset {dataset_id} metadata and content successfully deleted" - } - ), 200 + return ( + jsonify( + { + "message": f"Dataset {dataset_id} metadata and content successfully deleted" + } + ), + 200, + ) except NotAuthorisedException: return jsonify({"error": "User is not authorised to access this content"}), 403 except NonExistentDatasetException: @@ -523,7 +541,8 @@ def get_interaction_analysis(dataset_id): except Exception as e: print(traceback.format_exc()) return jsonify({"error": f"An unexpected error occurred"}), 500 - + + @app.route("/dataset//all", methods=["GET"]) @jwt_required() def get_full_dataset(dataset_id: int): @@ -546,5 +565,6 @@ def get_full_dataset(dataset_id: int): print(traceback.format_exc()) return jsonify({"error": f"An unexpected error occurred"}), 500 + if __name__ == "__main__": app.run(debug=True) diff --git a/server/connectors/base.py b/server/connectors/base.py index 48163b5..4bf81d4 100644 --- a/server/connectors/base.py +++ b/server/connectors/base.py @@ -1,10 +1,11 @@ from abc import ABC, abstractmethod from dto.post import Post + class BaseConnector(ABC): # Each subclass declares these at the class level - source_name: str # machine-readable: "reddit", "youtube" - display_name: str # human-readable: "Reddit", "YouTube" + source_name: str # machine-readable: "reddit", "youtube" + display_name: str # human-readable: "Reddit", "YouTube" required_env: list[str] = [] # env vars needed to activate search_enabled: bool @@ -14,16 +15,13 @@ class BaseConnector(ABC): def is_available(cls) -> bool: """Returns True if all required env vars are set.""" import os + return all(os.getenv(var) for var in cls.required_env) @abstractmethod - def get_new_posts_by_search(self, - search: str = None, - category: str = None, - post_limit: int = 10 - ) -> list[Post]: - ... + def get_new_posts_by_search( + self, search: str = None, category: str = None, post_limit: int = 10 + ) -> list[Post]: ... @abstractmethod - def category_exists(self, category: str) -> bool: - ... \ No newline at end of file + def category_exists(self, category: str) -> bool: ... diff --git a/server/connectors/boards_api.py b/server/connectors/boards_api.py index 57e79f7..f5c04e6 100644 --- a/server/connectors/boards_api.py +++ b/server/connectors/boards_api.py @@ -11,9 +11,8 @@ from server.connectors.base import BaseConnector logger = logging.getLogger(__name__) -HEADERS = { - "User-Agent": "Mozilla/5.0 (compatible; ForumScraper/1.0)" -} +HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; ForumScraper/1.0)"} + class BoardsAPI(BaseConnector): source_name: str = "boards.ie" @@ -25,19 +24,17 @@ class BoardsAPI(BaseConnector): def __init__(self): self.base_url = "https://www.boards.ie" - def get_new_posts_by_search(self, - search: str, - category: str, - post_limit: int - ) -> list[Post]: + def get_new_posts_by_search( + self, search: str, category: str, post_limit: int + ) -> list[Post]: if search: raise NotImplementedError("Search not compatible with boards.ie") - + if category: return self._get_posts(f"{self.base_url}/categories/{category}", post_limit) else: return self._get_posts(f"{self.base_url}/discussions", post_limit) - + def category_exists(self, category: str) -> bool: if not category: return False @@ -59,7 +56,7 @@ class BoardsAPI(BaseConnector): except requests.RequestException as e: logger.error(f"Error checking category '{category}': {e}") return False - + ## Private def _get_posts(self, url, limit) -> list[Post]: urls = [] @@ -78,7 +75,7 @@ class BoardsAPI(BaseConnector): href = a.get("href") if href: urls.append(href) - + current_page += 1 logger.debug(f"Fetched {len(urls)} post URLs") @@ -96,7 +93,9 @@ class BoardsAPI(BaseConnector): for i, future in enumerate(as_completed(futures)): post_url = futures[future] - logger.debug(f"Fetching Post {i + 1} / {len(urls)} details from URL: {post_url}") + logger.debug( + f"Fetching Post {i + 1} / {len(urls)} details from URL: {post_url}" + ) try: post = future.result() posts.append(post) @@ -105,7 +104,6 @@ class BoardsAPI(BaseConnector): return posts - def _fetch_page(self, url: str) -> str: response = requests.get(url, headers=HEADERS) response.raise_for_status() @@ -113,7 +111,7 @@ class BoardsAPI(BaseConnector): def _parse_thread(self, html: str, post_url: str) -> Post: soup = BeautifulSoup(html, "html.parser") - + # Author author_tag = soup.select_one(".userinfo-username-title") author = author_tag.text.strip() if author_tag else None @@ -122,10 +120,16 @@ class BoardsAPI(BaseConnector): timestamp_tag = soup.select_one(".postbit-header") timestamp = None if timestamp_tag: - match = re.search(r"\d{2}-\d{2}-\d{4}\s+\d{2}:\d{2}[AP]M", timestamp_tag.get_text()) + match = re.search( + r"\d{2}-\d{2}-\d{4}\s+\d{2}:\d{2}[AP]M", timestamp_tag.get_text() + ) timestamp = match.group(0) if match else None # convert to unix epoch - timestamp = datetime.datetime.strptime(timestamp, "%d-%m-%Y %I:%M%p").timestamp() if timestamp else None + timestamp = ( + datetime.datetime.strptime(timestamp, "%d-%m-%Y %I:%M%p").timestamp() + if timestamp + else None + ) # Post ID post_num = re.search(r"discussion/(\d+)", post_url) @@ -133,7 +137,9 @@ class BoardsAPI(BaseConnector): # Content content_tag = soup.select_one(".Message.userContent") - content = content_tag.get_text(separator="\n", strip=True) if content_tag else None + content = ( + content_tag.get_text(separator="\n", strip=True) if content_tag else None + ) # Title title_tag = soup.select_one(".PageTitle h1") @@ -150,7 +156,7 @@ class BoardsAPI(BaseConnector): url=post_url, timestamp=timestamp, source=self.source_name, - comments=comments + comments=comments, ) return post @@ -168,9 +174,9 @@ class BoardsAPI(BaseConnector): soup = BeautifulSoup(html, "html.parser") next_link = soup.find("a", class_="Next") - if next_link and next_link.get('href'): - href = next_link.get('href') - current_url = href if href.startswith('http') else url + href + if next_link and next_link.get("href"): + href = next_link.get("href") + current_url = href if href.startswith("http") else url + href else: current_url = None @@ -186,21 +192,29 @@ class BoardsAPI(BaseConnector): comment_id = tag.get("id") # Author - user_elem = tag.find('span', class_='userinfo-username-title') + user_elem = tag.find("span", class_="userinfo-username-title") username = user_elem.get_text(strip=True) if user_elem else None # Timestamp - date_elem = tag.find('span', class_='DateCreated') + date_elem = tag.find("span", class_="DateCreated") timestamp = date_elem.get_text(strip=True) if date_elem else None - timestamp = datetime.datetime.strptime(timestamp, "%d-%m-%Y %I:%M%p").timestamp() if timestamp else None + timestamp = ( + datetime.datetime.strptime(timestamp, "%d-%m-%Y %I:%M%p").timestamp() + if timestamp + else None + ) # Content - message_div = tag.find('div', class_='Message userContent') + message_div = tag.find("div", class_="Message userContent") if message_div.blockquote: message_div.blockquote.decompose() - content = message_div.get_text(separator="\n", strip=True) if message_div else None + content = ( + message_div.get_text(separator="\n", strip=True) + if message_div + else None + ) comment = Comment( id=comment_id, @@ -209,10 +223,8 @@ class BoardsAPI(BaseConnector): content=content, timestamp=timestamp, reply_to=None, - source=self.source_name + source=self.source_name, ) comments.append(comment) return comments - - diff --git a/server/connectors/reddit_api.py b/server/connectors/reddit_api.py index 7955fca..ef618cf 100644 --- a/server/connectors/reddit_api.py +++ b/server/connectors/reddit_api.py @@ -9,6 +9,7 @@ from server.connectors.base import BaseConnector logger = logging.getLogger(__name__) + class RedditAPI(BaseConnector): source_name: str = "reddit" display_name: str = "Reddit" @@ -19,22 +20,18 @@ class RedditAPI(BaseConnector): self.url = "https://www.reddit.com/" # Public Methods # - def get_new_posts_by_search(self, - search: str, - category: str, - post_limit: int - ) -> list[Post]: - + def get_new_posts_by_search( + self, search: str, category: str, post_limit: int + ) -> list[Post]: + prefix = f"r/{category}/" if category else "" - params = {'limit': post_limit} + params = {"limit": post_limit} if search: endpoint = f"{prefix}search.json" - params.update({ - 'q': search, - 'sort': 'new', - 'restrict_sr': 'on' if category else 'off' - }) + params.update( + {"q": search, "sort": "new", "restrict_sr": "on" if category else "off"} + ) else: endpoint = f"{prefix}new.json" @@ -43,24 +40,24 @@ class RedditAPI(BaseConnector): while len(posts) < post_limit: batch_limit = min(100, post_limit - len(posts)) - params['limit'] = batch_limit + params["limit"] = batch_limit if after: - params['after'] = after + params["after"] = after data = self._fetch_post_overviews(endpoint, params) - - if not data or 'data' not in data or not data['data'].get('children'): + + if not data or "data" not in data or not data["data"].get("children"): break batch_posts = self._parse_posts(data) posts.extend(batch_posts) - after = data['data'].get('after') + after = data["data"].get("after") if not after: break return posts[:post_limit] - + def _get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]: posts = [] after = None @@ -70,37 +67,36 @@ class RedditAPI(BaseConnector): while len(posts) < limit: batch_limit = min(100, limit - len(posts)) - params = { - 'limit': batch_limit, - 'after': after - } + params = {"limit": batch_limit, "after": after} data = self._fetch_post_overviews(url, params) batch_posts = self._parse_posts(data) - logger.debug(f"Fetched {len(batch_posts)} new posts from subreddit {subreddit}") + logger.debug( + f"Fetched {len(batch_posts)} new posts from subreddit {subreddit}" + ) if not batch_posts: break posts.extend(batch_posts) - after = data['data'].get('after') + after = data["data"].get("after") if not after: break return posts - + def get_user(self, username: str) -> User: data = self._fetch_post_overviews(f"user/{username}/about.json", {}) return self._parse_user(data) - + def category_exists(self, category: str) -> bool: try: data = self._fetch_post_overviews(f"r/{category}/about.json", {}) return ( data is not None - and 'data' in data - and data['data'].get('id') is not None + and "data" in data + and data["data"].get("id") is not None ) except Exception: return False @@ -109,25 +105,26 @@ class RedditAPI(BaseConnector): def _parse_posts(self, data) -> list[Post]: posts = [] - total_num_posts = len(data['data']['children']) + total_num_posts = len(data["data"]["children"]) current_index = 0 - for item in data['data']['children']: + for item in data["data"]["children"]: current_index += 1 logger.debug(f"Parsing post {current_index} of {total_num_posts}") - post_data = item['data'] + post_data = item["data"] post = Post( - id=post_data['id'], - author=post_data['author'], - title=post_data['title'], - content=post_data.get('selftext', ''), - url=post_data['url'], - timestamp=post_data['created_utc'], + id=post_data["id"], + author=post_data["author"], + title=post_data["title"], + content=post_data.get("selftext", ""), + url=post_data["url"], + timestamp=post_data["created_utc"], source=self.source_name, - comments=self._get_post_comments(post_data['id'])) - post.subreddit = post_data['subreddit'] - post.upvotes = post_data['ups'] + comments=self._get_post_comments(post_data["id"]), + ) + post.subreddit = post_data["subreddit"] + post.upvotes = post_data["ups"] posts.append(post) return posts @@ -140,56 +137,62 @@ class RedditAPI(BaseConnector): if len(data) < 2: return comments - comment_data = data[1]['data']['children'] + comment_data = data[1]["data"]["children"] def _parse_comment_tree(items, parent_id=None): for item in items: - if item['kind'] != 't1': + if item["kind"] != "t1": continue - comment_info = item['data'] + comment_info = item["data"] comment = Comment( - id=comment_info['id'], + id=comment_info["id"], post_id=post_id, - author=comment_info['author'], - content=comment_info.get('body', ''), - timestamp=comment_info['created_utc'], - reply_to=parent_id or comment_info.get('parent_id', None), - source=self.source_name + author=comment_info["author"], + content=comment_info.get("body", ""), + timestamp=comment_info["created_utc"], + reply_to=parent_id or comment_info.get("parent_id", None), + source=self.source_name, ) comments.append(comment) # Process replies recursively - replies = comment_info.get('replies') + replies = comment_info.get("replies") if replies and isinstance(replies, dict): - reply_items = replies.get('data', {}).get('children', []) + reply_items = replies.get("data", {}).get("children", []) _parse_comment_tree(reply_items, parent_id=comment.id) _parse_comment_tree(comment_data) return comments - + def _parse_user(self, data) -> User: - user_data = data['data'] - user = User( - username=user_data['name'], - created_utc=user_data['created_utc']) - user.karma = user_data['total_karma'] + user_data = data["data"] + user = User(username=user_data["name"], created_utc=user_data["created_utc"]) + user.karma = user_data["total_karma"] return user - + def _fetch_post_overviews(self, endpoint: str, params: dict) -> dict: url = f"{self.url}{endpoint}" max_retries = 15 - backoff = 1 # seconds + backoff = 1 # seconds for attempt in range(max_retries): try: - response = requests.get(url, headers={'User-agent': 'python:ethnography-college-project:0.1 (by /u/ThisBirchWood)'}, params=params) + response = requests.get( + url, + headers={ + "User-agent": "python:ethnography-college-project:0.1 (by /u/ThisBirchWood)" + }, + params=params, + ) if response.status_code == 429: wait_time = response.headers.get("Retry-After", backoff) - logger.warning(f"Rate limited by Reddit API. Retrying in {wait_time} seconds...") + logger.warning( + f"Rate limited by Reddit API. Retrying in {wait_time} seconds..." + ) time.sleep(wait_time) backoff *= 2 @@ -205,4 +208,4 @@ class RedditAPI(BaseConnector): return response.json() except requests.RequestException as e: print(f"Error fetching data from Reddit API: {e}") - return {} \ No newline at end of file + return {} diff --git a/server/connectors/registry.py b/server/connectors/registry.py index f2371e6..08f641d 100644 --- a/server/connectors/registry.py +++ b/server/connectors/registry.py @@ -3,6 +3,7 @@ import importlib import server.connectors from server.connectors.base import BaseConnector + def _discover_connectors() -> list[type[BaseConnector]]: """Walk the connectors package and collect all BaseConnector subclasses.""" for _, module_name, _ in pkgutil.iter_modules(server.connectors.__path__): @@ -11,20 +12,24 @@ def _discover_connectors() -> list[type[BaseConnector]]: importlib.import_module(f"server.connectors.{module_name}") return [ - cls for cls in BaseConnector.__subclasses__() + cls + for cls in BaseConnector.__subclasses__() if cls.source_name # guard against abstract intermediaries ] + def get_available_connectors() -> dict[str, type[BaseConnector]]: return {c.source_name: c for c in _discover_connectors() if c.is_available()} + def get_connector_metadata() -> dict[str, dict]: res = {} for id, obj in get_available_connectors().items(): - res[id] = {"id": id, - "label": obj.display_name, - "search_enabled": obj.search_enabled, - "categories_enabled": obj.categories_enabled - } + res[id] = { + "id": id, + "label": obj.display_name, + "search_enabled": obj.search_enabled, + "categories_enabled": obj.categories_enabled, + } - return res \ No newline at end of file + return res diff --git a/server/connectors/youtube_api.py b/server/connectors/youtube_api.py index 7b014d0..2ec73d9 100644 --- a/server/connectors/youtube_api.py +++ b/server/connectors/youtube_api.py @@ -12,6 +12,7 @@ load_dotenv() API_KEY = os.getenv("YOUTUBE_API_KEY") + class YouTubeAPI(BaseConnector): source_name: str = "youtube" display_name: str = "YouTube" @@ -19,73 +20,72 @@ class YouTubeAPI(BaseConnector): categories_enabled: bool = False def __init__(self): - self.youtube = build('youtube', 'v3', developerKey=API_KEY) + self.youtube = build("youtube", "v3", developerKey=API_KEY) - def get_new_posts_by_search(self, - search: str, - category: str, - post_limit: int - ) -> list[Post]: - videos = self._search_videos(search, post_limit) - posts = [] + def get_new_posts_by_search( + self, search: str, category: str, post_limit: int + ) -> list[Post]: + videos = self._search_videos(search, post_limit) + posts = [] - for video in videos: - video_id = video['id']['videoId'] - snippet = video['snippet'] - title = snippet['title'] - description = snippet['description'] - published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp() - channel_title = snippet['channelTitle'] + for video in videos: + video_id = video["id"]["videoId"] + snippet = video["snippet"] + title = snippet["title"] + description = snippet["description"] + published_at = datetime.datetime.strptime( + snippet["publishedAt"], "%Y-%m-%dT%H:%M:%SZ" + ).timestamp() + channel_title = snippet["channelTitle"] - comments = [] - comments_data = self._get_video_comments(video_id) - for comment_thread in comments_data: - comment_snippet = comment_thread['snippet']['topLevelComment']['snippet'] - comment = Comment( - id=comment_thread['id'], - post_id=video_id, - content=comment_snippet['textDisplay'], - author=comment_snippet['authorDisplayName'], - timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(), - reply_to=None, - source=self.source_name - ) - - comments.append(comment) - - post = Post( - id=video_id, - content=f"{title}\n\n{description}", - author=channel_title, - timestamp=published_at, - url=f"https://www.youtube.com/watch?v={video_id}", - title=title, + comments = [] + comments_data = self._get_video_comments(video_id) + for comment_thread in comments_data: + comment_snippet = comment_thread["snippet"]["topLevelComment"][ + "snippet" + ] + comment = Comment( + id=comment_thread["id"], + post_id=video_id, + content=comment_snippet["textDisplay"], + author=comment_snippet["authorDisplayName"], + timestamp=datetime.datetime.strptime( + comment_snippet["publishedAt"], "%Y-%m-%dT%H:%M:%SZ" + ).timestamp(), + reply_to=None, source=self.source_name, - comments=comments ) - posts.append(post) + comments.append(comment) + + post = Post( + id=video_id, + content=f"{title}\n\n{description}", + author=channel_title, + timestamp=published_at, + url=f"https://www.youtube.com/watch?v={video_id}", + title=title, + source=self.source_name, + comments=comments, + ) + + posts.append(post) + + return posts - return posts - def category_exists(self, category): return True def _search_videos(self, query, limit): request = self.youtube.search().list( - q=query, - part='snippet', - type='video', - maxResults=limit + q=query, part="snippet", type="video", maxResults=limit ) response = request.execute() - return response.get('items', []) - + return response.get("items", []) + def _get_video_comments(self, video_id): request = self.youtube.commentThreads().list( - part='snippet', - videoId=video_id, - textFormat='plainText' + part="snippet", videoId=video_id, textFormat="plainText" ) try: @@ -93,4 +93,4 @@ class YouTubeAPI(BaseConnector): except HttpError as e: print(f"Error fetching comments for video {video_id}: {e}") return [] - return response.get('items', []) + return response.get("items", []) diff --git a/server/core/auth.py b/server/core/auth.py index 34bb93c..3079960 100644 --- a/server/core/auth.py +++ b/server/core/auth.py @@ -5,6 +5,7 @@ from flask_bcrypt import Bcrypt EMAIL_REGEX = re.compile(r"[^@]+@[^@]+\.[^@]+") + class AuthManager: def __init__(self, db: PostgresConnector, bcrypt: Bcrypt): self.db = db @@ -24,13 +25,13 @@ class AuthManager: if len(username) < 3: raise ValueError("Username must be longer than 3 characters") - + if not EMAIL_REGEX.match(email): raise ValueError("Please enter a valid email address") if self.get_user_by_email(email): raise ValueError("Email already registered") - + if self.get_user_by_username(username): raise ValueError("Username already taken") @@ -38,20 +39,22 @@ class AuthManager: def authenticate_user(self, username, password): user = self.get_user_by_username(username) - if user and self.bcrypt.check_password_hash(user['password_hash'], password): + if user and self.bcrypt.check_password_hash(user["password_hash"], password): return user return None - + def get_user_by_id(self, user_id): query = "SELECT id, username, email FROM users WHERE id = %s" result = self.db.execute(query, (user_id,), fetch=True) return result[0] if result else None - + def get_user_by_username(self, username) -> dict: - query = "SELECT id, username, email, password_hash FROM users WHERE username = %s" + query = ( + "SELECT id, username, email, password_hash FROM users WHERE username = %s" + ) result = self.db.execute(query, (username,), fetch=True) return result[0] if result else None - + def get_user_by_email(self, email) -> dict: query = "SELECT id, username, email, password_hash FROM users WHERE email = %s" result = self.db.execute(query, (email,), fetch=True) diff --git a/server/core/datasets.py b/server/core/datasets.py index a55445d..c2f2214 100644 --- a/server/core/datasets.py +++ b/server/core/datasets.py @@ -3,6 +3,7 @@ from server.db.database import PostgresConnector from psycopg2.extras import Json from server.exceptions import NonExistentDatasetException + class DatasetManager: def __init__(self, db: PostgresConnector): self.db = db @@ -15,18 +16,18 @@ class DatasetManager: if dataset_info.get("user_id") != user_id: return False - + return True - + def get_user_datasets(self, user_id: int) -> list[dict]: query = "SELECT * FROM datasets WHERE user_id = %s" - return self.db.execute(query, (user_id, ), fetch=True) + return self.db.execute(query, (user_id,), fetch=True) def get_dataset_content(self, dataset_id: int) -> pd.DataFrame: query = "SELECT * FROM events WHERE dataset_id = %s" result = self.db.execute(query, (dataset_id,), fetch=True) return pd.DataFrame(result) - + def get_dataset_info(self, dataset_id: int) -> dict: query = "SELECT * FROM datasets WHERE id = %s" result = self.db.execute(query, (dataset_id,), fetch=True) @@ -35,14 +36,16 @@ class DatasetManager: raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist") return result[0] - + def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int: query = """ INSERT INTO datasets (user_id, name, topics) VALUES (%s, %s, %s) RETURNING id """ - result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True) + result = self.db.execute( + query, (user_id, dataset_name, Json(topics)), fetch=True + ) return result[0]["id"] if result else None def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame): @@ -113,7 +116,9 @@ class DatasetManager: self.db.execute_batch(query, values) - def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None): + def set_dataset_status( + self, dataset_id: int, status: str, status_message: str | None = None + ): if status not in ["fetching", "processing", "complete", "error"]: raise ValueError("Invalid status") @@ -137,24 +142,24 @@ class DatasetManager: WHERE id = %s """ - result = self.db.execute(query, (dataset_id, ), fetch=True) - + result = self.db.execute(query, (dataset_id,), fetch=True) + if not result: print(result) raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist") - + return result[0] - + def update_dataset_name(self, dataset_id: int, new_name: str): query = "UPDATE datasets SET name = %s WHERE id = %s" self.db.execute(query, (new_name, dataset_id)) - + def delete_dataset_info(self, dataset_id: int): query = "DELETE FROM datasets WHERE id = %s" - self.db.execute(query, (dataset_id, )) + self.db.execute(query, (dataset_id,)) def delete_dataset_content(self, dataset_id: int): query = "DELETE FROM events WHERE dataset_id = %s" - self.db.execute(query, (dataset_id, )) \ No newline at end of file + self.db.execute(query, (dataset_id,)) diff --git a/server/db/database.py b/server/db/database.py index f56f579..7a5a707 100644 --- a/server/db/database.py +++ b/server/db/database.py @@ -22,8 +22,10 @@ class PostgresConnector: database=os.getenv("POSTGRES_DB", "postgres"), ) except psycopg2.OperationalError as e: - raise DatabaseNotConfiguredException(f"Ensure database is up and running: {e}") - + raise DatabaseNotConfiguredException( + f"Ensure database is up and running: {e}" + ) + self.connection.autocommit = False def execute(self, query, params=None, fetch=False) -> list: @@ -48,4 +50,4 @@ class PostgresConnector: def close(self): if self.connection: - self.connection.close() \ No newline at end of file + self.connection.close() diff --git a/server/queue/celery_app.py b/server/queue/celery_app.py index cf55f3f..fe4b5d9 100644 --- a/server/queue/celery_app.py +++ b/server/queue/celery_app.py @@ -5,6 +5,7 @@ from server.utils import get_env load_dotenv() REDIS_URL = get_env("REDIS_URL") + def create_celery(): celery = Celery( "ethnograph", @@ -16,6 +17,7 @@ def create_celery(): celery.conf.accept_content = ["json"] return celery + celery = create_celery() -from server.queue import tasks \ No newline at end of file +from server.queue import tasks diff --git a/server/queue/tasks.py b/server/queue/tasks.py index 95248d1..d4dee0a 100644 --- a/server/queue/tasks.py +++ b/server/queue/tasks.py @@ -9,6 +9,7 @@ from server.connectors.registry import get_available_connectors logger = logging.getLogger(__name__) + @celery.task(bind=True, max_retries=3) def process_dataset(self, dataset_id: int, posts: list, topics: dict): db = PostgresConnector() @@ -21,15 +22,19 @@ def process_dataset(self, dataset_id: int, posts: list, topics: dict): enriched_df = processor.enrich() dataset_manager.save_dataset_content(dataset_id, enriched_df) - dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully") + dataset_manager.set_dataset_status( + dataset_id, "complete", "NLP Processing Completed Successfully" + ) except Exception as e: - dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}") + dataset_manager.set_dataset_status( + dataset_id, "error", f"An error occurred: {e}" + ) + @celery.task(bind=True, max_retries=3) -def fetch_and_process_dataset(self, - dataset_id: int, - source_info: list[dict], - topics: dict): +def fetch_and_process_dataset( + self, dataset_id: int, source_info: list[dict], topics: dict +): connectors = get_available_connectors() db = PostgresConnector() dataset_manager = DatasetManager(db) @@ -44,9 +49,7 @@ def fetch_and_process_dataset(self, connector = connectors[name]() raw_posts = connector.get_new_posts_by_search( - search=search, - category=category, - post_limit=limit + search=search, category=category, post_limit=limit ) posts.extend(post.to_dict() for post in raw_posts) @@ -56,6 +59,10 @@ def fetch_and_process_dataset(self, enriched_df = processor.enrich() dataset_manager.save_dataset_content(dataset_id, enriched_df) - dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully") + dataset_manager.set_dataset_status( + dataset_id, "complete", "NLP Processing Completed Successfully" + ) except Exception as e: - dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}") \ No newline at end of file + dataset_manager.set_dataset_status( + dataset_id, "error", f"An error occurred: {e}" + )