diff --git a/connectors/youtube_api.py b/connectors/youtube_api.py
deleted file mode 100644
index d0e00a3..0000000
--- a/connectors/youtube_api.py
+++ /dev/null
@@ -1,84 +0,0 @@
-import os
-import datetime
-
-from dotenv import load_dotenv
-from googleapiclient.discovery import build
-from googleapiclient.errors import HttpError
-from dto.post import Post
-from dto.comment import Comment
-
-load_dotenv()
-
-API_KEY = os.getenv("YOUTUBE_API_KEY")
-
-class YouTubeAPI:
- def __init__(self):
- self.youtube = build('youtube', 'v3', developerKey=API_KEY)
-
- def search_videos(self, query, limit):
- request = self.youtube.search().list(
- q=query,
- part='snippet',
- type='video',
- maxResults=limit
- )
- response = request.execute()
- return response.get('items', [])
-
- def get_video_comments(self, video_id, limit):
- request = self.youtube.commentThreads().list(
- part='snippet',
- videoId=video_id,
- maxResults=limit,
- textFormat='plainText'
- )
-
- try:
- response = request.execute()
- except HttpError as e:
- print(f"Error fetching comments for video {video_id}: {e}")
- return []
- return response.get('items', [])
-
- def fetch_videos(self, query, video_limit, comment_limit) -> list[Post]:
- videos = self.search_videos(query, video_limit)
- posts = []
-
- for video in videos:
- video_id = video['id']['videoId']
- snippet = video['snippet']
- title = snippet['title']
- description = snippet['description']
- published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
- channel_title = snippet['channelTitle']
-
- comments = []
- comments_data = self.get_video_comments(video_id, comment_limit)
- for comment_thread in comments_data:
- comment_snippet = comment_thread['snippet']['topLevelComment']['snippet']
- comment = Comment(
- id=comment_thread['id'],
- post_id=video_id,
- content=comment_snippet['textDisplay'],
- author=comment_snippet['authorDisplayName'],
- timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(),
- reply_to=None,
- source="YouTube"
- )
-
- comments.append(comment)
-
- post = Post(
- id=video_id,
- content=f"{title}\n\n{description}",
- author=channel_title,
- timestamp=published_at,
- url=f"https://www.youtube.com/watch?v={video_id}",
- title=title,
- source="YouTube",
- comments=comments
- )
-
- posts.append(post)
-
- return posts
\ No newline at end of file
diff --git a/create_dataset.py b/create_dataset.py
deleted file mode 100644
index 791b2bd..0000000
--- a/create_dataset.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import json
-import logging
-from connectors.reddit_api import RedditAPI
-from connectors.boards_api import BoardsAPI
-from connectors.youtube_api import YouTubeAPI
-
-posts_file = 'posts_test.jsonl'
-
-reddit_connector = RedditAPI()
-boards_connector = BoardsAPI()
-youtube_connector = YouTubeAPI()
-
-logging.basicConfig(level=logging.DEBUG)
-logging.getLogger("urllib3").setLevel(logging.WARNING)
-
-def remove_empty_posts(posts):
- return [post for post in posts if post.content.strip() != ""]
-
-def save_to_jsonl(filename, posts):
- with open(filename, 'a', encoding='utf-8') as f:
- for post in posts:
- # Convert post object to dict if it's a dataclass
- data = post.to_dict()
- f.write(json.dumps(data) + '\n')
-
-
-def main():
- boards_posts = boards_connector.get_new_category_posts('cork-city', 1200, 1200)
- save_to_jsonl(posts_file, boards_posts)
-
- reddit_posts = reddit_connector.get_new_subreddit_posts('cork', 1200)
- reddit_posts = remove_empty_posts(reddit_posts)
- save_to_jsonl(posts_file, reddit_posts)
-
- ireland_posts = reddit_connector.search_new_subreddit_posts('cork', 'ireland', 1200)
- ireland_posts = remove_empty_posts(ireland_posts)
- save_to_jsonl(posts_file, ireland_posts)
-
- youtube_videos = youtube_connector.fetch_videos('cork city', 1200, 1200)
- save_to_jsonl(posts_file, youtube_videos)
-
-if __name__ == "__main__":
- main()
\ No newline at end of file
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 96c3430..dc3edb2 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -43,7 +43,7 @@ services:
- .env
command: >
celery -A server.queue.celery_app.celery worker
- --loglevel=info
+ --loglevel=debug
--pool=solo
depends_on:
- postgres
diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx
index b1e6045..1b10f61 100644
--- a/frontend/src/App.tsx
+++ b/frontend/src/App.tsx
@@ -5,6 +5,7 @@ import DatasetsPage from "./pages/Datasets";
import DatasetStatusPage from "./pages/DatasetStatus";
import LoginPage from "./pages/Login";
import UploadPage from "./pages/Upload";
+import AutoScrapePage from "./pages/AutoScrape";
import StatPage from "./pages/Stats";
import { getDocumentTitle } from "./utils/documentTitle";
import DatasetEditPage from "./pages/DatasetEdit";
@@ -22,6 +23,7 @@ function App() {
} />
} />
} />
+ } />
} />
} />
} />
diff --git a/frontend/src/pages/AutoScrape.tsx b/frontend/src/pages/AutoScrape.tsx
new file mode 100644
index 0000000..9e9d336
--- /dev/null
+++ b/frontend/src/pages/AutoScrape.tsx
@@ -0,0 +1,338 @@
+import axios from "axios";
+import { useEffect, useState } from "react";
+import { useNavigate } from "react-router-dom";
+import StatsStyling from "../styles/stats_styling";
+
+const styles = StatsStyling;
+const API_BASE_URL = import.meta.env.VITE_BACKEND_URL;
+
+type SourceOption = {
+ id: string;
+ label: string;
+ search_enabled?: boolean;
+ categories_enabled?: boolean;
+ searchEnabled?: boolean;
+ categoriesEnabled?: boolean;
+};
+
+type SourceConfig = {
+ sourceName: string;
+ limit: string;
+ search: string;
+ category: string;
+};
+
+const buildEmptySourceConfig = (sourceName = ""): SourceConfig => ({
+ sourceName,
+ limit: "100",
+ search: "",
+ category: "",
+});
+
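+// Note: the backend reports capabilities in snake_case (search_enabled); the
+// camelCase variants on SourceOption are accepted here only as a defensive fallback.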
+const supportsSearch = (source?: SourceOption): boolean =>
+ Boolean(source?.search_enabled ?? source?.searchEnabled);
+
+const supportsCategories = (source?: SourceOption): boolean =>
+ Boolean(source?.categories_enabled ?? source?.categoriesEnabled);
+
+const AutoScrapePage = () => {
+ const navigate = useNavigate();
+ const [datasetName, setDatasetName] = useState("");
+ const [sourceOptions, setSourceOptions] = useState<SourceOption[]>([]);
+ const [sourceConfigs, setSourceConfigs] = useState<SourceConfig[]>([]);
+ const [returnMessage, setReturnMessage] = useState("");
+ const [isLoadingSources, setIsLoadingSources] = useState(true);
+ const [isSubmitting, setIsSubmitting] = useState(false);
+ const [hasError, setHasError] = useState(false);
+
+ useEffect(() => {
+ axios
+ .get(`${API_BASE_URL}/datasets/sources`)
+ .then((response) => {
+ const options = response.data || [];
+ setSourceOptions(options);
+ setSourceConfigs([buildEmptySourceConfig(options[0]?.id || "")]);
+ })
+ .catch((requestError: unknown) => {
+ setHasError(true);
+ if (axios.isAxiosError(requestError)) {
+ setReturnMessage(
+ `Failed to load available sources: ${String(
+ requestError.response?.data?.error || requestError.message
+ )}`
+ );
+ } else {
+ setReturnMessage("Failed to load available sources.");
+ }
+ })
+ .finally(() => {
+ setIsLoadingSources(false);
+ });
+ }, []);
+
+ const updateSourceConfig = (index: number, field: keyof SourceConfig, value: string) => {
+ setSourceConfigs((previous) =>
+ previous.map((config, configIndex) =>
+ configIndex === index
+ ? field === "sourceName"
+ ? { ...config, sourceName: value, search: "", category: "" }
+ : { ...config, [field]: value }
+ : config
+ )
+ );
+ };
+
+ const getSourceOption = (sourceName: string) =>
+ sourceOptions.find((option) => option.id === sourceName);
+
+ const addSourceConfig = () => {
+ setSourceConfigs((previous) => [
+ ...previous,
+ buildEmptySourceConfig(sourceOptions[0]?.id || ""),
+ ]);
+ };
+
+ const removeSourceConfig = (index: number) => {
+ setSourceConfigs((previous) => previous.filter((_, configIndex) => configIndex !== index));
+ };
+
+ const autoScrape = async () => {
+ const token = localStorage.getItem("access_token");
+ if (!token) {
+ setHasError(true);
+ setReturnMessage("You must be signed in to auto scrape a dataset.");
+ return;
+ }
+
+ const normalizedDatasetName = datasetName.trim();
+ if (!normalizedDatasetName) {
+ setHasError(true);
+ setReturnMessage("Please add a dataset name before continuing.");
+ return;
+ }
+
+ if (sourceConfigs.length === 0) {
+ setHasError(true);
+ setReturnMessage("Please add at least one source.");
+ return;
+ }
+
+ const normalizedSources = sourceConfigs.map((source) => {
+ const sourceOption = getSourceOption(source.sourceName);
+
+ return {
+ name: source.sourceName,
+ limit: Number(source.limit || 100),
+ search: supportsSearch(sourceOption) ? source.search.trim() || undefined : undefined,
+ category: supportsCategories(sourceOption)
+ ? source.category.trim() || undefined
+ : undefined,
+ };
+ });
+
+ const invalidSource = normalizedSources.find(
+ (source) => !source.name || !Number.isFinite(source.limit) || source.limit <= 0
+ );
+
+ if (invalidSource) {
+ setHasError(true);
+ setReturnMessage("Every source needs a name and a limit greater than zero.");
+ return;
+ }
+
+ try {
+ setIsSubmitting(true);
+ setHasError(false);
+ setReturnMessage("");
+
+ const response = await axios.post(
+ `${API_BASE_URL}/datasets/scrape`,
+ {
+ name: normalizedDatasetName,
+ sources: normalizedSources,
+ },
+ {
+ headers: {
+ Authorization: `Bearer ${token}`,
+ },
+ }
+ );
+
+ const datasetId = Number(response.data.dataset_id);
+
+ setReturnMessage(
+ `Auto scrape queued successfully (dataset #${datasetId}). Redirecting to processing status...`
+ );
+
+ setTimeout(() => {
+ navigate(`/dataset/${datasetId}/status`);
+ }, 400);
+ } catch (requestError: unknown) {
+ setHasError(true);
+ if (axios.isAxiosError(requestError)) {
+ const message = String(
+ requestError.response?.data?.error || requestError.message || "Auto scrape failed."
+ );
+ setReturnMessage(`Auto scrape failed: ${message}`);
+ } else {
+ setReturnMessage("Auto scrape failed due to an unexpected error.");
+ }
+ } finally {
+ setIsSubmitting(false);
+ }
+ };
+
+ return (
+
+
+
+
+
Auto Scrape Dataset
+
+ Select sources and scrape settings, then queue processing automatically.
+
+
+
+
+
+
+
+
Dataset Name
+
Use a clear label so you can identify this run later.
+
setDatasetName(event.target.value)}
+ />
+
+
+
+
Sources
+
+ Configure source, limit, optional search, and optional category.
+
+
+ {isLoadingSources &&
Loading sources...
}
+
+ {!isLoadingSources && sourceOptions.length === 0 && (
+
No source connectors are currently available.
+ )}
+
+ {!isLoadingSources && sourceOptions.length > 0 && (
+
+ )}
+
+
+
+
+ {returnMessage ||
+ "After queueing, your dataset is fetched and processed in the background automatically."}
+
+
+
+ );
+};
+
+export default AutoScrapePage;
diff --git a/frontend/src/pages/Datasets.tsx b/frontend/src/pages/Datasets.tsx
index 4c79cdc..d06d32a 100644
--- a/frontend/src/pages/Datasets.tsx
+++ b/frontend/src/pages/Datasets.tsx
@@ -9,7 +9,7 @@ const API_BASE_URL = import.meta.env.VITE_BACKEND_URL;
type DatasetItem = {
id: number;
name?: string;
- status?: "processing" | "complete" | "error" | string;
+ status?: "processing" | "complete" | "error" | "fetching" | string;
status_message?: string | null;
completed_at?: string | null;
created_at?: string | null;
@@ -50,7 +50,24 @@ const DatasetsPage = () => {
}, []);
if (loading) {
- return Loading datasets...
;
+ return (
+
+
+
+
+
+
Loading datasets
+
+
+
+
+
+
+ )
}
return (
@@ -63,9 +80,18 @@ const DatasetsPage = () => {
View and reopen datasets you previously uploaded.
-
+
+
+
+
{error && (
@@ -93,7 +119,7 @@ const DatasetsPage = () => {
{datasets.map((dataset) => {
- const isComplete = dataset.status === "complete";
+ const isComplete = dataset.status === "complete" || dataset.status === "error";
const editPath = `/dataset/${dataset.id}/edit`;
const targetPath = isComplete
? `/dataset/${dataset.id}/stats`
diff --git a/frontend/src/pages/Upload.tsx b/frontend/src/pages/Upload.tsx
index 93383dc..0799f9b 100644
--- a/frontend/src/pages/Upload.tsx
+++ b/frontend/src/pages/Upload.tsx
@@ -40,7 +40,7 @@ const UploadPage = () => {
setHasError(false);
setReturnMessage("");
- const response = await axios.post(`${API_BASE_URL}/upload`, formData, {
+ const response = await axios.post(`${API_BASE_URL}/datasets/upload`, formData, {
headers: {
"Content-Type": "multipart/form-data",
},
diff --git a/frontend/src/utils/documentTitle.ts b/frontend/src/utils/documentTitle.ts
index 904a6a8..5c7d00d 100644
--- a/frontend/src/utils/documentTitle.ts
+++ b/frontend/src/utils/documentTitle.ts
@@ -3,6 +3,7 @@ const DEFAULT_TITLE = "Ethnograph View";
const STATIC_TITLES: Record = {
"/login": "Sign In",
"/upload": "Upload Dataset",
+ "/auto-scrape": "Auto Scrape Dataset",
"/datasets": "My Datasets",
};
diff --git a/server/app.py b/server/app.py
index 7cbf9d3..f373843 100644
--- a/server/app.py
+++ b/server/app.py
@@ -19,32 +19,38 @@ from server.exceptions import NotAuthorisedException, NonExistentDatasetExceptio
from server.db.database import PostgresConnector
from server.core.auth import AuthManager
from server.core.datasets import DatasetManager
-from server.utils import get_request_filters
-from server.queue.tasks import process_dataset
+from server.utils import get_request_filters, get_env
+from server.queue.tasks import process_dataset, fetch_and_process_dataset
+from server.connectors.registry import get_available_connectors, get_connector_metadata
app = Flask(__name__)
# Env Variables
load_dotenv()
-frontend_url = os.getenv("FRONTEND_URL", "http://localhost:5173")
-jwt_secret_key = os.getenv("JWT_SECRET_KEY", "super-secret-change-this")
-jwt_access_token_expires = int(
- os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200)
-) # Default to 20 minutes
+max_fetch_limit = int(get_env("MAX_FETCH_LIMIT"))
+frontend_url = get_env("FRONTEND_URL")
+jwt_secret_key = get_env("JWT_SECRET_KEY")
+jwt_access_token_expires = int(os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200)) # Default to 20 minutes
# Flask Configuration
CORS(app, resources={r"/*": {"origins": frontend_url}})
app.config["JWT_SECRET_KEY"] = jwt_secret_key
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = jwt_access_token_expires
+# Security
bcrypt = Bcrypt(app)
jwt = JWTManager(app)
+# Helper Objects
db = PostgresConnector()
auth_manager = AuthManager(db, bcrypt)
dataset_manager = DatasetManager(db)
stat_gen = StatGen()
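+# Connectors are discovered once at startup; only those whose required env vars are set are included.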
+connectors = get_available_connectors()
+# Default Files
+with open("server/topics.json") as f:
+ default_topic_list = json.load(f)
@app.route("/register", methods=["POST"])
def register_user():
@@ -68,7 +74,7 @@ def register_user():
return jsonify({"error": str(e)}), 400
except Exception as e:
print(traceback.format_exc())
- return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+ return jsonify({"error": f"An unexpected error occurred"}), 500
print(f"Registered new user: {username}")
return jsonify({"message": f"User '{username}' registered successfully"}), 200
@@ -93,7 +99,7 @@ def login_user():
return jsonify({"error": "Invalid username or password"}), 401
except Exception as e:
print(traceback.format_exc())
- return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+ return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/profile", methods=["GET"])
@@ -111,7 +117,95 @@ def get_user_datasets():
current_user = int(get_jwt_identity())
return jsonify(dataset_manager.get_user_datasets(current_user)), 200
-@app.route("/upload", methods=["POST"])
+@app.route("/datasets/sources", methods=["GET"])
+def get_dataset_sources():
+ list_metadata = list(get_connector_metadata().values())
+ return jsonify(list_metadata)
+
+@app.route("/datasets/scrape", methods=["POST"])
+@jwt_required()
+def scrape_data():
+ data = request.get_json()
+ connector_metadata = get_connector_metadata()
+
+ # Strong validation needed, otherwise data goes to Celery and crashes silently
+ if not data or "sources" not in data:
+ return jsonify({"error": "Sources must be provided"}), 400
+
+ if "name" not in data or not str(data["name"]).strip():
+ return jsonify({"error": "Dataset name is required"}), 400
+
+ dataset_name = data["name"].strip()
+ user_id = int(get_jwt_identity())
+
+ source_configs = data["sources"]
+
+ if not isinstance(source_configs, list) or len(source_configs) == 0:
+ return jsonify({"error": "Sources must be a non-empty list"}), 400
+
+ for source in source_configs:
+ if not isinstance(source, dict):
+ return jsonify({"error": "Each source must be an object"}), 400
+
+ if "name" not in source:
+ return jsonify({"error": "Each source must contain a name"}), 400
+
+ name = source["name"]
+ limit = source.get("limit", 1000)
+ category = source.get("category")
+ search = source.get("search")
+
+ if limit:
+ try:
+ limit = int(limit)
+ except (ValueError, TypeError):
+ return jsonify({"error": "Limit must be an integer"}), 400
+
+ if limit > 1000:
+ limit = 1000
+
+ # Write the normalised limit back so the Celery task receives the capped integer
+ source["limit"] = limit
+
+ if name not in connector_metadata:
+ return jsonify({"error": "Source not supported"}), 400
+
+ if search and not connector_metadata[name]["search_enabled"]:
+ return jsonify({"error": f"Source {name} does not support search"}), 400
+
+ if category and not connector_metadata[name]["categories_enabled"]:
+ return jsonify({"error": f"Source {name} does not support categories"}), 400
+
+ if category and not connectors[name]().category_exists(category):
+ return jsonify({"error": f"Category does not exist for {name}"}), 400
+
+ try:
+ dataset_id = dataset_manager.save_dataset_info(
+ user_id,
+ dataset_name,
+ default_topic_list
+ )
+
+ dataset_manager.set_dataset_status(
+ dataset_id,
+ "fetching",
+ f"Data is being fetched from {', '.join(source['name'] for source in source_configs)}"
+ )
+
+ fetch_and_process_dataset.delay(
+ dataset_id,
+ source_configs,
+ default_topic_list
+ )
+ except Exception:
+ print(traceback.format_exc())
+ return jsonify({"error": "Failed to queue dataset processing"}), 500
+
+
+ return jsonify({
+ "message": "Dataset queued for processing",
+ "dataset_id": dataset_id,
+ "status": "processing"
+ }), 202
+
+@app.route("/datasets/upload", methods=["POST"])
@jwt_required()
def upload_data():
if "posts" not in request.files or "topics" not in request.files:
@@ -151,9 +245,9 @@ def upload_data():
}
), 202
except ValueError as e:
- return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
+ return jsonify({"error": f"Failed to read JSONL file"}), 400
except Exception as e:
- return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+ return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/", methods=["GET"])
@jwt_required()
@@ -256,10 +350,10 @@ def content_endpoint(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
- return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
+ return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
- return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+ return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset//summary", methods=["GET"])
@@ -278,10 +372,10 @@ def get_summary(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
- return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
+ return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
- return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+ return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset//time", methods=["GET"])
@@ -300,10 +394,10 @@ def get_time_analysis(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
- return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
+ return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
- return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+ return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset//user", methods=["GET"])
@@ -322,10 +416,10 @@ def get_user_analysis(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
- return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
+ return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
- return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+ return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset//cultural", methods=["GET"])
@@ -344,10 +438,10 @@ def get_cultural_analysis(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
- return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
+ return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
- return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+ return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset//interaction", methods=["GET"])
@@ -366,10 +460,10 @@ def get_interaction_analysis(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
- return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
+ return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
- return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+ return jsonify({"error": f"An unexpected error occurred"}), 500
if __name__ == "__main__":
diff --git a/server/connectors/base.py b/server/connectors/base.py
new file mode 100644
index 0000000..48163b5
--- /dev/null
+++ b/server/connectors/base.py
@@ -0,0 +1,29 @@
+import os
+from abc import ABC, abstractmethod
+from dto.post import Post
+
+class BaseConnector(ABC):
+ # Each subclass declares these at the class level
+ source_name: str # machine-readable: "reddit", "youtube"
+ display_name: str # human-readable: "Reddit", "YouTube"
+ required_env: list[str] = [] # env vars needed to activate
+
+ search_enabled: bool
+ categories_enabled: bool
+
+ @classmethod
+ def is_available(cls) -> bool:
+ """Returns True if all required env vars are set."""
+ import os
+ return all(os.getenv(var) for var in cls.required_env)
+
+ @abstractmethod
+ def get_new_posts_by_search(self,
+ search: str | None = None,
+ category: str | None = None,
+ post_limit: int = 10
+ ) -> list[Post]:
+ ...
+
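+ # Used by the API layer to validate a requested category before a scrape is queued.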
+ @abstractmethod
+ def category_exists(self, category: str) -> bool:
+ ...
\ No newline at end of file
diff --git a/connectors/boards_api.py b/server/connectors/boards_api.py
similarity index 70%
rename from connectors/boards_api.py
rename to server/connectors/boards_api.py
index 1b63aa9..57e79f7 100644
--- a/connectors/boards_api.py
+++ b/server/connectors/boards_api.py
@@ -7,6 +7,7 @@ from dto.post import Post
from dto.comment import Comment
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
+from server.connectors.base import BaseConnector
logger = logging.getLogger(__name__)
@@ -14,25 +15,64 @@ HEADERS = {
"User-Agent": "Mozilla/5.0 (compatible; ForumScraper/1.0)"
}
-class BoardsAPI:
- def __init__(self):
- self.url = "https://www.boards.ie"
- self.source_name = "Boards.ie"
+class BoardsAPI(BaseConnector):
+ source_name: str = "boards.ie"
+ display_name: str = "Boards.ie"
- def get_new_category_posts(self, category: str, post_limit: int, comment_limit: int) -> list[Post]:
+ categories_enabled: bool = True
+ search_enabled: bool = False
+
+ def __init__(self):
+ self.base_url = "https://www.boards.ie"
+
+ def get_new_posts_by_search(self,
+ search: str,
+ category: str,
+ post_limit: int
+ ) -> list[Post]:
+ if search:
+ raise NotImplementedError("Search not compatible with boards.ie")
+
+ if category:
+ return self._get_posts(f"{self.base_url}/categories/{category}", post_limit)
+ else:
+ return self._get_posts(f"{self.base_url}/discussions", post_limit)
+
+ def category_exists(self, category: str) -> bool:
+ if not category:
+ return False
+
+ url = f"{self.base_url}/categories/{category}"
+
+ try:
+ response = requests.head(url, headers=HEADERS, allow_redirects=True)
+
+ if response.status_code == 200:
+ return True
+ if response.status_code == 404:
+ return False
+
+ # fallback if HEAD not supported
+ response = requests.get(url, headers=HEADERS)
+ return response.status_code == 200
+
+ except requests.RequestException as e:
+ logger.error(f"Error checking category '{category}': {e}")
+ return False
+
+ ## Private
+ def _get_posts(self, base_url, limit) -> list[Post]:
urls = []
current_page = 1
- logger.info(f"Fetching posts from category: {category}")
-
- while len(urls) < post_limit:
- url = f"{self.url}/categories/{category}/p{current_page}"
+ while len(urls) < limit:
+ url = f"{url}/p{current_page}"
html = self._fetch_page(url)
soup = BeautifulSoup(html, "html.parser")
- logger.debug(f"Processing page {current_page} for category {category}")
+ logger.debug(f"Processing page {current_page} for link: {url}")
for a in soup.select("a.threadbit-threadlink"):
- if len(urls) >= post_limit:
+ if len(urls) >= limit:
break
href = a.get("href")
@@ -41,14 +81,14 @@ class BoardsAPI:
current_page += 1
- logger.debug(f"Fetched {len(urls)} post URLs from category {category}")
+ logger.debug(f"Fetched {len(urls)} post URLs")
# Fetch post details for each URL and create Post objects
posts = []
def fetch_and_parse(post_url):
html = self._fetch_page(post_url)
- post = self._parse_thread(html, post_url, comment_limit)
+ post = self._parse_thread(html, post_url)
return post
with ThreadPoolExecutor(max_workers=30) as executor:
@@ -71,7 +111,7 @@ class BoardsAPI:
response.raise_for_status()
return response.text
- def _parse_thread(self, html: str, post_url: str, comment_limit: int) -> Post:
+ def _parse_thread(self, html: str, post_url: str) -> Post:
soup = BeautifulSoup(html, "html.parser")
# Author
@@ -100,7 +140,7 @@ class BoardsAPI:
title = title_tag.text.strip() if title_tag else None
# Comments
- comments = self._parse_comments(post_url, post_num, comment_limit)
+ comments = self._parse_comments(post_url, post_num)
post = Post(
id=post_num,
@@ -115,11 +155,11 @@ class BoardsAPI:
return post
- def _parse_comments(self, url: str, post_id: str, comment_limit: int) -> list[Comment]:
+ def _parse_comments(self, url: str, post_id: str) -> list[Comment]:
comments = []
current_url = url
- while current_url and len(comments) < comment_limit:
+ while current_url:
html = self._fetch_page(current_url)
page_comments = self._parse_page_comments(html, post_id)
comments.extend(page_comments)
@@ -130,7 +170,7 @@ class BoardsAPI:
if next_link and next_link.get('href'):
href = next_link.get('href')
- current_url = href if href.startswith('http') else self.url + href
+ current_url = href if href.startswith('http') else self.base_url + href
else:
current_url = None
diff --git a/connectors/reddit_api.py b/server/connectors/reddit_api.py
similarity index 74%
rename from connectors/reddit_api.py
rename to server/connectors/reddit_api.py
index 0ec6100..7955fca 100644
--- a/connectors/reddit_api.py
+++ b/server/connectors/reddit_api.py
@@ -5,44 +5,63 @@ import time
from dto.post import Post
from dto.user import User
from dto.comment import Comment
+from server.connectors.base import BaseConnector
logger = logging.getLogger(__name__)
-class RedditAPI:
+class RedditAPI(BaseConnector):
+ source_name: str = "reddit"
+ display_name: str = "Reddit"
+ search_enabled: bool = True
+ categories_enabled: bool = True
+
def __init__(self):
self.url = "https://www.reddit.com/"
- self.source_name = "Reddit"
# Public Methods #
- def search_new_subreddit_posts(self, search: str, subreddit: str, limit: int) -> list[Post]:
- params = {
- 'q': search,
- 'limit': limit,
- 'restrict_sr': 'on',
- 'sort': 'new'
- }
-
- logger.info(f"Searching subreddit '{subreddit}' for '{search}' with limit {limit}")
- url = f"r/{subreddit}/search.json"
- posts = []
+ def get_new_posts_by_search(self,
+ search: str,
+ category: str,
+ post_limit: int
+ ) -> list[Post]:
- while len(posts) < limit:
- batch_limit = min(100, limit - len(posts))
+ prefix = f"r/{category}/" if category else ""
+ params = {'limit': post_limit}
+
+ if search:
+ endpoint = f"{prefix}search.json"
+ params.update({
+ 'q': search,
+ 'sort': 'new',
+ 'restrict_sr': 'on' if category else 'off'
+ })
+ else:
+ endpoint = f"{prefix}new.json"
+
+ posts = []
+ after = None
+
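+ # Reddit paginates with an opaque 'after' cursor; fetch batches of up to 100
+ # posts until the requested limit is reached or the cursor runs out.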
+ while len(posts) < post_limit:
+ batch_limit = min(100, post_limit - len(posts))
params['limit'] = batch_limit
+ if after:
+ params['after'] = after
- data = self._fetch_post_overviews(url, params)
- batch_posts = self._parse_posts(data)
-
- logger.debug(f"Fetched {len(batch_posts)} posts from search in subreddit {subreddit}")
-
- if not batch_posts:
+ data = self._fetch_post_overviews(endpoint, params)
+
+ if not data or 'data' not in data or not data['data'].get('children'):
break
+ batch_posts = self._parse_posts(data)
posts.extend(batch_posts)
- return posts
+ after = data['data'].get('after')
+ if not after:
+ break
+
+ return posts[:post_limit]
- def get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]:
+ def _get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]:
posts = []
after = None
url = f"r/{subreddit}/new.json"
@@ -75,6 +94,17 @@ class RedditAPI:
data = self._fetch_post_overviews(f"user/{username}/about.json", {})
return self._parse_user(data)
+ def category_exists(self, category: str) -> bool:
+ try:
+ data = self._fetch_post_overviews(f"r/{category}/about.json", {})
+ return (
+ data is not None
+ and 'data' in data
+ and data['data'].get('id') is not None
+ )
+ except Exception:
+ return False
+
## Private Methods ##
def _parse_posts(self, data) -> list[Post]:
posts = []
diff --git a/server/connectors/registry.py b/server/connectors/registry.py
new file mode 100644
index 0000000..f2371e6
--- /dev/null
+++ b/server/connectors/registry.py
@@ -0,0 +1,30 @@
+import pkgutil
+import importlib
+import server.connectors
+from server.connectors.base import BaseConnector
+
+def _discover_connectors() -> list[type[BaseConnector]]:
+ """Walk the connectors package and collect all BaseConnector subclasses."""
+ for _, module_name, _ in pkgutil.iter_modules(server.connectors.__path__):
+ if module_name in ("base", "registry"):
+ continue
+ importlib.import_module(f"server.connectors.{module_name}")
+
+ return [
+ cls for cls in BaseConnector.__subclasses__()
+ if getattr(cls, "source_name", None) # abstract intermediaries may not define source_name
+ ]
+
+def get_available_connectors() -> dict[str, type[BaseConnector]]:
+ return {c.source_name: c for c in _discover_connectors() if c.is_available()}
+
+def get_connector_metadata() -> dict[str, dict]:
+ res = {}
+ for source_id, connector in get_available_connectors().items():
+ res[source_id] = {
+ "id": source_id,
+ "label": connector.display_name,
+ "search_enabled": connector.search_enabled,
+ "categories_enabled": connector.categories_enabled,
+ }
+ return res
\ No newline at end of file
diff --git a/server/connectors/youtube_api.py b/server/connectors/youtube_api.py
new file mode 100644
index 0000000..7b014d0
--- /dev/null
+++ b/server/connectors/youtube_api.py
@@ -0,0 +1,96 @@
+import os
+import datetime
+
+from dotenv import load_dotenv
+from googleapiclient.discovery import build
+from googleapiclient.errors import HttpError
+from dto.post import Post
+from dto.comment import Comment
+from server.connectors.base import BaseConnector
+
+load_dotenv()
+
+API_KEY = os.getenv("YOUTUBE_API_KEY")
+
+class YouTubeAPI(BaseConnector):
+ source_name: str = "youtube"
+ display_name: str = "YouTube"
+ search_enabled: bool = True
+ categories_enabled: bool = False
+
+ def __init__(self):
+ self.youtube = build('youtube', 'v3', developerKey=API_KEY)
+
+ def get_new_posts_by_search(self,
+ search: str,
+ category: str,
+ post_limit: int
+ ) -> list[Post]:
+ videos = self._search_videos(search, post_limit)
+ posts = []
+
+ for video in videos:
+ video_id = video['id']['videoId']
+ snippet = video['snippet']
+ title = snippet['title']
+ description = snippet['description']
+ published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
+ channel_title = snippet['channelTitle']
+
+ comments = []
+ comments_data = self._get_video_comments(video_id)
+ for comment_thread in comments_data:
+ comment_snippet = comment_thread['snippet']['topLevelComment']['snippet']
+ comment = Comment(
+ id=comment_thread['id'],
+ post_id=video_id,
+ content=comment_snippet['textDisplay'],
+ author=comment_snippet['authorDisplayName'],
+ timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(),
+ reply_to=None,
+ source=self.source_name
+ )
+
+ comments.append(comment)
+
+ post = Post(
+ id=video_id,
+ content=f"{title}\n\n{description}",
+ author=channel_title,
+ timestamp=published_at,
+ url=f"https://www.youtube.com/watch?v={video_id}",
+ title=title,
+ source=self.source_name,
+ comments=comments
+ )
+
+ posts.append(post)
+
+ return posts
+
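+ # YouTube exposes no category browsing here; the API layer never calls this
+ # because categories_enabled is False.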
+ def category_exists(self, category):
+ return True
+
+ def _search_videos(self, query, limit):
+ request = self.youtube.search().list(
+ q=query,
+ part='snippet',
+ type='video',
+ maxResults=min(limit, 50) # the YouTube search endpoint caps maxResults at 50
+ )
+ response = request.execute()
+ return response.get('items', [])
+
+ def _get_video_comments(self, video_id):
+ request = self.youtube.commentThreads().list(
+ part='snippet',
+ videoId=video_id,
+ textFormat='plainText'
+ )
+
+ try:
+ response = request.execute()
+ except HttpError as e:
+ print(f"Error fetching comments for video {video_id}: {e}")
+ return []
+ return response.get('items', [])
diff --git a/server/core/auth.py b/server/core/auth.py
index 625c3c2..34bb93c 100644
--- a/server/core/auth.py
+++ b/server/core/auth.py
@@ -1,6 +1,10 @@
+import re
+
from server.db.database import PostgresConnector
from flask_bcrypt import Bcrypt
+EMAIL_REGEX = re.compile(r"[^@]+@[^@]+\.[^@]+")
+
class AuthManager:
def __init__(self, db: PostgresConnector, bcrypt: Bcrypt):
self.db = db
@@ -18,6 +22,12 @@ class AuthManager:
def register_user(self, username, email, password):
hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8")
+ if len(username) < 3:
+ raise ValueError("Username must be longer than 3 characters")
+
+ if not EMAIL_REGEX.match(email):
+ raise ValueError("Please enter a valid email address")
+
if self.get_user_by_email(email):
raise ValueError("Email already registered")
diff --git a/server/core/datasets.py b/server/core/datasets.py
index 5886cfc..4690454 100644
--- a/server/core/datasets.py
+++ b/server/core/datasets.py
@@ -1,7 +1,7 @@
import pandas as pd
from server.db.database import PostgresConnector
from psycopg2.extras import Json
-from server.exceptions import NotAuthorisedException, NonExistentDatasetException
+from server.exceptions import NonExistentDatasetException
class DatasetManager:
def __init__(self, db: PostgresConnector):
@@ -114,7 +114,7 @@ class DatasetManager:
self.db.execute_batch(query, values)
def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None):
- if status not in ["processing", "complete", "error"]:
+ if status not in ["fetching", "processing", "complete", "error"]:
raise ValueError("Invalid status")
query = """
diff --git a/server/db/schema.sql b/server/db/schema.sql
index 051a396..4550633 100644
--- a/server/db/schema.sql
+++ b/server/db/schema.sql
@@ -23,7 +23,7 @@ CREATE TABLE datasets (
-- Enforce valid states
CONSTRAINT datasets_status_check
- CHECK (status IN ('processing', 'complete', 'error'))
+ CHECK (status IN ('fetching', 'processing', 'complete', 'error'))
);
CREATE TABLE events (
diff --git a/server/queue/tasks.py b/server/queue/tasks.py
index a089596..95248d1 100644
--- a/server/queue/tasks.py
+++ b/server/queue/tasks.py
@@ -1,9 +1,13 @@
import pandas as pd
+import logging
from server.queue.celery_app import celery
from server.analysis.enrichment import DatasetEnrichment
from server.db.database import PostgresConnector
from server.core.datasets import DatasetManager
+from server.connectors.registry import get_available_connectors
+
+logger = logging.getLogger(__name__)
@celery.task(bind=True, max_retries=3)
def process_dataset(self, dataset_id: int, posts: list, topics: dict):
@@ -16,6 +20,41 @@ def process_dataset(self, dataset_id: int, posts: list, topics: dict):
processor = DatasetEnrichment(df, topics)
enriched_df = processor.enrich()
+ dataset_manager.save_dataset_content(dataset_id, enriched_df)
+ dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
+ except Exception as e:
+ dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
+
+@celery.task(bind=True, max_retries=3)
+def fetch_and_process_dataset(self,
+ dataset_id: int,
+ source_info: list[dict],
+ topics: dict):
+ connectors = get_available_connectors()
+ db = PostgresConnector()
+ dataset_manager = DatasetManager(db)
+ posts = []
+
+ try:
+ for metadata in source_info:
+ name = metadata["name"]
+ search = metadata.get("search")
+ category = metadata.get("category")
+ limit = metadata.get("limit", 100)
+
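+ # Instantiate the registered connector for this source and pull its posts,
+ # flattening them to dicts for the DataFrame below.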
+ connector = connectors[name]()
+ raw_posts = connector.get_new_posts_by_search(
+ search=search,
+ category=category,
+ post_limit=limit
+ )
+ posts.extend(post.to_dict() for post in raw_posts)
+
+ df = pd.DataFrame(posts)
+
+ processor = DatasetEnrichment(df, topics)
+ enriched_df = processor.enrich()
+
dataset_manager.save_dataset_content(dataset_id, enriched_df)
dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
except Exception as e:
diff --git a/server/topics.json b/server/topics.json
new file mode 100644
index 0000000..271913a
--- /dev/null
+++ b/server/topics.json
@@ -0,0 +1,67 @@
+{
+ "Personal Life": "daily life, life updates, what happened today, personal stories, life events, reflections",
+
+ "Relationships": "dating, relationships, breakups, friendships, family relationships, marriage, relationship advice",
+
+ "Family & Parenting": "parents, parenting, children, raising kids, family dynamics, family stories",
+
+ "Work & Careers": "jobs, workplaces, office life, promotions, quitting jobs, career advice, workplace drama",
+
+ "Education": "school, studying, exams, university, homework, academic pressure, learning experiences",
+
+ "Money & Finance": "saving money, debt, budgeting, cost of living, financial advice, personal finance",
+
+ "Health & Fitness": "exercise, gym, workouts, running, diet, fitness routines, weight loss",
+
+ "Mental Health": "stress, anxiety, depression, burnout, therapy, emotional wellbeing",
+
+ "Food & Cooking": "meals, cooking, recipes, restaurants, snacks, food opinions",
+
+ "Travel": "holidays, trips, tourism, travel experiences, airports, flights, travel tips",
+
+ "Entertainment": "movies, TV shows, streaming services, celebrities, pop culture",
+
+ "Music": "songs, albums, artists, concerts, music opinions",
+
+ "Gaming": "video games, gaming culture, consoles, PC gaming, esports",
+
+ "Sports": "sports matches, teams, players, competitions, sports opinions",
+
+ "Technology": "phones, gadgets, apps, AI, software, tech trends",
+
+ "Internet Culture": "memes, viral trends, online jokes, internet drama, trending topics",
+
+ "Social Media": "platforms, influencers, content creators, algorithms, online communities",
+
+ "News & Current Events": "breaking news, world events, major incidents, public discussions",
+
+ "Politics": "political debates, elections, government policies, ideology",
+
+ "Culture & Society": "social issues, cultural trends, generational debates, societal changes",
+
+ "Identity & Lifestyle": "personal identity, lifestyle choices, values, self-expression",
+
+ "Hobbies & Interests": "art, photography, crafts, collecting, hobbies",
+
+ "Fashion & Beauty": "clothing, style, makeup, skincare, fashion trends",
+
+ "Animals & Pets": "pets, animal videos, pet care, wildlife",
+
+ "Humour": "jokes, funny stories, sarcasm, memes",
+
+ "Opinions & Debates": "hot takes, controversial opinions, arguments, discussions",
+
+ "Advice & Tips": "life advice, tutorials, how-to tips, recommendations",
+
+ "Product Reviews": "reviews, recommendations, experiences with products",
+
+ "Complaints & Rants": "frustrations, complaining, venting about things",
+
+ "Motivation & Inspiration": "motivational quotes, success stories, encouragement",
+
+ "Questions & Curiosity": "asking questions, seeking opinions, curiosity posts",
+
+ "Celebrations & Achievements": "birthdays, milestones, achievements, good news",
+
+ "Random Thoughts": "shower thoughts, observations, random ideas"
+}
\ No newline at end of file
diff --git a/server/utils.py b/server/utils.py
index 815739f..fb42953 100644
--- a/server/utils.py
+++ b/server/utils.py
@@ -1,4 +1,5 @@
import datetime
+import os
from flask import request
def parse_datetime_filter(value):
@@ -48,3 +49,9 @@ def get_request_filters() -> dict:
filters["data_sources"] = data_sources
return filters
+
+def get_env(name: str) -> str:
+ value = os.getenv(name)
+ if not value:
+ raise RuntimeError(f"Missing required environment variable: {name}")
+ return value