diff --git a/connectors/youtube_api.py b/connectors/youtube_api.py deleted file mode 100644 index d0e00a3..0000000 --- a/connectors/youtube_api.py +++ /dev/null @@ -1,84 +0,0 @@ -import os -import datetime - -from dotenv import load_dotenv -from googleapiclient.discovery import build -from googleapiclient.errors import HttpError -from dto.post import Post -from dto.comment import Comment - -load_dotenv() - -API_KEY = os.getenv("YOUTUBE_API_KEY") - -class YouTubeAPI: - def __init__(self): - self.youtube = build('youtube', 'v3', developerKey=API_KEY) - - def search_videos(self, query, limit): - request = self.youtube.search().list( - q=query, - part='snippet', - type='video', - maxResults=limit - ) - response = request.execute() - return response.get('items', []) - - def get_video_comments(self, video_id, limit): - request = self.youtube.commentThreads().list( - part='snippet', - videoId=video_id, - maxResults=limit, - textFormat='plainText' - ) - - try: - response = request.execute() - except HttpError as e: - print(f"Error fetching comments for video {video_id}: {e}") - return [] - return response.get('items', []) - - def fetch_videos(self, query, video_limit, comment_limit) -> list[Post]: - videos = self.search_videos(query, video_limit) - posts = [] - - for video in videos: - video_id = video['id']['videoId'] - snippet = video['snippet'] - title = snippet['title'] - description = snippet['description'] - published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp() - channel_title = snippet['channelTitle'] - - comments = [] - comments_data = self.get_video_comments(video_id, comment_limit) - for comment_thread in comments_data: - comment_snippet = comment_thread['snippet']['topLevelComment']['snippet'] - comment = Comment( - id=comment_thread['id'], - post_id=video_id, - content=comment_snippet['textDisplay'], - author=comment_snippet['authorDisplayName'], - timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(), - reply_to=None, - source="YouTube" - ) - - comments.append(comment) - - post = Post( - id=video_id, - content=f"{title}\n\n{description}", - author=channel_title, - timestamp=published_at, - url=f"https://www.youtube.com/watch?v={video_id}", - title=title, - source="YouTube", - comments=comments - ) - - posts.append(post) - - return posts \ No newline at end of file diff --git a/create_dataset.py b/create_dataset.py deleted file mode 100644 index 791b2bd..0000000 --- a/create_dataset.py +++ /dev/null @@ -1,43 +0,0 @@ -import json -import logging -from connectors.reddit_api import RedditAPI -from connectors.boards_api import BoardsAPI -from connectors.youtube_api import YouTubeAPI - -posts_file = 'posts_test.jsonl' - -reddit_connector = RedditAPI() -boards_connector = BoardsAPI() -youtube_connector = YouTubeAPI() - -logging.basicConfig(level=logging.DEBUG) -logging.getLogger("urllib3").setLevel(logging.WARNING) - -def remove_empty_posts(posts): - return [post for post in posts if post.content.strip() != ""] - -def save_to_jsonl(filename, posts): - with open(filename, 'a', encoding='utf-8') as f: - for post in posts: - # Convert post object to dict if it's a dataclass - data = post.to_dict() - f.write(json.dumps(data) + '\n') - - -def main(): - boards_posts = boards_connector.get_new_category_posts('cork-city', 1200, 1200) - save_to_jsonl(posts_file, boards_posts) - - reddit_posts = reddit_connector.get_new_subreddit_posts('cork', 1200) - reddit_posts = remove_empty_posts(reddit_posts) - 
save_to_jsonl(posts_file, reddit_posts) - - ireland_posts = reddit_connector.search_new_subreddit_posts('cork', 'ireland', 1200) - ireland_posts = remove_empty_posts(ireland_posts) - save_to_jsonl(posts_file, ireland_posts) - - youtube_videos = youtube_connector.fetch_videos('cork city', 1200, 1200) - save_to_jsonl(posts_file, youtube_videos) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 96c3430..dc3edb2 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -43,7 +43,7 @@ services: - .env command: > celery -A server.queue.celery_app.celery worker - --loglevel=info + --loglevel=debug --pool=solo depends_on: - postgres diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index b1e6045..1b10f61 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -5,6 +5,7 @@ import DatasetsPage from "./pages/Datasets"; import DatasetStatusPage from "./pages/DatasetStatus"; import LoginPage from "./pages/Login"; import UploadPage from "./pages/Upload"; +import AutoScrapePage from "./pages/AutoScrape"; import StatPage from "./pages/Stats"; import { getDocumentTitle } from "./utils/documentTitle"; import DatasetEditPage from "./pages/DatasetEdit"; @@ -22,6 +23,7 @@ function App() { } /> } /> } /> + } /> } /> } /> } /> diff --git a/frontend/src/pages/AutoScrape.tsx b/frontend/src/pages/AutoScrape.tsx new file mode 100644 index 0000000..9e9d336 --- /dev/null +++ b/frontend/src/pages/AutoScrape.tsx @@ -0,0 +1,338 @@ +import axios from "axios"; +import { useEffect, useState } from "react"; +import { useNavigate } from "react-router-dom"; +import StatsStyling from "../styles/stats_styling"; + +const styles = StatsStyling; +const API_BASE_URL = import.meta.env.VITE_BACKEND_URL; + +type SourceOption = { + id: string; + label: string; + search_enabled?: boolean; + categories_enabled?: boolean; + searchEnabled?: boolean; + categoriesEnabled?: boolean; +}; + +type SourceConfig = { + sourceName: string; + limit: string; + search: string; + category: string; +}; + +const buildEmptySourceConfig = (sourceName = ""): SourceConfig => ({ + sourceName, + limit: "100", + search: "", + category: "", +}); + +const supportsSearch = (source?: SourceOption): boolean => + Boolean(source?.search_enabled ?? source?.searchEnabled); + +const supportsCategories = (source?: SourceOption): boolean => + Boolean(source?.categories_enabled ?? 
source?.categoriesEnabled); + +const AutoScrapePage = () => { + const navigate = useNavigate(); + const [datasetName, setDatasetName] = useState(""); + const [sourceOptions, setSourceOptions] = useState([]); + const [sourceConfigs, setSourceConfigs] = useState([]); + const [returnMessage, setReturnMessage] = useState(""); + const [isLoadingSources, setIsLoadingSources] = useState(true); + const [isSubmitting, setIsSubmitting] = useState(false); + const [hasError, setHasError] = useState(false); + + useEffect(() => { + axios + .get(`${API_BASE_URL}/datasets/sources`) + .then((response) => { + const options = response.data || []; + setSourceOptions(options); + setSourceConfigs([buildEmptySourceConfig(options[0]?.id || "")]); + }) + .catch((requestError: unknown) => { + setHasError(true); + if (axios.isAxiosError(requestError)) { + setReturnMessage( + `Failed to load available sources: ${String( + requestError.response?.data?.error || requestError.message + )}` + ); + } else { + setReturnMessage("Failed to load available sources."); + } + }) + .finally(() => { + setIsLoadingSources(false); + }); + }, []); + + const updateSourceConfig = (index: number, field: keyof SourceConfig, value: string) => { + setSourceConfigs((previous) => + previous.map((config, configIndex) => + configIndex === index + ? field === "sourceName" + ? { ...config, sourceName: value, search: "", category: "" } + : { ...config, [field]: value } + : config + ) + ); + }; + + const getSourceOption = (sourceName: string) => + sourceOptions.find((option) => option.id === sourceName); + + const addSourceConfig = () => { + setSourceConfigs((previous) => [ + ...previous, + buildEmptySourceConfig(sourceOptions[0]?.id || ""), + ]); + }; + + const removeSourceConfig = (index: number) => { + setSourceConfigs((previous) => previous.filter((_, configIndex) => configIndex !== index)); + }; + + const autoScrape = async () => { + const token = localStorage.getItem("access_token"); + if (!token) { + setHasError(true); + setReturnMessage("You must be signed in to auto scrape a dataset."); + return; + } + + const normalizedDatasetName = datasetName.trim(); + if (!normalizedDatasetName) { + setHasError(true); + setReturnMessage("Please add a dataset name before continuing."); + return; + } + + if (sourceConfigs.length === 0) { + setHasError(true); + setReturnMessage("Please add at least one source."); + return; + } + + const normalizedSources = sourceConfigs.map((source) => { + const sourceOption = getSourceOption(source.sourceName); + + return { + name: source.sourceName, + limit: Number(source.limit || 100), + search: supportsSearch(sourceOption) ? source.search.trim() || undefined : undefined, + category: supportsCategories(sourceOption) + ? source.category.trim() || undefined + : undefined, + }; + }); + + const invalidSource = normalizedSources.find( + (source) => !source.name || !Number.isFinite(source.limit) || source.limit <= 0 + ); + + if (invalidSource) { + setHasError(true); + setReturnMessage("Every source needs a name and a limit greater than zero."); + return; + } + + try { + setIsSubmitting(true); + setHasError(false); + setReturnMessage(""); + + const response = await axios.post( + `${API_BASE_URL}/datasets/scrape`, + { + name: normalizedDatasetName, + sources: normalizedSources, + }, + { + headers: { + Authorization: `Bearer ${token}`, + }, + } + ); + + const datasetId = Number(response.data.dataset_id); + + setReturnMessage( + `Auto scrape queued successfully (dataset #${datasetId}). 
Redirecting to processing status...` + ); + + setTimeout(() => { + navigate(`/dataset/${datasetId}/status`); + }, 400); + } catch (requestError: unknown) { + setHasError(true); + if (axios.isAxiosError(requestError)) { + const message = String( + requestError.response?.data?.error || requestError.message || "Auto scrape failed." + ); + setReturnMessage(`Auto scrape failed: ${message}`); + } else { + setReturnMessage("Auto scrape failed due to an unexpected error."); + } + } finally { + setIsSubmitting(false); + } + }; + + return ( +
+
+
+
+

Auto Scrape Dataset

+

+ Select sources and scrape settings, then queue processing automatically. +

+
+ +
+ +
+
+

Dataset Name

+

Use a clear label so you can identify this run later.

+ setDatasetName(event.target.value)} + /> +
+ +
+

Sources

+

+ Configure source, limit, optional search, and optional category. +

+ + {isLoadingSources &&

Loading sources...

} + + {!isLoadingSources && sourceOptions.length === 0 && ( +

No source connectors are currently available.

+ )} + + {!isLoadingSources && sourceOptions.length > 0 && ( +
+ {sourceConfigs.map((source, index) => { + const sourceOption = getSourceOption(source.sourceName); + const searchEnabled = supportsSearch(sourceOption); + const categoriesEnabled = supportsCategories(sourceOption); + + return ( +
+ + + updateSourceConfig(index, "limit", event.target.value)} + /> + + updateSourceConfig(index, "search", event.target.value)} + /> + + updateSourceConfig(index, "category", event.target.value)} + /> + + {sourceConfigs.length > 1 && ( + + )} +
+ ); + })} + + +
+ )} +
+
+ +
+ {returnMessage || + "After queueing, your dataset is fetched and processed in the background automatically."} +
+
+
+ ); +}; + +export default AutoScrapePage; diff --git a/frontend/src/pages/Datasets.tsx b/frontend/src/pages/Datasets.tsx index 4c79cdc..d06d32a 100644 --- a/frontend/src/pages/Datasets.tsx +++ b/frontend/src/pages/Datasets.tsx @@ -9,7 +9,7 @@ const API_BASE_URL = import.meta.env.VITE_BACKEND_URL; type DatasetItem = { id: number; name?: string; - status?: "processing" | "complete" | "error" | string; + status?: "processing" | "complete" | "error" | "fetching" | string; status_message?: string | null; completed_at?: string | null; created_at?: string | null; @@ -50,7 +50,24 @@ const DatasetsPage = () => { }, []); if (loading) { - return

Loading datasets...

; + return ( +
+
+
+
+
+

Loading datasets

+
+
+ +
+
+
+
+
+
+
+ ) } return ( @@ -63,9 +80,18 @@ const DatasetsPage = () => { View and reopen datasets you previously uploaded.

- +
+ + +
{error && ( @@ -93,7 +119,7 @@ const DatasetsPage = () => {
    {datasets.map((dataset) => { - const isComplete = dataset.status === "complete"; + const isComplete = dataset.status === "complete" || dataset.status === "error"; const editPath = `/dataset/${dataset.id}/edit`; const targetPath = isComplete ? `/dataset/${dataset.id}/stats` diff --git a/frontend/src/pages/Upload.tsx b/frontend/src/pages/Upload.tsx index 93383dc..0799f9b 100644 --- a/frontend/src/pages/Upload.tsx +++ b/frontend/src/pages/Upload.tsx @@ -40,7 +40,7 @@ const UploadPage = () => { setHasError(false); setReturnMessage(""); - const response = await axios.post(`${API_BASE_URL}/upload`, formData, { + const response = await axios.post(`${API_BASE_URL}/datasets/upload`, formData, { headers: { "Content-Type": "multipart/form-data", }, diff --git a/frontend/src/utils/documentTitle.ts b/frontend/src/utils/documentTitle.ts index 904a6a8..5c7d00d 100644 --- a/frontend/src/utils/documentTitle.ts +++ b/frontend/src/utils/documentTitle.ts @@ -3,6 +3,7 @@ const DEFAULT_TITLE = "Ethnograph View"; const STATIC_TITLES: Record = { "/login": "Sign In", "/upload": "Upload Dataset", + "/auto-scrape": "Auto Scrape Dataset", "/datasets": "My Datasets", }; diff --git a/server/app.py b/server/app.py index 7cbf9d3..f373843 100644 --- a/server/app.py +++ b/server/app.py @@ -19,32 +19,38 @@ from server.exceptions import NotAuthorisedException, NonExistentDatasetExceptio from server.db.database import PostgresConnector from server.core.auth import AuthManager from server.core.datasets import DatasetManager -from server.utils import get_request_filters -from server.queue.tasks import process_dataset +from server.utils import get_request_filters, get_env +from server.queue.tasks import process_dataset, fetch_and_process_dataset +from server.connectors.registry import get_available_connectors, get_connector_metadata app = Flask(__name__) # Env Variables load_dotenv() -frontend_url = os.getenv("FRONTEND_URL", "http://localhost:5173") -jwt_secret_key = os.getenv("JWT_SECRET_KEY", "super-secret-change-this") -jwt_access_token_expires = int( - os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200) -) # Default to 20 minutes +max_fetch_limit = int(get_env("MAX_FETCH_LIMIT")) +frontend_url = get_env("FRONTEND_URL") +jwt_secret_key = get_env("JWT_SECRET_KEY") +jwt_access_token_expires = int(os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200)) # Default to 20 minutes # Flask Configuration CORS(app, resources={r"/*": {"origins": frontend_url}}) app.config["JWT_SECRET_KEY"] = jwt_secret_key app.config["JWT_ACCESS_TOKEN_EXPIRES"] = jwt_access_token_expires +# Security bcrypt = Bcrypt(app) jwt = JWTManager(app) +# Helper Objects db = PostgresConnector() auth_manager = AuthManager(db, bcrypt) dataset_manager = DatasetManager(db) stat_gen = StatGen() +connectors = get_available_connectors() +# Default Files +with open("server/topics.json") as f: + default_topic_list = json.load(f) @app.route("/register", methods=["POST"]) def register_user(): @@ -68,7 +74,7 @@ def register_user(): return jsonify({"error": str(e)}), 400 except Exception as e: print(traceback.format_exc()) - return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 + return jsonify({"error": f"An unexpected error occurred"}), 500 print(f"Registered new user: {username}") return jsonify({"message": f"User '{username}' registered successfully"}), 200 @@ -93,7 +99,7 @@ def login_user(): return jsonify({"error": "Invalid username or password"}), 401 except Exception as e: print(traceback.format_exc()) - return jsonify({"error": f"An unexpected error occurred: 
{str(e)}"}), 500 + return jsonify({"error": f"An unexpected error occurred"}), 500 @app.route("/profile", methods=["GET"]) @@ -111,7 +117,95 @@ def get_user_datasets(): current_user = int(get_jwt_identity()) return jsonify(dataset_manager.get_user_datasets(current_user)), 200 -@app.route("/upload", methods=["POST"]) +@app.route("/datasets/sources", methods=["GET"]) +def get_dataset_sources(): + list_metadata = list(get_connector_metadata().values()) + return jsonify(list_metadata) + +@app.route("/datasets/scrape", methods=["POST"]) +@jwt_required() +def scrape_data(): + data = request.get_json() + connector_metadata = get_connector_metadata() + + # Strong validation needed, otherwise data goes to Celery and crashes silently + if not data or "sources" not in data: + return jsonify({"error": "Sources must be provided"}), 400 + + if "name" not in data or not str(data["name"]).strip(): + return jsonify({"error": "Dataset name is required"}), 400 + + dataset_name = data["name"].strip() + user_id = int(get_jwt_identity()) + + source_configs = data["sources"] + + if not isinstance(source_configs, list) or len(source_configs) == 0: + return jsonify({"error": "Sources must be a non-empty list"}), 400 + + for source in source_configs: + if not isinstance(source, dict): + return jsonify({"error": "Each source must be an object"}), 400 + + if "name" not in source: + return jsonify({"error": "Each source must contain a name"}), 400 + + name = source["name"] + limit = source.get("limit", 1000) + category = source.get("category") + search = source.get("search") + + if limit: + try: + limit = int(limit) + except (ValueError, TypeError): + return jsonify({"error": "Limit must be an integer"}), 400 + + if limit > 1000: + limit = 1000 + + if name not in connector_metadata: + return jsonify({"error": "Source not supported"}), 400 + + if search and not connector_metadata[name]["search_enabled"]: + return jsonify({"error": f"Source {name} does not support search"}), 400 + + if category and not connector_metadata[name]["categories_enabled"]: + return jsonify({"error": f"Source {name} does not support categories"}), 400 + + if category and not connectors[name]().category_exists(category): + return jsonify({"error": f"Category does not exist for {name}"}), 400 + + try: + dataset_id = dataset_manager.save_dataset_info( + user_id, + dataset_name, + default_topic_list + ) + + dataset_manager.set_dataset_status( + dataset_id, + "fetching", + f"Data is being fetched from {', '.join(source['name'] for source in source_configs)}" + ) + + fetch_and_process_dataset.delay( + dataset_id, + source_configs, + default_topic_list + ) + except Exception: + print(traceback.format_exc()) + return jsonify({"error": "Failed to queue dataset processing"}), 500 + + + return jsonify({ + "message": "Dataset queued for processing", + "dataset_id": dataset_id, + "status": "processing" + }), 202 + +@app.route("/datasets/upload", methods=["POST"]) @jwt_required() def upload_data(): if "posts" not in request.files or "topics" not in request.files: @@ -151,9 +245,9 @@ def upload_data(): } ), 202 except ValueError as e: - return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400 + return jsonify({"error": f"Failed to read JSONL file"}), 400 except Exception as e: - return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 + return jsonify({"error": f"An unexpected error occurred"}), 500 @app.route("/dataset/", methods=["GET"]) @jwt_required() @@ -256,10 +350,10 @@ def content_endpoint(dataset_id): except 
NonExistentDatasetException: return jsonify({"error": "Dataset does not exist"}), 404 except ValueError as e: - return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400 + return jsonify({"error": f"Malformed or missing data"}), 400 except Exception as e: print(traceback.format_exc()) - return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 + return jsonify({"error": f"An unexpected error occurred"}), 500 @app.route("/dataset//summary", methods=["GET"]) @@ -278,10 +372,10 @@ def get_summary(dataset_id): except NonExistentDatasetException: return jsonify({"error": "Dataset does not exist"}), 404 except ValueError as e: - return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400 + return jsonify({"error": f"Malformed or missing data"}), 400 except Exception as e: print(traceback.format_exc()) - return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 + return jsonify({"error": f"An unexpected error occurred"}), 500 @app.route("/dataset//time", methods=["GET"]) @@ -300,10 +394,10 @@ def get_time_analysis(dataset_id): except NonExistentDatasetException: return jsonify({"error": "Dataset does not exist"}), 404 except ValueError as e: - return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400 + return jsonify({"error": f"Malformed or missing data"}), 400 except Exception as e: print(traceback.format_exc()) - return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 + return jsonify({"error": f"An unexpected error occurred"}), 500 @app.route("/dataset//user", methods=["GET"]) @@ -322,10 +416,10 @@ def get_user_analysis(dataset_id): except NonExistentDatasetException: return jsonify({"error": "Dataset does not exist"}), 404 except ValueError as e: - return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400 + return jsonify({"error": f"Malformed or missing data"}), 400 except Exception as e: print(traceback.format_exc()) - return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 + return jsonify({"error": f"An unexpected error occurred"}), 500 @app.route("/dataset//cultural", methods=["GET"]) @@ -344,10 +438,10 @@ def get_cultural_analysis(dataset_id): except NonExistentDatasetException: return jsonify({"error": "Dataset does not exist"}), 404 except ValueError as e: - return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400 + return jsonify({"error": f"Malformed or missing data"}), 400 except Exception as e: print(traceback.format_exc()) - return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 + return jsonify({"error": f"An unexpected error occurred"}), 500 @app.route("/dataset//interaction", methods=["GET"]) @@ -366,10 +460,10 @@ def get_interaction_analysis(dataset_id): except NonExistentDatasetException: return jsonify({"error": "Dataset does not exist"}), 404 except ValueError as e: - return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400 + return jsonify({"error": f"Malformed or missing data"}), 400 except Exception as e: print(traceback.format_exc()) - return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 + return jsonify({"error": f"An unexpected error occurred"}), 500 if __name__ == "__main__": diff --git a/server/connectors/base.py b/server/connectors/base.py new file mode 100644 index 0000000..48163b5 --- /dev/null +++ b/server/connectors/base.py @@ -0,0 +1,29 @@ +from abc import ABC, abstractmethod +from dto.post import Post + +class BaseConnector(ABC): + # Each subclass declares these at the class 
level + source_name: str # machine-readable: "reddit", "youtube" + display_name: str # human-readable: "Reddit", "YouTube" + required_env: list[str] = [] # env vars needed to activate + + search_enabled: bool + categories_enabled: bool + + @classmethod + def is_available(cls) -> bool: + """Returns True if all required env vars are set.""" + import os + return all(os.getenv(var) for var in cls.required_env) + + @abstractmethod + def get_new_posts_by_search(self, + search: str = None, + category: str = None, + post_limit: int = 10 + ) -> list[Post]: + ... + + @abstractmethod + def category_exists(self, category: str) -> bool: + ... \ No newline at end of file diff --git a/connectors/boards_api.py b/server/connectors/boards_api.py similarity index 70% rename from connectors/boards_api.py rename to server/connectors/boards_api.py index 1b63aa9..57e79f7 100644 --- a/connectors/boards_api.py +++ b/server/connectors/boards_api.py @@ -7,6 +7,7 @@ from dto.post import Post from dto.comment import Comment from bs4 import BeautifulSoup from concurrent.futures import ThreadPoolExecutor, as_completed +from server.connectors.base import BaseConnector logger = logging.getLogger(__name__) @@ -14,25 +15,64 @@ HEADERS = { "User-Agent": "Mozilla/5.0 (compatible; ForumScraper/1.0)" } -class BoardsAPI: - def __init__(self): - self.url = "https://www.boards.ie" - self.source_name = "Boards.ie" +class BoardsAPI(BaseConnector): + source_name: str = "boards.ie" + display_name: str = "Boards.ie" - def get_new_category_posts(self, category: str, post_limit: int, comment_limit: int) -> list[Post]: + categories_enabled: bool = True + search_enabled: bool = False + + def __init__(self): + self.base_url = "https://www.boards.ie" + + def get_new_posts_by_search(self, + search: str, + category: str, + post_limit: int + ) -> list[Post]: + if search: + raise NotImplementedError("Search not compatible with boards.ie") + + if category: + return self._get_posts(f"{self.base_url}/categories/{category}", post_limit) + else: + return self._get_posts(f"{self.base_url}/discussions", post_limit) + + def category_exists(self, category: str) -> bool: + if not category: + return False + + url = f"{self.base_url}/categories/{category}" + + try: + response = requests.head(url, headers=HEADERS, allow_redirects=True) + + if response.status_code == 200: + return True + if response.status_code == 404: + return False + + # fallback if HEAD not supported + response = requests.get(url, headers=HEADERS) + return response.status_code == 200 + + except requests.RequestException as e: + logger.error(f"Error checking category '{category}': {e}") + return False + + ## Private + def _get_posts(self, url, limit) -> list[Post]: urls = [] current_page = 1 - logger.info(f"Fetching posts from category: {category}") - - while len(urls) < post_limit: - url = f"{self.url}/categories/{category}/p{current_page}" + while len(urls) < limit: + url = f"{url}/p{current_page}" html = self._fetch_page(url) soup = BeautifulSoup(html, "html.parser") - logger.debug(f"Processing page {current_page} for category {category}") + logger.debug(f"Processing page {current_page} for link: {url}") for a in soup.select("a.threadbit-threadlink"): - if len(urls) >= post_limit: + if len(urls) >= limit: break href = a.get("href") @@ -41,14 +81,14 @@ class BoardsAPI: current_page += 1 - logger.debug(f"Fetched {len(urls)} post URLs from category {category}") + logger.debug(f"Fetched {len(urls)} post URLs") # Fetch post details for each URL and create Post objects posts = [] def 
fetch_and_parse(post_url): html = self._fetch_page(post_url) - post = self._parse_thread(html, post_url, comment_limit) + post = self._parse_thread(html, post_url) return post with ThreadPoolExecutor(max_workers=30) as executor: @@ -71,7 +111,7 @@ class BoardsAPI: response.raise_for_status() return response.text - def _parse_thread(self, html: str, post_url: str, comment_limit: int) -> Post: + def _parse_thread(self, html: str, post_url: str) -> Post: soup = BeautifulSoup(html, "html.parser") # Author @@ -100,7 +140,7 @@ class BoardsAPI: title = title_tag.text.strip() if title_tag else None # Comments - comments = self._parse_comments(post_url, post_num, comment_limit) + comments = self._parse_comments(post_url, post_num) post = Post( id=post_num, @@ -115,11 +155,11 @@ class BoardsAPI: return post - def _parse_comments(self, url: str, post_id: str, comment_limit: int) -> list[Comment]: + def _parse_comments(self, url: str, post_id: str) -> list[Comment]: comments = [] current_url = url - while current_url and len(comments) < comment_limit: + while current_url: html = self._fetch_page(current_url) page_comments = self._parse_page_comments(html, post_id) comments.extend(page_comments) @@ -130,7 +170,7 @@ class BoardsAPI: if next_link and next_link.get('href'): href = next_link.get('href') - current_url = href if href.startswith('http') else self.url + href + current_url = href if href.startswith('http') else url + href else: current_url = None diff --git a/connectors/reddit_api.py b/server/connectors/reddit_api.py similarity index 74% rename from connectors/reddit_api.py rename to server/connectors/reddit_api.py index 0ec6100..7955fca 100644 --- a/connectors/reddit_api.py +++ b/server/connectors/reddit_api.py @@ -5,44 +5,63 @@ import time from dto.post import Post from dto.user import User from dto.comment import Comment +from server.connectors.base import BaseConnector logger = logging.getLogger(__name__) -class RedditAPI: +class RedditAPI(BaseConnector): + source_name: str = "reddit" + display_name: str = "Reddit" + search_enabled: bool = True + categories_enabled: bool = True + def __init__(self): self.url = "https://www.reddit.com/" - self.source_name = "Reddit" # Public Methods # - def search_new_subreddit_posts(self, search: str, subreddit: str, limit: int) -> list[Post]: - params = { - 'q': search, - 'limit': limit, - 'restrict_sr': 'on', - 'sort': 'new' - } - - logger.info(f"Searching subreddit '{subreddit}' for '{search}' with limit {limit}") - url = f"r/{subreddit}/search.json" - posts = [] + def get_new_posts_by_search(self, + search: str, + category: str, + post_limit: int + ) -> list[Post]: - while len(posts) < limit: - batch_limit = min(100, limit - len(posts)) + prefix = f"r/{category}/" if category else "" + params = {'limit': post_limit} + + if search: + endpoint = f"{prefix}search.json" + params.update({ + 'q': search, + 'sort': 'new', + 'restrict_sr': 'on' if category else 'off' + }) + else: + endpoint = f"{prefix}new.json" + + posts = [] + after = None + + while len(posts) < post_limit: + batch_limit = min(100, post_limit - len(posts)) params['limit'] = batch_limit + if after: + params['after'] = after - data = self._fetch_post_overviews(url, params) - batch_posts = self._parse_posts(data) - - logger.debug(f"Fetched {len(batch_posts)} posts from search in subreddit {subreddit}") - - if not batch_posts: + data = self._fetch_post_overviews(endpoint, params) + + if not data or 'data' not in data or not data['data'].get('children'): break + batch_posts = 
self._parse_posts(data) posts.extend(batch_posts) - return posts + after = data['data'].get('after') + if not after: + break + + return posts[:post_limit] - def get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]: + def _get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]: posts = [] after = None url = f"r/{subreddit}/new.json" @@ -75,6 +94,17 @@ class RedditAPI: data = self._fetch_post_overviews(f"user/{username}/about.json", {}) return self._parse_user(data) + def category_exists(self, category: str) -> bool: + try: + data = self._fetch_post_overviews(f"r/{category}/about.json", {}) + return ( + data is not None + and 'data' in data + and data['data'].get('id') is not None + ) + except Exception: + return False + ## Private Methods ## def _parse_posts(self, data) -> list[Post]: posts = [] diff --git a/server/connectors/registry.py b/server/connectors/registry.py new file mode 100644 index 0000000..f2371e6 --- /dev/null +++ b/server/connectors/registry.py @@ -0,0 +1,30 @@ +import pkgutil +import importlib +import server.connectors +from server.connectors.base import BaseConnector + +def _discover_connectors() -> list[type[BaseConnector]]: + """Walk the connectors package and collect all BaseConnector subclasses.""" + for _, module_name, _ in pkgutil.iter_modules(server.connectors.__path__): + if module_name in ("base", "registry"): + continue + importlib.import_module(f"server.connectors.{module_name}") + + return [ + cls for cls in BaseConnector.__subclasses__() + if cls.source_name # guard against abstract intermediaries + ] + +def get_available_connectors() -> dict[str, type[BaseConnector]]: + return {c.source_name: c for c in _discover_connectors() if c.is_available()} + +def get_connector_metadata() -> dict[str, dict]: + res = {} + for id, obj in get_available_connectors().items(): + res[id] = {"id": id, + "label": obj.display_name, + "search_enabled": obj.search_enabled, + "categories_enabled": obj.categories_enabled + } + + return res \ No newline at end of file diff --git a/server/connectors/youtube_api.py b/server/connectors/youtube_api.py new file mode 100644 index 0000000..7b014d0 --- /dev/null +++ b/server/connectors/youtube_api.py @@ -0,0 +1,96 @@ +import os +import datetime + +from dotenv import load_dotenv +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError +from dto.post import Post +from dto.comment import Comment +from server.connectors.base import BaseConnector + +load_dotenv() + +API_KEY = os.getenv("YOUTUBE_API_KEY") + +class YouTubeAPI(BaseConnector): + source_name: str = "youtube" + display_name: str = "YouTube" + search_enabled: bool = True + categories_enabled: bool = False + + def __init__(self): + self.youtube = build('youtube', 'v3', developerKey=API_KEY) + + def get_new_posts_by_search(self, + search: str, + category: str, + post_limit: int + ) -> list[Post]: + videos = self._search_videos(search, post_limit) + posts = [] + + for video in videos: + video_id = video['id']['videoId'] + snippet = video['snippet'] + title = snippet['title'] + description = snippet['description'] + published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp() + channel_title = snippet['channelTitle'] + + comments = [] + comments_data = self._get_video_comments(video_id) + for comment_thread in comments_data: + comment_snippet = comment_thread['snippet']['topLevelComment']['snippet'] + comment = Comment( + id=comment_thread['id'], + post_id=video_id, + 
content=comment_snippet['textDisplay'],
+                    author=comment_snippet['authorDisplayName'],
+                    timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(),
+                    reply_to=None,
+                    source=self.source_name
+                )
+
+                comments.append(comment)
+
+            post = Post(
+                id=video_id,
+                content=f"{title}\n\n{description}",
+                author=channel_title,
+                timestamp=published_at,
+                url=f"https://www.youtube.com/watch?v={video_id}",
+                title=title,
+                source=self.source_name,
+                comments=comments
+            )
+
+            posts.append(post)
+
+        return posts
+
+    def category_exists(self, category):
+        return True
+
+    def _search_videos(self, query, limit):
+        request = self.youtube.search().list(
+            q=query,
+            part='snippet',
+            type='video',
+            maxResults=limit
+        )
+        response = request.execute()
+        return response.get('items', [])
+
+    def _get_video_comments(self, video_id):
+        request = self.youtube.commentThreads().list(
+            part='snippet',
+            videoId=video_id,
+            textFormat='plainText'
+        )
+
+        try:
+            response = request.execute()
+        except HttpError as e:
+            print(f"Error fetching comments for video {video_id}: {e}")
+            return []
+        return response.get('items', [])
diff --git a/server/core/auth.py b/server/core/auth.py
index 625c3c2..34bb93c 100644
--- a/server/core/auth.py
+++ b/server/core/auth.py
@@ -1,6 +1,10 @@
+import re
+
 from server.db.database import PostgresConnector
 from flask_bcrypt import Bcrypt
 
+EMAIL_REGEX = re.compile(r"[^@]+@[^@]+\.[^@]+")
+
 class AuthManager:
     def __init__(self, db: PostgresConnector, bcrypt: Bcrypt):
         self.db = db
@@ -18,6 +22,12 @@ class AuthManager:
     def register_user(self, username, email, password):
         hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8")
 
+        if len(username) < 3:
+            raise ValueError("Username must be at least 3 characters long")
+
+        if not EMAIL_REGEX.match(email):
+            raise ValueError("Please enter a valid email address")
+
         if self.get_user_by_email(email):
             raise ValueError("Email already registered")
 
diff --git a/server/core/datasets.py b/server/core/datasets.py
index 5886cfc..4690454 100644
--- a/server/core/datasets.py
+++ b/server/core/datasets.py
@@ -1,7 +1,7 @@
 import pandas as pd
 from server.db.database import PostgresConnector
 from psycopg2.extras import Json
-from server.exceptions import NotAuthorisedException, NonExistentDatasetException
+from server.exceptions import NonExistentDatasetException
 
 class DatasetManager:
     def __init__(self, db: PostgresConnector):
@@ -114,7 +114,7 @@ class DatasetManager:
         self.db.execute_batch(query, values)
 
     def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None):
-        if status not in ["processing", "complete", "error"]:
+        if status not in ["fetching", "processing", "complete", "error"]:
             raise ValueError("Invalid status")
 
         query = """
diff --git a/server/db/schema.sql b/server/db/schema.sql
index 051a396..4550633 100644
--- a/server/db/schema.sql
+++ b/server/db/schema.sql
@@ -23,7 +23,7 @@ CREATE TABLE datasets (
 
     -- Enforce valid states
     CONSTRAINT datasets_status_check
-        CHECK (status IN ('processing', 'complete', 'error'))
+        CHECK (status IN ('fetching', 'processing', 'complete', 'error'))
 );
 
 CREATE TABLE events (
diff --git a/server/queue/tasks.py b/server/queue/tasks.py
index a089596..95248d1 100644
--- a/server/queue/tasks.py
+++ b/server/queue/tasks.py
@@ -1,9 +1,13 @@
 import pandas as pd
+import logging
 
 from server.queue.celery_app import celery
 from server.analysis.enrichment import DatasetEnrichment
 from server.db.database import PostgresConnector
 from server.core.datasets import DatasetManager
+from server.connectors.registry import get_available_connectors
+
+logger = logging.getLogger(__name__)
 
 @celery.task(bind=True, max_retries=3)
 def process_dataset(self, dataset_id: int, posts: list, topics: dict):
@@ -16,6 +20,41 @@ def process_dataset(self, dataset_id: int, posts: list, topics: dict):
         processor = DatasetEnrichment(df, topics)
         enriched_df = processor.enrich()
+        dataset_manager.save_dataset_content(dataset_id, enriched_df)
+        dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
+    except Exception as e:
+        dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
+
+@celery.task(bind=True, max_retries=3)
+def fetch_and_process_dataset(self,
+                              dataset_id: int,
+                              source_info: list[dict],
+                              topics: dict):
+    connectors = get_available_connectors()
+    db = PostgresConnector()
+    dataset_manager = DatasetManager(db)
+    posts = []
+
+    try:
+        for metadata in source_info:
+            name = metadata["name"]
+            search = metadata.get("search")
+            category = metadata.get("category")
+            limit = metadata.get("limit", 100)
+
+            connector = connectors[name]()
+            raw_posts = connector.get_new_posts_by_search(
+                search=search,
+                category=category,
+                post_limit=limit
+            )
+            posts.extend(post.to_dict() for post in raw_posts)
+
+        df = pd.DataFrame(posts)
+
+        processor = DatasetEnrichment(df, topics)
+        enriched_df = processor.enrich()
+
         dataset_manager.save_dataset_content(dataset_id, enriched_df)
         dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
     except Exception as e:
diff --git a/server/topics.json b/server/topics.json
new file mode 100644
index 0000000..271913a
--- /dev/null
+++ b/server/topics.json
@@ -0,0 +1,67 @@
+{
+    "Personal Life": "daily life, life updates, what happened today, personal stories, life events, reflections",
+
+    "Relationships": "dating, relationships, breakups, friendships, family relationships, marriage, relationship advice",
+
+    "Family & Parenting": "parents, parenting, children, raising kids, family dynamics, family stories",
+
+    "Work & Careers": "jobs, workplaces, office life, promotions, quitting jobs, career advice, workplace drama",
+
+    "Education": "school, studying, exams, university, homework, academic pressure, learning experiences",
+
+    "Money & Finance": "saving money, debt, budgeting, cost of living, financial advice, personal finance",
+
+    "Health & Fitness": "exercise, gym, workouts, running, diet, fitness routines, weight loss",
+
+    "Mental Health": "stress, anxiety, depression, burnout, therapy, emotional wellbeing",
+
+    "Food & Cooking": "meals, cooking, recipes, restaurants, snacks, food opinions",
+
+    "Travel": "holidays, trips, tourism, travel experiences, airports, flights, travel tips",
+
+    "Entertainment": "movies, TV shows, streaming services, celebrities, pop culture",
+
+    "Music": "songs, albums, artists, concerts, music opinions",
+
+    "Gaming": "video games, gaming culture, consoles, PC gaming, esports",
+
+    "Sports": "sports matches, teams, players, competitions, sports opinions",
+
+    "Technology": "phones, gadgets, apps, AI, software, tech trends",
+
+    "Internet Culture": "memes, viral trends, online jokes, internet drama, trending topics",
+
+    "Social Media": "platforms, influencers, content creators, algorithms, online communities",
+
+    "News & Current Events": "breaking news, world events, major incidents, public discussions",
+
+    "Politics": "political debates, elections, government policies, ideology",
+
+    "Culture & Society": "social issues, cultural trends, generational debates, societal changes",
+
+    "Identity & Lifestyle": "personal identity, lifestyle choices, values, self-expression",
+
+    "Hobbies & Interests": "art, photography, crafts, collecting, hobbies",
+
+    "Fashion & Beauty": "clothing, style, makeup, skincare, fashion trends",
+
+    "Animals & Pets": "pets, animal videos, pet care, wildlife",
+
+    "Humour": "jokes, funny stories, sarcasm, memes",
+
+    "Opinions & Debates": "hot takes, controversial opinions, arguments, discussions",
+
+    "Advice & Tips": "life advice, tutorials, how-to tips, recommendations",
+
+    "Product Reviews": "reviews, recommendations, experiences with products",
+
+    "Complaints & Rants": "frustrations, complaining, venting about things",
+
+    "Motivation & Inspiration": "motivational quotes, success stories, encouragement",
+
+    "Questions & Curiosity": "asking questions, seeking opinions, curiosity posts",
+
+    "Celebrations & Achievements": "birthdays, milestones, achievements, good news",
+
+    "Random Thoughts": "shower thoughts, observations, random ideas"
+}
\ No newline at end of file
diff --git a/server/utils.py b/server/utils.py
index 815739f..fb42953 100644
--- a/server/utils.py
+++ b/server/utils.py
@@ -1,4 +1,5 @@
 import datetime
+import os
 from flask import request
 
 def parse_datetime_filter(value):
@@ -48,3 +49,9 @@ def get_request_filters() -> dict:
         filters["data_sources"] = data_sources
 
     return filters
+
+def get_env(name: str) -> str:
+    value = os.getenv(name)
+    if not value:
+        raise RuntimeError(f"Missing required environment variable: {name}")
+    return value