Compare commits


37 Commits

Author SHA1 Message Date
94befb61c5 Merge pull request 'Automatic Scraping of dataset options' (#9) from feat/automatic-scraping-datasets into main
Reviewed-on: #9
2026-03-14 21:58:49 +00:00
12f5953146 fix(api): remove error exceptions in API responses
Mainly a security measure: we don't want raw code errors returned in API responses, as they could reveal how the code works internally.
2026-03-14 21:58:00 +00:00
5b0441c34b fix(connector): unnecessary comment limits
In addition, I made some methods private to better align with the BaseConnector parent class.
2026-03-14 21:53:13 +00:00
d2b919cd66 fix(api): enforce integer limit and cap at 1000 in scrape_data function 2026-03-14 17:35:05 +00:00
062937ec3c fix(api): incorrect validation on search 2026-03-14 17:12:02 +00:00
2a00795cc2 chore(connectors): implement category_exists for Boards API 2026-03-14 17:11:49 +00:00
c990f29645 fix(frontend): misaligned loading page for datasets 2026-03-14 17:05:46 +00:00
8a423b2a29 feat(connectors): implement category validation in scraping process 2026-03-14 16:59:43 +00:00
d96f459104 fix(connectors): update URL references to use base_url in BoardsAPI 2026-03-13 21:59:17 +00:00
162a4de64e fix(frontend): detects which sources support category or search 2026-03-12 10:07:28 +00:00
6684780d23 fix(connectors): add stronger validation to scrape endpoint
Strong validation is needed, otherwise invalid data reaches Celery and crashes silently. It also checks whether the specific source supports search or categories.
2026-03-12 09:59:07 +00:00
c12f1b4371 chore(connectors): add category and search validation fields 2026-03-12 09:56:34 +00:00
01d6bd0164 fix(connectors): category / search fields breaking
Ideally category and search are fully optional, however some sites break if one or the other is not provided.

Unfortunately `boards.ie` has a different page type for searches and I'm not bothered to implement a scraper from scratch.

In addition, removed comment limit options.
2026-03-11 21:16:26 +00:00
12cbc24074 chore(utils): remove split_limit function 2026-03-11 19:47:44 +00:00
0658713f42 chore: remove unused dataset creation script 2026-03-11 19:44:38 +00:00
b2ae1a9f70 feat(frontend): add page for scraping endpoint 2026-03-11 19:41:34 +00:00
eff416c34e fix(connectors): hardcoded source name in Youtube connector 2026-03-10 23:36:09 +00:00
524c9c50a0 fix(api): incorrect dataset status update message 2026-03-10 23:28:21 +00:00
2ab74d922a feat(api): support per-source search, category and limit configuration 2026-03-10 23:15:33 +00:00
d520e2af98 fix(auth): missing email and username business rules 2026-03-10 22:48:04 +00:00
8fe84a30f6 fix: data leak when opening topics file 2026-03-10 22:45:07 +00:00
dc330b87b9 fix(celery): process dataset directly in fetch task
Calling the original `process_dataset` function led to issues with JSON serialisation.
2026-03-10 22:17:00 +00:00
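The failure mode this commit works around can be reproduced with plain `json`: Celery serialises task arguments as JSON by default, so rich objects can't cross the broker. A minimal sketch — `Dataset` here is a hypothetical stand-in, not the project's actual class:

```python
import json

class Dataset:
    # Hypothetical stand-in for a rich dataset object
    def __init__(self, rows):
        self.rows = rows

# Celery's default serializer is JSON, so task arguments must be plain
# types; a custom object fails exactly like this at dispatch time.
try:
    json.dumps(Dataset([1, 2, 3]))
    print("serialised")
except TypeError:
    print("not JSON-serialisable")

# The commit's workaround: do the processing inside the fetch task and
# pass only primitives (IDs, lists, dicts) across the queue.
print(json.dumps({"dataset_id": 42, "sources": ["reddit"]}))
```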
7ccc934f71 build: change celery to debug mode 2026-03-10 22:14:45 +00:00
a3dbe04a57 fix(frontend): option to delete dataset not shown after fail 2026-03-10 19:23:48 +00:00
a65c4a461c fix(api): flask delegates dataset fetch to celery 2026-03-10 19:17:41 +00:00
15704a0782 chore(db): update db schema to include "fetching" status 2026-03-10 19:17:08 +00:00
6ec47256d0 feat(api): add database scraping endpoints 2026-03-10 19:04:33 +00:00
2572664e26 chore(utils): add env getter that fails if env not found 2026-03-10 18:50:53 +00:00
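The utility plausibly behaves like this (a sketch only; the real `get_env` lives in `server/utils` and its exact signature isn't shown in the diff):

```python
import os

def get_env(name: str) -> str:
    """Return a required environment variable, failing loudly if missing.
    Sketch of the behaviour the commit describes; the real implementation
    may differ."""
    value = os.getenv(name)
    if value is None:
        raise RuntimeError(f"Required environment variable '{name}' is not set")
    return value

os.environ["MAX_FETCH_LIMIT"] = "1000"
print(get_env("MAX_FETCH_LIMIT"))  # "1000"
```

Failing at startup beats a silent fallback: a missing `JWT_SECRET_KEY` default, for instance, would otherwise ship insecurely.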
17bd4702b2 fix(connectors): connector detectors returning name of ID alongside connector obj 2026-03-10 18:36:40 +00:00
53cb5c2ea5 feat(topics): add generalised topic list
This is easier and quicker than deriving a topics list from the scraped dataset itself.

While using LLMs to create a personalised topic list based on the query, category or dataset itself would likely yield better results, it is beyond the scope of this project.
2026-03-10 18:36:08 +00:00
0866dda8b3 chore: add util to always split evenly 2026-03-10 18:25:05 +00:00
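The even-split helper (possibly the `split_limit` removed later in 12cbc24074) plausibly worked like this; the name and signature below are guesses, not the project's actual API:

```python
def split_evenly(total: int, parts: int) -> list[int]:
    # Distribute `total` across `parts` so the pieces differ by at most 1,
    # putting the remainder on the earliest pieces.
    base, remainder = divmod(total, parts)
    return [base + (1 if i < remainder else 0) for i in range(parts)]

print(split_evenly(10, 3))  # [4, 3, 3]
```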
5ccb2e73cd fix(connectors): incorrect registry location
Registry paths were using the incorrect connector path locations.
2026-03-10 18:18:42 +00:00
2a8d7c7972 refactor(connectors): Youtube & Reddit connectors implement BaseConnector 2026-03-10 18:11:33 +00:00
e7a8c17be4 chore(connectors): add base connector inheritance 2026-03-10 18:08:01 +00:00
cc799f7368 feat(connectors): add base connector and registry for detection
The idea is to have a "plugin-type" system, where new connectors extend the `BaseConnector` class and implement the fetch-posts method.

These are automatically detected by the registry, and automatically used in new Flask endpoints that give a list of possible sources.

Allows for an open-ended system where new data scrapers / API consumers can be added dynamically.
2026-03-09 21:29:03 +00:00
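One plausible reading of that automatic detection, sketched with `__subclasses__` (class names and the `fetch_posts` signature here are illustrative, not the project's actual API):

```python
import os
from abc import ABC, abstractmethod

class BaseConnector(ABC):
    source_name: str
    required_env: list[str] = []

    @classmethod
    def is_available(cls) -> bool:
        # Available only when every required env var is set
        return all(os.getenv(v) for v in cls.required_env)

    @abstractmethod
    def fetch_posts(self): ...

class RedditConnector(BaseConnector):
    source_name = "reddit"
    def fetch_posts(self): return []

class YouTubeConnector(BaseConnector):
    source_name = "youtube"
    required_env = ["YOUTUBE_API_KEY"]
    def fetch_posts(self): return []

def get_available_connectors() -> dict:
    # Discovery via __subclasses__: no hand-maintained list needed,
    # so a Flask endpoint can enumerate sources dynamically.
    return {
        cls.source_name: cls
        for cls in BaseConnector.__subclasses__()
        if cls.is_available()
    }

print(sorted(get_available_connectors()))
```

`RedditConnector` is always listed (it needs no env vars); `YouTubeConnector` only appears when `YOUTUBE_API_KEY` is set.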
262a70dbf3 refactor(api): rename /upload endpoint
Ensures consistency with the other dataset-based endpoints and follows REST API conventions more cleanly.
2026-03-09 20:55:12 +00:00
ca444e9cb0 refactor: move connectors to backend dir
They will now be used primarily by the backend.
2026-03-09 20:53:13 +00:00
20 changed files with 885 additions and 203 deletions

View File

@@ -1,84 +0,0 @@
import os
import datetime
from dotenv import load_dotenv
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from dto.post import Post
from dto.comment import Comment
load_dotenv()
API_KEY = os.getenv("YOUTUBE_API_KEY")
class YouTubeAPI:
def __init__(self):
self.youtube = build('youtube', 'v3', developerKey=API_KEY)
def search_videos(self, query, limit):
request = self.youtube.search().list(
q=query,
part='snippet',
type='video',
maxResults=limit
)
response = request.execute()
return response.get('items', [])
def get_video_comments(self, video_id, limit):
request = self.youtube.commentThreads().list(
part='snippet',
videoId=video_id,
maxResults=limit,
textFormat='plainText'
)
try:
response = request.execute()
except HttpError as e:
print(f"Error fetching comments for video {video_id}: {e}")
return []
return response.get('items', [])
def fetch_videos(self, query, video_limit, comment_limit) -> list[Post]:
videos = self.search_videos(query, video_limit)
posts = []
for video in videos:
video_id = video['id']['videoId']
snippet = video['snippet']
title = snippet['title']
description = snippet['description']
published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
channel_title = snippet['channelTitle']
comments = []
comments_data = self.get_video_comments(video_id, comment_limit)
for comment_thread in comments_data:
comment_snippet = comment_thread['snippet']['topLevelComment']['snippet']
comment = Comment(
id=comment_thread['id'],
post_id=video_id,
content=comment_snippet['textDisplay'],
author=comment_snippet['authorDisplayName'],
timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(),
reply_to=None,
source="YouTube"
)
comments.append(comment)
post = Post(
id=video_id,
content=f"{title}\n\n{description}",
author=channel_title,
timestamp=published_at,
url=f"https://www.youtube.com/watch?v={video_id}",
title=title,
source="YouTube",
comments=comments
)
posts.append(post)
return posts

View File

@@ -1,43 +0,0 @@
import json
import logging
from connectors.reddit_api import RedditAPI
from connectors.boards_api import BoardsAPI
from connectors.youtube_api import YouTubeAPI
posts_file = 'posts_test.jsonl'
reddit_connector = RedditAPI()
boards_connector = BoardsAPI()
youtube_connector = YouTubeAPI()
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.WARNING)
def remove_empty_posts(posts):
return [post for post in posts if post.content.strip() != ""]
def save_to_jsonl(filename, posts):
with open(filename, 'a', encoding='utf-8') as f:
for post in posts:
# Convert post object to dict if it's a dataclass
data = post.to_dict()
f.write(json.dumps(data) + '\n')
def main():
boards_posts = boards_connector.get_new_category_posts('cork-city', 1200, 1200)
save_to_jsonl(posts_file, boards_posts)
reddit_posts = reddit_connector.get_new_subreddit_posts('cork', 1200)
reddit_posts = remove_empty_posts(reddit_posts)
save_to_jsonl(posts_file, reddit_posts)
ireland_posts = reddit_connector.search_new_subreddit_posts('cork', 'ireland', 1200)
ireland_posts = remove_empty_posts(ireland_posts)
save_to_jsonl(posts_file, ireland_posts)
youtube_videos = youtube_connector.fetch_videos('cork city', 1200, 1200)
save_to_jsonl(posts_file, youtube_videos)
if __name__ == "__main__":
main()

View File

@@ -43,7 +43,7 @@ services:
- .env
command: >
celery -A server.queue.celery_app.celery worker
--loglevel=info
--loglevel=debug
--pool=solo
depends_on:
- postgres

View File

@@ -5,6 +5,7 @@ import DatasetsPage from "./pages/Datasets";
import DatasetStatusPage from "./pages/DatasetStatus";
import LoginPage from "./pages/Login";
import UploadPage from "./pages/Upload";
import AutoScrapePage from "./pages/AutoScrape";
import StatPage from "./pages/Stats";
import { getDocumentTitle } from "./utils/documentTitle";
import DatasetEditPage from "./pages/DatasetEdit";
@@ -22,6 +23,7 @@ function App() {
<Route path="/" element={<Navigate to="/login" replace />} />
<Route path="/login" element={<LoginPage />} />
<Route path="/upload" element={<UploadPage />} />
<Route path="/auto-scrape" element={<AutoScrapePage />} />
<Route path="/datasets" element={<DatasetsPage />} />
<Route path="/dataset/:datasetId/status" element={<DatasetStatusPage />} />
<Route path="/dataset/:datasetId/stats" element={<StatPage />} />

View File

@@ -0,0 +1,338 @@
import axios from "axios";
import { useEffect, useState } from "react";
import { useNavigate } from "react-router-dom";
import StatsStyling from "../styles/stats_styling";
const styles = StatsStyling;
const API_BASE_URL = import.meta.env.VITE_BACKEND_URL;
type SourceOption = {
id: string;
label: string;
search_enabled?: boolean;
categories_enabled?: boolean;
searchEnabled?: boolean;
categoriesEnabled?: boolean;
};
type SourceConfig = {
sourceName: string;
limit: string;
search: string;
category: string;
};
const buildEmptySourceConfig = (sourceName = ""): SourceConfig => ({
sourceName,
limit: "100",
search: "",
category: "",
});
const supportsSearch = (source?: SourceOption): boolean =>
Boolean(source?.search_enabled ?? source?.searchEnabled);
const supportsCategories = (source?: SourceOption): boolean =>
Boolean(source?.categories_enabled ?? source?.categoriesEnabled);
const AutoScrapePage = () => {
const navigate = useNavigate();
const [datasetName, setDatasetName] = useState("");
const [sourceOptions, setSourceOptions] = useState<SourceOption[]>([]);
const [sourceConfigs, setSourceConfigs] = useState<SourceConfig[]>([]);
const [returnMessage, setReturnMessage] = useState("");
const [isLoadingSources, setIsLoadingSources] = useState(true);
const [isSubmitting, setIsSubmitting] = useState(false);
const [hasError, setHasError] = useState(false);
useEffect(() => {
axios
.get<SourceOption[]>(`${API_BASE_URL}/datasets/sources`)
.then((response) => {
const options = response.data || [];
setSourceOptions(options);
setSourceConfigs([buildEmptySourceConfig(options[0]?.id || "")]);
})
.catch((requestError: unknown) => {
setHasError(true);
if (axios.isAxiosError(requestError)) {
setReturnMessage(
`Failed to load available sources: ${String(
requestError.response?.data?.error || requestError.message
)}`
);
} else {
setReturnMessage("Failed to load available sources.");
}
})
.finally(() => {
setIsLoadingSources(false);
});
}, []);
const updateSourceConfig = (index: number, field: keyof SourceConfig, value: string) => {
setSourceConfigs((previous) =>
previous.map((config, configIndex) =>
configIndex === index
? field === "sourceName"
? { ...config, sourceName: value, search: "", category: "" }
: { ...config, [field]: value }
: config
)
);
};
const getSourceOption = (sourceName: string) =>
sourceOptions.find((option) => option.id === sourceName);
const addSourceConfig = () => {
setSourceConfigs((previous) => [
...previous,
buildEmptySourceConfig(sourceOptions[0]?.id || ""),
]);
};
const removeSourceConfig = (index: number) => {
setSourceConfigs((previous) => previous.filter((_, configIndex) => configIndex !== index));
};
const autoScrape = async () => {
const token = localStorage.getItem("access_token");
if (!token) {
setHasError(true);
setReturnMessage("You must be signed in to auto scrape a dataset.");
return;
}
const normalizedDatasetName = datasetName.trim();
if (!normalizedDatasetName) {
setHasError(true);
setReturnMessage("Please add a dataset name before continuing.");
return;
}
if (sourceConfigs.length === 0) {
setHasError(true);
setReturnMessage("Please add at least one source.");
return;
}
const normalizedSources = sourceConfigs.map((source) => {
const sourceOption = getSourceOption(source.sourceName);
return {
name: source.sourceName,
limit: Number(source.limit || 100),
search: supportsSearch(sourceOption) ? source.search.trim() || undefined : undefined,
category: supportsCategories(sourceOption)
? source.category.trim() || undefined
: undefined,
};
});
const invalidSource = normalizedSources.find(
(source) => !source.name || !Number.isFinite(source.limit) || source.limit <= 0
);
if (invalidSource) {
setHasError(true);
setReturnMessage("Every source needs a name and a limit greater than zero.");
return;
}
try {
setIsSubmitting(true);
setHasError(false);
setReturnMessage("");
const response = await axios.post(
`${API_BASE_URL}/datasets/scrape`,
{
name: normalizedDatasetName,
sources: normalizedSources,
},
{
headers: {
Authorization: `Bearer ${token}`,
},
}
);
const datasetId = Number(response.data.dataset_id);
setReturnMessage(
`Auto scrape queued successfully (dataset #${datasetId}). Redirecting to processing status...`
);
setTimeout(() => {
navigate(`/dataset/${datasetId}/status`);
}, 400);
} catch (requestError: unknown) {
setHasError(true);
if (axios.isAxiosError(requestError)) {
const message = String(
requestError.response?.data?.error || requestError.message || "Auto scrape failed."
);
setReturnMessage(`Auto scrape failed: ${message}`);
} else {
setReturnMessage("Auto scrape failed due to an unexpected error.");
}
} finally {
setIsSubmitting(false);
}
};
return (
<div style={styles.page}>
<div style={styles.containerWide}>
<div style={{ ...styles.card, ...styles.headerBar }}>
<div>
<h1 style={styles.sectionHeaderTitle}>Auto Scrape Dataset</h1>
<p style={styles.sectionHeaderSubtitle}>
Select sources and scrape settings, then queue processing automatically.
</p>
</div>
<button
type="button"
style={{ ...styles.buttonPrimary, opacity: isSubmitting || isLoadingSources ? 0.75 : 1 }}
onClick={autoScrape}
disabled={isSubmitting || isLoadingSources}
>
{isSubmitting ? "Queueing..." : "Auto Scrape and Analyze"}
</button>
</div>
<div
style={{
...styles.grid,
marginTop: 14,
gridTemplateColumns: "repeat(auto-fit, minmax(280px, 1fr))",
}}
>
<div style={{ ...styles.card, gridColumn: "auto" }}>
<h2 style={{ ...styles.sectionTitle, color: "#24292f" }}>Dataset Name</h2>
<p style={styles.sectionSubtitle}>Use a clear label so you can identify this run later.</p>
<input
style={{ ...styles.input, ...styles.inputFullWidth }}
type="text"
placeholder="Example: r/cork subreddit - Jan 2026"
value={datasetName}
onChange={(event) => setDatasetName(event.target.value)}
/>
</div>
<div style={{ ...styles.card, gridColumn: "auto" }}>
<h2 style={{ ...styles.sectionTitle, color: "#24292f" }}>Sources</h2>
<p style={styles.sectionSubtitle}>
Configure source, limit, optional search, and optional category.
</p>
{isLoadingSources && <p style={styles.subtleBodyText}>Loading sources...</p>}
{!isLoadingSources && sourceOptions.length === 0 && (
<p style={styles.subtleBodyText}>No source connectors are currently available.</p>
)}
{!isLoadingSources && sourceOptions.length > 0 && (
<div style={{ display: "flex", flexDirection: "column", gap: 10 }}>
{sourceConfigs.map((source, index) => {
const sourceOption = getSourceOption(source.sourceName);
const searchEnabled = supportsSearch(sourceOption);
const categoriesEnabled = supportsCategories(sourceOption);
return (
<div
key={`source-${index}`}
style={{
border: "1px solid #d0d7de",
borderRadius: 8,
padding: 12,
background: "#f6f8fa",
display: "grid",
gap: 8,
}}
>
<select
value={source.sourceName}
style={{ ...styles.input, ...styles.inputFullWidth }}
onChange={(event) => updateSourceConfig(index, "sourceName", event.target.value)}
>
{sourceOptions.map((option) => (
<option key={option.id} value={option.id}>
{option.label}
</option>
))}
</select>
<input
type="number"
min={1}
value={source.limit}
placeholder="Limit"
style={{ ...styles.input, ...styles.inputFullWidth }}
onChange={(event) => updateSourceConfig(index, "limit", event.target.value)}
/>
<input
type="text"
value={source.search}
placeholder={
searchEnabled
? "Search term (optional)"
: "Search not supported for this source"
}
style={{ ...styles.input, ...styles.inputFullWidth }}
disabled={!searchEnabled}
onChange={(event) => updateSourceConfig(index, "search", event.target.value)}
/>
<input
type="text"
value={source.category}
placeholder={
categoriesEnabled
? "Category (optional)"
: "Categories not supported for this source"
}
style={{ ...styles.input, ...styles.inputFullWidth }}
disabled={!categoriesEnabled}
onChange={(event) => updateSourceConfig(index, "category", event.target.value)}
/>
{sourceConfigs.length > 1 && (
<button
type="button"
style={styles.buttonSecondary}
onClick={() => removeSourceConfig(index)}
>
Remove source
</button>
)}
</div>
);
})}
<button type="button" style={styles.buttonSecondary} onClick={addSourceConfig}>
Add another source
</button>
</div>
)}
</div>
</div>
<div
style={{
...styles.card,
marginTop: 14,
...(hasError ? styles.alertCardError : styles.alertCardInfo),
}}
>
{returnMessage ||
"After queueing, your dataset is fetched and processed in the background automatically."}
</div>
</div>
</div>
);
};
export default AutoScrapePage;

View File

@@ -9,7 +9,7 @@ const API_BASE_URL = import.meta.env.VITE_BACKEND_URL;
type DatasetItem = {
id: number;
name?: string;
status?: "processing" | "complete" | "error" | string;
status?: "processing" | "complete" | "error" | "fetching" | string;
status_message?: string | null;
completed_at?: string | null;
created_at?: string | null;
@@ -50,7 +50,24 @@ const DatasetsPage = () => {
}, []);
if (loading) {
return <p style={{ ...styles.page, minHeight: "100vh" }}>Loading datasets...</p>;
return (
<div style={styles.loadingPage}>
<div style={{ ...styles.loadingCard, transform: "translateY(-100px)" }}>
<div style={styles.loadingHeader}>
<div style={styles.loadingSpinner} />
<div>
<h2 style={styles.loadingTitle}>Loading datasets</h2>
</div>
</div>
<div style={styles.loadingSkeleton}>
<div style={{ ...styles.loadingSkeletonLine, ...styles.loadingSkeletonLineLong }} />
<div style={{ ...styles.loadingSkeletonLine, ...styles.loadingSkeletonLineMed }} />
<div style={{ ...styles.loadingSkeletonLine, ...styles.loadingSkeletonLineShort }} />
</div>
</div>
</div>
)
}
return (
@@ -63,9 +80,18 @@ const DatasetsPage = () => {
View and reopen datasets you previously uploaded.
</p>
</div>
<div style={styles.controlsWrapped}>
<button type="button" style={styles.buttonPrimary} onClick={() => navigate("/upload")}>
Upload New Dataset
</button>
<button
type="button"
style={styles.buttonSecondary}
onClick={() => navigate("/auto-scrape")}
>
Auto Scrape Dataset
</button>
</div>
</div>
{error && (
@@ -93,7 +119,7 @@ const DatasetsPage = () => {
<div style={{ ...styles.card, marginTop: 14, padding: 0, overflow: "hidden" }}>
<ul style={styles.listNoBullets}>
{datasets.map((dataset) => {
const isComplete = dataset.status === "complete";
const isComplete = dataset.status === "complete" || dataset.status === "error";
const editPath = `/dataset/${dataset.id}/edit`;
const targetPath = isComplete
? `/dataset/${dataset.id}/stats`

View File

@@ -40,7 +40,7 @@ const UploadPage = () => {
setHasError(false);
setReturnMessage("");
const response = await axios.post(`${API_BASE_URL}/upload`, formData, {
const response = await axios.post(`${API_BASE_URL}/datasets/upload`, formData, {
headers: {
"Content-Type": "multipart/form-data",
},

View File

@@ -3,6 +3,7 @@ const DEFAULT_TITLE = "Ethnograph View";
const STATIC_TITLES: Record<string, string> = {
"/login": "Sign In",
"/upload": "Upload Dataset",
"/auto-scrape": "Auto Scrape Dataset",
"/datasets": "My Datasets",
};

View File

@@ -19,32 +19,38 @@ from server.exceptions import NotAuthorisedException, NonExistentDatasetExceptio
from server.db.database import PostgresConnector
from server.core.auth import AuthManager
from server.core.datasets import DatasetManager
from server.utils import get_request_filters
from server.queue.tasks import process_dataset
from server.utils import get_request_filters, get_env
from server.queue.tasks import process_dataset, fetch_and_process_dataset
from server.connectors.registry import get_available_connectors, get_connector_metadata
app = Flask(__name__)
# Env Variables
load_dotenv()
frontend_url = os.getenv("FRONTEND_URL", "http://localhost:5173")
jwt_secret_key = os.getenv("JWT_SECRET_KEY", "super-secret-change-this")
jwt_access_token_expires = int(
os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200)
) # Default to 20 minutes
max_fetch_limit = int(get_env("MAX_FETCH_LIMIT"))
frontend_url = get_env("FRONTEND_URL")
jwt_secret_key = get_env("JWT_SECRET_KEY")
jwt_access_token_expires = int(os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200)) # Default to 20 minutes
# Flask Configuration
CORS(app, resources={r"/*": {"origins": frontend_url}})
app.config["JWT_SECRET_KEY"] = jwt_secret_key
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = jwt_access_token_expires
# Security
bcrypt = Bcrypt(app)
jwt = JWTManager(app)
# Helper Objects
db = PostgresConnector()
auth_manager = AuthManager(db, bcrypt)
dataset_manager = DatasetManager(db)
stat_gen = StatGen()
connectors = get_available_connectors()
# Default Files
with open("server/topics.json") as f:
default_topic_list = json.load(f)
@app.route("/register", methods=["POST"])
def register_user():
@@ -68,7 +74,7 @@ def register_user():
return jsonify({"error": str(e)}), 400
except Exception as e:
print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
return jsonify({"error": f"An unexpected error occurred"}), 500
print(f"Registered new user: {username}")
return jsonify({"message": f"User '{username}' registered successfully"}), 200
@@ -93,7 +99,7 @@ def login_user():
return jsonify({"error": "Invalid username or password"}), 401
except Exception as e:
print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/profile", methods=["GET"])
@@ -111,7 +117,95 @@ def get_user_datasets():
current_user = int(get_jwt_identity())
return jsonify(dataset_manager.get_user_datasets(current_user)), 200
@app.route("/upload", methods=["POST"])
@app.route("/datasets/sources", methods=["GET"])
def get_dataset_sources():
list_metadata = list(get_connector_metadata().values())
return jsonify(list_metadata)
@app.route("/datasets/scrape", methods=["POST"])
@jwt_required()
def scrape_data():
data = request.get_json()
connector_metadata = get_connector_metadata()
# Strong validation needed, otherwise data goes to Celery and crashes silently
if not data or "sources" not in data:
return jsonify({"error": "Sources must be provided"}), 400
if "name" not in data or not str(data["name"]).strip():
return jsonify({"error": "Dataset name is required"}), 400
dataset_name = data["name"].strip()
user_id = int(get_jwt_identity())
source_configs = data["sources"]
if not isinstance(source_configs, list) or len(source_configs) == 0:
return jsonify({"error": "Sources must be a non-empty list"}), 400
for source in source_configs:
if not isinstance(source, dict):
return jsonify({"error": "Each source must be an object"}), 400
if "name" not in source:
return jsonify({"error": "Each source must contain a name"}), 400
name = source["name"]
limit = source.get("limit", 1000)
category = source.get("category")
search = source.get("search")
if limit:
try:
limit = int(limit)
except (ValueError, TypeError):
return jsonify({"error": "Limit must be an integer"}), 400
if limit > 1000:
limit = 1000
if name not in connector_metadata:
return jsonify({"error": "Source not supported"}), 400
if search and not connector_metadata[name]["search_enabled"]:
return jsonify({"error": f"Source {name} does not support search"}), 400
if category and not connector_metadata[name]["categories_enabled"]:
return jsonify({"error": f"Source {name} does not support categories"}), 400
if category and not connectors[name]().category_exists(category):
return jsonify({"error": f"Category does not exist for {name}"}), 400
try:
dataset_id = dataset_manager.save_dataset_info(
user_id,
dataset_name,
default_topic_list
)
dataset_manager.set_dataset_status(
dataset_id,
"fetching",
f"Data is being fetched from {', '.join(source['name'] for source in source_configs)}"
)
fetch_and_process_dataset.delay(
dataset_id,
source_configs,
default_topic_list
)
except Exception:
print(traceback.format_exc())
return jsonify({"error": "Failed to queue dataset processing"}), 500
return jsonify({
"message": "Dataset queued for processing",
"dataset_id": dataset_id,
"status": "processing"
}), 202
@app.route("/datasets/upload", methods=["POST"])
@jwt_required()
def upload_data():
if "posts" not in request.files or "topics" not in request.files:
@@ -151,9 +245,9 @@ def upload_data():
}
), 202
except ValueError as e:
return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
return jsonify({"error": f"Failed to read JSONL file"}), 400
except Exception as e:
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>", methods=["GET"])
@jwt_required()
@@ -256,10 +350,10 @@ def content_endpoint(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>/summary", methods=["GET"])
@@ -278,10 +372,10 @@ def get_summary(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>/time", methods=["GET"])
@@ -300,10 +394,10 @@ def get_time_analysis(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>/user", methods=["GET"])
@@ -322,10 +416,10 @@ def get_user_analysis(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>/cultural", methods=["GET"])
@@ -344,10 +438,10 @@ def get_cultural_analysis(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>/interaction", methods=["GET"])
@@ -366,10 +460,10 @@ def get_interaction_analysis(dataset_id):
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
return jsonify({"error": f"An unexpected error occurred"}), 500
if __name__ == "__main__":

server/connectors/base.py (new file, 29 lines)
View File

@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from dto.post import Post
class BaseConnector(ABC):
# Each subclass declares these at the class level
source_name: str # machine-readable: "reddit", "youtube"
display_name: str # human-readable: "Reddit", "YouTube"
required_env: list[str] = [] # env vars needed to activate
search_enabled: bool
categories_enabled: bool
@classmethod
def is_available(cls) -> bool:
"""Returns True if all required env vars are set."""
import os
return all(os.getenv(var) for var in cls.required_env)
@abstractmethod
def get_new_posts_by_search(self,
search: str = None,
category: str = None,
post_limit: int = 10
) -> list[Post]:
...
@abstractmethod
def category_exists(self, category: str) -> bool:
...
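A connector implementing this contract stays small. The sketch below inlines a trimmed copy of the class above, with a minimal `Post` stand-in so it runs standalone; `DummyConnector` is purely illustrative, not a real project connector:

```python
from abc import ABC, abstractmethod

class Post:
    # Minimal stand-in for dto.post.Post
    def __init__(self, id: str, content: str):
        self.id, self.content = id, content

class BaseConnector(ABC):
    source_name: str
    display_name: str
    search_enabled: bool
    categories_enabled: bool

    @abstractmethod
    def get_new_posts_by_search(self, search=None, category=None,
                                post_limit=10) -> list[Post]: ...

    @abstractmethod
    def category_exists(self, category: str) -> bool: ...

class DummyConnector(BaseConnector):
    # Only the class metadata and the two abstract methods are needed
    # for the registry to pick a connector up.
    source_name = "dummy"
    display_name = "Dummy Source"
    search_enabled = True
    categories_enabled = False

    def get_new_posts_by_search(self, search=None, category=None, post_limit=10):
        return [Post(str(i), f"post about {search}") for i in range(post_limit)]

    def category_exists(self, category: str) -> bool:
        return False

posts = DummyConnector().get_new_posts_by_search(search="cork", post_limit=2)
print(len(posts))  # 2
```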

View File

@@ -7,6 +7,7 @@ from dto.post import Post
from dto.comment import Comment
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from server.connectors.base import BaseConnector
logger = logging.getLogger(__name__)
@@ -14,25 +15,64 @@ HEADERS = {
"User-Agent": "Mozilla/5.0 (compatible; ForumScraper/1.0)"
}
class BoardsAPI:
def __init__(self):
self.url = "https://www.boards.ie"
self.source_name = "Boards.ie"
class BoardsAPI(BaseConnector):
source_name: str = "boards.ie"
display_name: str = "Boards.ie"
def get_new_category_posts(self, category: str, post_limit: int, comment_limit: int) -> list[Post]:
categories_enabled: bool = True
search_enabled: bool = False
def __init__(self):
self.base_url = "https://www.boards.ie"
def get_new_posts_by_search(self,
search: str,
category: str,
post_limit: int
) -> list[Post]:
if search:
raise NotImplementedError("Search not compatible with boards.ie")
if category:
return self._get_posts(f"{self.base_url}/categories/{category}", post_limit)
else:
return self._get_posts(f"{self.base_url}/discussions", post_limit)
+    def category_exists(self, category: str) -> bool:
+        if not category:
+            return False
+        url = f"{self.base_url}/categories/{category}"
+        try:
+            response = requests.head(url, headers=HEADERS, allow_redirects=True)
+            if response.status_code == 200:
+                return True
+            if response.status_code == 404:
+                return False
+            # fallback if HEAD not supported
+            response = requests.get(url, headers=HEADERS)
+            return response.status_code == 200
+        except requests.RequestException as e:
+            logger.error(f"Error checking category '{category}': {e}")
+            return False

+    ## Private
+    def _get_posts(self, url, limit) -> list[Post]:
         urls = []
         current_page = 1
-        logger.info(f"Fetching posts from category: {category}")
-        while len(urls) < post_limit:
-            url = f"{self.url}/categories/{category}/p{current_page}"
+        while len(urls) < limit:
+            url = f"{url}/p{current_page}"
             html = self._fetch_page(url)
             soup = BeautifulSoup(html, "html.parser")
-            logger.debug(f"Processing page {current_page} for category {category}")
+            logger.debug(f"Processing page {current_page} for link: {url}")
             for a in soup.select("a.threadbit-threadlink"):
-                if len(urls) >= post_limit:
+                if len(urls) >= limit:
                     break
                 href = a.get("href")
@@ -41,14 +81,14 @@ class BoardsAPI:
             current_page += 1

-        logger.debug(f"Fetched {len(urls)} post URLs from category {category}")
+        logger.debug(f"Fetched {len(urls)} post URLs")

         # Fetch post details for each URL and create Post objects
         posts = []
         def fetch_and_parse(post_url):
             html = self._fetch_page(post_url)
-            post = self._parse_thread(html, post_url, comment_limit)
+            post = self._parse_thread(html, post_url)
             return post

         with ThreadPoolExecutor(max_workers=30) as executor:
@@ -71,7 +111,7 @@ class BoardsAPI:
         response.raise_for_status()
         return response.text

-    def _parse_thread(self, html: str, post_url: str, comment_limit: int) -> Post:
+    def _parse_thread(self, html: str, post_url: str) -> Post:
         soup = BeautifulSoup(html, "html.parser")

         # Author
@@ -100,7 +140,7 @@ class BoardsAPI:
         title = title_tag.text.strip() if title_tag else None

         # Comments
-        comments = self._parse_comments(post_url, post_num, comment_limit)
+        comments = self._parse_comments(post_url, post_num)

         post = Post(
             id=post_num,
@@ -115,11 +155,11 @@ class BoardsAPI:
         return post

-    def _parse_comments(self, url: str, post_id: str, comment_limit: int) -> list[Comment]:
+    def _parse_comments(self, url: str, post_id: str) -> list[Comment]:
         comments = []
         current_url = url
-        while current_url and len(comments) < comment_limit:
+        while current_url:
             html = self._fetch_page(current_url)
             page_comments = self._parse_page_comments(html, post_id)
             comments.extend(page_comments)
@@ -130,7 +170,7 @@ class BoardsAPI:
             if next_link and next_link.get('href'):
                 href = next_link.get('href')
-                current_url = href if href.startswith('http') else self.url + href
+                current_url = href if href.startswith('http') else url + href
             else:
                 current_url = None
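The pagination code above builds the next-page URL by string concatenation (`url + href`), which can produce malformed links when `href` is root-relative. `urllib.parse.urljoin` handles both cases; a small sketch (the helper name is ours, not the codebase's):

```python
from urllib.parse import urljoin

def resolve_next_page(current_url, href):
    """Resolve a pagination link that may be absolute, relative, or missing."""
    if not href:
        return None
    # urljoin leaves absolute hrefs untouched and resolves relative
    # ones (including root-relative "/...") against the current page.
    return urljoin(current_url, href)
```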


@@ -5,44 +5,63 @@ import time
 from dto.post import Post
 from dto.user import User
 from dto.comment import Comment
+from server.connectors.base import BaseConnector

 logger = logging.getLogger(__name__)

-class RedditAPI:
+class RedditAPI(BaseConnector):
+    source_name: str = "reddit"
+    display_name: str = "Reddit"
+    search_enabled: bool = True
+    categories_enabled: bool = True

     def __init__(self):
         self.url = "https://www.reddit.com/"
-        self.source_name = "Reddit"

     # Public Methods #
-    def search_new_subreddit_posts(self, search: str, subreddit: str, limit: int) -> list[Post]:
-        params = {
-            'q': search,
-            'limit': limit,
-            'restrict_sr': 'on',
-            'sort': 'new'
-        }
+    def get_new_posts_by_search(self,
+                                search: str,
+                                category: str,
+                                post_limit: int
+                                ) -> list[Post]:
+        prefix = f"r/{category}/" if category else ""
+        params = {'limit': post_limit}
+        if search:
+            endpoint = f"{prefix}search.json"
+            params.update({
+                'q': search,
+                'sort': 'new',
+                'restrict_sr': 'on' if category else 'off'
+            })
+        else:
+            endpoint = f"{prefix}new.json"

-        logger.info(f"Searching subreddit '{subreddit}' for '{search}' with limit {limit}")
-        url = f"r/{subreddit}/search.json"
         posts = []
         after = None
-        while len(posts) < limit:
-            batch_limit = min(100, limit - len(posts))
+        while len(posts) < post_limit:
+            batch_limit = min(100, post_limit - len(posts))
             params['limit'] = batch_limit
             if after:
                 params['after'] = after
-            data = self._fetch_post_overviews(url, params)
-            batch_posts = self._parse_posts(data)
-            logger.debug(f"Fetched {len(batch_posts)} posts from search in subreddit {subreddit}")
-            if not batch_posts:
+            data = self._fetch_post_overviews(endpoint, params)
+            if not data or 'data' not in data or not data['data'].get('children'):
                 break
+            batch_posts = self._parse_posts(data)
             posts.extend(batch_posts)
+            after = data['data'].get('after')
+            if not after:
+                break

-        return posts
+        return posts[:post_limit]

-    def get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]:
+    def _get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]:
         posts = []
         after = None
         url = f"r/{subreddit}/new.json"
@@ -75,6 +94,17 @@ class RedditAPI:
         data = self._fetch_post_overviews(f"user/{username}/about.json", {})
         return self._parse_user(data)

+    def category_exists(self, category: str) -> bool:
+        try:
+            data = self._fetch_post_overviews(f"r/{category}/about.json", {})
+            return (
+                data is not None
+                and 'data' in data
+                and data['data'].get('id') is not None
+            )
+        except Exception:
+            return False

     ## Private Methods ##
     def _parse_posts(self, data) -> list[Post]:
         posts = []
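The `after`-cursor loop in `get_new_posts_by_search` follows a general pattern: request up to 100 items per page, feed the returned cursor back into the next request, and stop when the cursor disappears or the limit is reached. A generic sketch of that pattern against a fake in-memory source (all names here are hypothetical, not the connector's API):

```python
def paginate(fetch_page, page_size, total_limit):
    """Collect up to total_limit items from a cursor-paginated source.

    fetch_page(limit, after) must return (items, next_cursor); a
    next_cursor of None means there are no further pages.
    """
    items, after = [], None
    while len(items) < total_limit:
        batch = min(page_size, total_limit - len(items))
        page, after = fetch_page(batch, after)
        if not page:
            break
        items.extend(page)
        if after is None:
            break
    return items[:total_limit]

# A fake source for demonstration: the "cursor" is just an offset.
DATA = list(range(250))

def fake_fetch(limit, after):
    start = after or 0
    page = DATA[start:start + limit]
    nxt = start + limit if start + limit < len(DATA) else None
    return page, nxt
```

Capping each batch at `min(page_size, remaining)` avoids over-fetching on the final page, which is exactly why the connector recomputes `batch_limit` each iteration.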


@@ -0,0 +1,30 @@
import pkgutil
import importlib

import server.connectors
from server.connectors.base import BaseConnector

def _discover_connectors() -> list[type[BaseConnector]]:
    """Walk the connectors package and collect all BaseConnector subclasses."""
    for _, module_name, _ in pkgutil.iter_modules(server.connectors.__path__):
        if module_name in ("base", "registry"):
            continue
        importlib.import_module(f"server.connectors.{module_name}")
    return [
        cls for cls in BaseConnector.__subclasses__()
        if cls.source_name  # guard against abstract intermediaries
    ]

def get_available_connectors() -> dict[str, type[BaseConnector]]:
    return {c.source_name: c for c in _discover_connectors() if c.is_available()}

def get_connector_metadata() -> dict[str, dict]:
    res = {}
    for id, obj in get_available_connectors().items():
        res[id] = {
            "id": id,
            "label": obj.display_name,
            "search_enabled": obj.search_enabled,
            "categories_enabled": obj.categories_enabled,
        }
    return res
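The registry leans on `BaseConnector.__subclasses__()`: merely importing each connector module makes its class discoverable, with no manual registration list. A self-contained sketch of the same mechanism with toy classes (`is_available` mirrors the hook the registry calls; these class names are illustrative only):

```python
class Base:
    source_name = ""
    display_name = ""
    search_enabled = False
    categories_enabled = False

    @classmethod
    def is_available(cls):
        return True

class RedditLike(Base):
    source_name = "reddit"
    display_name = "Reddit"
    search_enabled = True

class NeedsKey(Base):
    source_name = "needs_key"
    display_name = "Needs Key"

    @classmethod
    def is_available(cls):
        return False  # e.g. a required API key is missing

def get_available(base):
    # Mirrors the registry: every imported subclass with a source_name
    # that reports itself available.
    return {c.source_name: c for c in base.__subclasses__()
            if c.source_name and c.is_available()}
```

Unavailable connectors silently drop out of the registry, so the frontend never offers a source that cannot actually be scraped.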


@@ -0,0 +1,96 @@
import os
import datetime

from dotenv import load_dotenv
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from dto.post import Post
from dto.comment import Comment
from server.connectors.base import BaseConnector

load_dotenv()
API_KEY = os.getenv("YOUTUBE_API_KEY")

class YouTubeAPI(BaseConnector):
    source_name: str = "youtube"
    display_name: str = "YouTube"
    search_enabled: bool = True
    categories_enabled: bool = False

    def __init__(self):
        self.youtube = build('youtube', 'v3', developerKey=API_KEY)

    def get_new_posts_by_search(self,
                                search: str,
                                category: str,
                                post_limit: int
                                ) -> list[Post]:
        videos = self._search_videos(search, post_limit)
        posts = []
        for video in videos:
            video_id = video['id']['videoId']
            snippet = video['snippet']
            title = snippet['title']
            description = snippet['description']
            published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
            channel_title = snippet['channelTitle']

            comments = []
            comments_data = self._get_video_comments(video_id)
            for comment_thread in comments_data:
                comment_snippet = comment_thread['snippet']['topLevelComment']['snippet']
                comment = Comment(
                    id=comment_thread['id'],
                    post_id=video_id,
                    content=comment_snippet['textDisplay'],
                    author=comment_snippet['authorDisplayName'],
                    timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(),
                    reply_to=None,
                    source=self.source_name
                )
                comments.append(comment)

            post = Post(
                id=video_id,
                content=f"{title}\n\n{description}",
                author=channel_title,
                timestamp=published_at,
                url=f"https://www.youtube.com/watch?v={video_id}",
                title=title,
                source=self.source_name,
                comments=comments
            )
            posts.append(post)
        return posts

    def category_exists(self, category):
        return True

    def _search_videos(self, query, limit):
        request = self.youtube.search().list(
            q=query,
            part='snippet',
            type='video',
            maxResults=limit
        )
        response = request.execute()
        return response.get('items', [])

    def _get_video_comments(self, video_id):
        request = self.youtube.commentThreads().list(
            part='snippet',
            videoId=video_id,
            textFormat='plainText'
        )
        try:
            response = request.execute()
        except HttpError as e:
            print(f"Error fetching comments for video {video_id}: {e}")
            return []
        return response.get('items', [])
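The connector converts YouTube's ISO-8601 `publishedAt` strings to Unix timestamps via `strptime`. One caveat worth noting: `strptime` with `%SZ` yields a *naive* datetime, and `.timestamp()` on a naive datetime assumes the server's local timezone, so the stored value can drift by the host's UTC offset. A minimal illustration of the issue and a UTC-pinned alternative:

```python
import datetime

raw = "2026-01-01T12:00:00Z"

# As in the connector: naive parse. Calling .timestamp() on this
# naive value would interpret it in the server's local timezone.
naive = datetime.datetime.strptime(raw, "%Y-%m-%dT%H:%M:%SZ")

# A timezone-aware alternative that always treats the value as UTC:
aware = naive.replace(tzinfo=datetime.timezone.utc)
utc_ts = aware.timestamp()
```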


@@ -1,6 +1,10 @@
+import re
+
 from server.db.database import PostgresConnector
 from flask_bcrypt import Bcrypt

+EMAIL_REGEX = re.compile(r"[^@]+@[^@]+\.[^@]+")

 class AuthManager:
     def __init__(self, db: PostgresConnector, bcrypt: Bcrypt):
         self.db = db
@@ -18,6 +22,12 @@ class AuthManager:
     def register_user(self, username, email, password):
         hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8")

+        if len(username) < 3:
+            raise ValueError("Username must be at least 3 characters long")
+        if not EMAIL_REGEX.match(email):
+            raise ValueError("Please enter a valid email address")
+        if self.get_user_by_email(email):
+            raise ValueError("Email already registered")


@@ -1,7 +1,7 @@
 import pandas as pd
 from server.db.database import PostgresConnector
 from psycopg2.extras import Json
-from server.exceptions import NotAuthorisedException, NonExistentDatasetException
+from server.exceptions import NonExistentDatasetException

 class DatasetManager:
     def __init__(self, db: PostgresConnector):
@@ -114,7 +114,7 @@ class DatasetManager:
         self.db.execute_batch(query, values)

     def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None):
-        if status not in ["processing", "complete", "error"]:
+        if status not in ["fetching", "processing", "complete", "error"]:
             raise ValueError("Invalid status")
         query = """

@@ -23,7 +23,7 @@ CREATE TABLE datasets (
     -- Enforce valid states
     CONSTRAINT datasets_status_check
-        CHECK (status IN ('processing', 'complete', 'error'))
+        CHECK (status IN ('fetching', 'processing', 'complete', 'error'))
 );

 CREATE TABLE events (


@@ -1,9 +1,13 @@
+import pandas as pd
+import logging
 from server.queue.celery_app import celery
 from server.analysis.enrichment import DatasetEnrichment
 from server.db.database import PostgresConnector
 from server.core.datasets import DatasetManager
+from server.connectors.registry import get_available_connectors
+
+logger = logging.getLogger(__name__)

 @celery.task(bind=True, max_retries=3)
 def process_dataset(self, dataset_id: int, posts: list, topics: dict):
@@ -20,3 +24,38 @@ def process_dataset(self, dataset_id: int, posts: list, topics: dict):
         dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
     except Exception as e:
         dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")

+@celery.task(bind=True, max_retries=3)
+def fetch_and_process_dataset(self,
+                              dataset_id: int,
+                              source_info: list[dict],
+                              topics: dict):
+    connectors = get_available_connectors()
+    db = PostgresConnector()
+    dataset_manager = DatasetManager(db)
+    posts = []
+    try:
+        for metadata in source_info:
+            name = metadata["name"]
+            search = metadata.get("search")
+            category = metadata.get("category")
+            limit = metadata.get("limit", 100)
+
+            connector = connectors[name]()
+            raw_posts = connector.get_new_posts_by_search(
+                search=search,
+                category=category,
+                post_limit=limit
+            )
+            posts.extend(post.to_dict() for post in raw_posts)
+
+        df = pd.DataFrame(posts)
+        processor = DatasetEnrichment(df, topics)
+        enriched_df = processor.enrich()
+        dataset_manager.save_dataset_content(dataset_id, enriched_df)
+        dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
+    except Exception as e:
+        dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
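The `source_info` payload consumed by `fetch_and_process_dataset` is a list of dicts; the field names follow the task body above (`name` is required, `search` and `category` are optional, and `limit` defaults to 100). A sketch of normalising such an entry before enqueueing, which is why the strong validation on the scrape endpoint matters — bad payloads otherwise fail silently inside Celery (the helper name and sample values are hypothetical):

```python
# Example payload, mirroring the fields the task reads.
source_info = [
    {"name": "reddit", "search": "housing", "category": "ireland", "limit": 50},
    {"name": "boards.ie", "category": "accommodation"},
]

def normalise_entry(entry):
    # Apply the same defaults the task applies, so the shape is
    # validated once at the API edge instead of inside the worker.
    return {
        "name": entry["name"],
        "search": entry.get("search"),
        "category": entry.get("category"),
        "limit": entry.get("limit", 100),
    }
```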

server/topics.json (new file)

@@ -0,0 +1,67 @@
{
"Personal Life": "daily life, life updates, what happened today, personal stories, life events, reflections",
"Relationships": "dating, relationships, breakups, friendships, family relationships, marriage, relationship advice",
"Family & Parenting": "parents, parenting, children, raising kids, family dynamics, family stories",
"Work & Careers": "jobs, workplaces, office life, promotions, quitting jobs, career advice, workplace drama",
"Education": "school, studying, exams, university, homework, academic pressure, learning experiences",
"Money & Finance": "saving money, debt, budgeting, cost of living, financial advice, personal finance",
"Health & Fitness": "exercise, gym, workouts, running, diet, fitness routines, weight loss",
"Mental Health": "stress, anxiety, depression, burnout, therapy, emotional wellbeing",
"Food & Cooking": "meals, cooking, recipes, restaurants, snacks, food opinions",
"Travel": "holidays, trips, tourism, travel experiences, airports, flights, travel tips",
"Entertainment": "movies, TV shows, streaming services, celebrities, pop culture",
"Music": "songs, albums, artists, concerts, music opinions",
"Gaming": "video games, gaming culture, consoles, PC gaming, esports",
"Sports": "sports matches, teams, players, competitions, sports opinions",
"Technology": "phones, gadgets, apps, AI, software, tech trends",
"Internet Culture": "memes, viral trends, online jokes, internet drama, trending topics",
"Social Media": "platforms, influencers, content creators, algorithms, online communities",
"News & Current Events": "breaking news, world events, major incidents, public discussions",
"Politics": "political debates, elections, government policies, ideology",
"Culture & Society": "social issues, cultural trends, generational debates, societal changes",
"Identity & Lifestyle": "personal identity, lifestyle choices, values, self-expression",
"Hobbies & Interests": "art, photography, crafts, collecting, hobbies",
"Fashion & Beauty": "clothing, style, makeup, skincare, fashion trends",
"Animals & Pets": "pets, animal videos, pet care, wildlife",
"Humour": "jokes, funny stories, sarcasm, memes",
"Opinions & Debates": "hot takes, controversial opinions, arguments, discussions",
"Advice & Tips": "life advice, tutorials, how-to tips, recommendations",
"Product Reviews": "reviews, recommendations, experiences with products",
"Complaints & Rants": "frustrations, complaining, venting about things",
"Motivation & Inspiration": "motivational quotes, success stories, encouragement",
"Questions & Curiosity": "asking questions, seeking opinions, curiosity posts",
"Celebrations & Achievements": "birthdays, milestones, achievements, good news",
"Random Thoughts": "shower thoughts, observations, random ideas"
}


@@ -1,4 +1,5 @@
 import datetime
+import os

 from flask import request

 def parse_datetime_filter(value):
@@ -48,3 +49,9 @@ def get_request_filters() -> dict:
         filters["data_sources"] = data_sources

     return filters

+def get_env(name: str) -> str:
+    value = os.getenv(name)
+    if not value:
+        raise RuntimeError(f"Missing required environment variable: {name}")
+    return value
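`get_env` fails fast on unset *or empty* variables (the `if not value:` check covers both) rather than letting a `None` propagate into later code. A quick check of that behaviour, using a standalone copy of the helper and throwaway variable names:

```python
import os

def get_env(name: str) -> str:
    # Raise immediately instead of returning None for a missing/empty variable.
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

os.environ["DEMO_KEY"] = "secret"
ok = get_env("DEMO_KEY")

os.environ["EMPTY_KEY"] = ""   # empty counts as missing
try:
    get_env("EMPTY_KEY")
    raised = False
except RuntimeError:
    raised = True
```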