Compare commits


37 Commits

Author SHA1 Message Date
94befb61c5 Merge pull request 'Automatic Scraping of dataset options' (#9) from feat/automatic-scraping-datasets into main
Reviewed-on: #9
2026-03-14 21:58:49 +00:00
12f5953146 fix(api): remove error exceptions in API responses
Mainly a security measure: we don't want raw code errors returned in API responses, as they could reveal how the inner workings of the code behave.
2026-03-14 21:58:00 +00:00
5b0441c34b fix(connector): unnecessary comment limits
In addition, I made some methods private to better align with the BaseConnector parent class.
2026-03-14 21:53:13 +00:00
d2b919cd66 fix(api): enforce integer limit and cap at 1000 in scrape_data function 2026-03-14 17:35:05 +00:00
062937ec3c fix(api): incorrect validation on search 2026-03-14 17:12:02 +00:00
2a00795cc2 chore(connectors): implement category_exists for Boards API 2026-03-14 17:11:49 +00:00
c990f29645 fix(frontend): misaligned loading page for datasets 2026-03-14 17:05:46 +00:00
8a423b2a29 feat(connectors): implement category validation in scraping process 2026-03-14 16:59:43 +00:00
d96f459104 fix(connectors): update URL references to use base_url in BoardsAPI 2026-03-13 21:59:17 +00:00
162a4de64e fix(frontend): detects which sources support category or search 2026-03-12 10:07:28 +00:00
6684780d23 fix(connectors): add stronger validation to scrape endpoint
Strong validation is needed, otherwise bad data reaches Celery and crashes silently. The endpoint also checks whether the given source supports search or categories.
2026-03-12 09:59:07 +00:00
c12f1b4371 chore(connectors): add category and search validation fields 2026-03-12 09:56:34 +00:00
01d6bd0164 fix(connectors): category / search fields breaking
Ideally, category and search would be fully optional; however, some sites break if one or the other is not provided.

Unfortunately `boards.ie` has a different page type for searches and I'm not bothered to implement a scraper from scratch.

In addition, removed comment limit options.
2026-03-11 21:16:26 +00:00
12cbc24074 chore(utils): remove split_limit function 2026-03-11 19:47:44 +00:00
0658713f42 chore: remove unused dataset creation script 2026-03-11 19:44:38 +00:00
b2ae1a9f70 feat(frontend): add page for scraping endpoint 2026-03-11 19:41:34 +00:00
eff416c34e fix(connectors): hardcoded source name in Youtube connector 2026-03-10 23:36:09 +00:00
524c9c50a0 fix(api): incorrect dataset status update message 2026-03-10 23:28:21 +00:00
2ab74d922a feat(api): support per-source search, category and limit configuration 2026-03-10 23:15:33 +00:00
d520e2af98 fix(auth): missing email and username business rules 2026-03-10 22:48:04 +00:00
8fe84a30f6 fix: data leak when opening topics file 2026-03-10 22:45:07 +00:00
dc330b87b9 fix(celery): process dataset directly in fetch task
Calling the original `process_dataset` function led to issues with JSON serialisation.
2026-03-10 22:17:00 +00:00
7ccc934f71 build: change celery to debug mode 2026-03-10 22:14:45 +00:00
a3dbe04a57 fix(frontend): option to delete dataset not shown after fail 2026-03-10 19:23:48 +00:00
a65c4a461c fix(api): flask delegates dataset fetch to celery 2026-03-10 19:17:41 +00:00
15704a0782 chore(db): update db schema to include "fetching" status 2026-03-10 19:17:08 +00:00
6ec47256d0 feat(api): add database scraping endpoints 2026-03-10 19:04:33 +00:00
2572664e26 chore(utils): add env getter that fails if env not found 2026-03-10 18:50:53 +00:00
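A hedged sketch of what the fail-fast env getter from this commit might look like (the actual `get_env` in `server/utils` may use a different name or exception type):

```python
import os


def get_env(name: str) -> str:
    # Fail fast at startup rather than letting a missing variable
    # surface later as a confusing runtime error deep in a request.
    value = os.getenv(name)
    if value is None:
        raise RuntimeError(f"Required environment variable '{name}' is not set")
    return value
```

This mirrors the later change in `app.py` where defaults like `"super-secret-change-this"` are dropped in favour of hard failures.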
17bd4702b2 fix(connectors): connector detectors returning name of ID alongside connector obj 2026-03-10 18:36:40 +00:00
53cb5c2ea5 feat(topics): add generalised topic list
This is easier and quicker than deriving a topic list from the scraped dataset itself.

While using LLMs to create a personalised topic list based on the query, category or dataset itself would yield better results for most, it is beyond the scope of this project.
2026-03-10 18:36:08 +00:00
0866dda8b3 chore: add util to always split evenly 2026-03-10 18:25:05 +00:00
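The even-split utility added here (and removed again in a later commit as `split_limit`) presumably behaved something like this sketch; the real name and signature may have differed:

```python
def split_evenly(total: int, parts: int) -> list[int]:
    # Distribute `total` across `parts` buckets so bucket sizes
    # differ by at most one and always sum back to `total`.
    base, remainder = divmod(total, parts)
    return [base + (1 if index < remainder else 0) for index in range(parts)]
```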
5ccb2e73cd fix(connectors): incorrect registry location
Registry paths were using the incorrect connector path locations.
2026-03-10 18:18:42 +00:00
2a8d7c7972 refactor(connectors): Youtube & Reddit connectors implement BaseConnector 2026-03-10 18:11:33 +00:00
e7a8c17be4 chore(connectors): add base connector inheritance 2026-03-10 18:08:01 +00:00
cc799f7368 feat(connectors): add base connector and registry for detection
Idea is to have a "plugin-type" system, where new connectors can extend the `BaseConnector` class and implement the fetch posts method.

These are automatically detected by the registry, and automatically used in new Flask endpoints that give a list of possible sources.

Allows for an open-ended system where new data scrapers / API consumers can be added dynamically.
2026-03-09 21:29:03 +00:00
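A minimal sketch of the "plugin-type" registry this commit describes. `BaseConnector`, `fetch_posts`, and `get_available_connectors` all appear in later commits; `ExampleConnector` is a hypothetical subclass for illustration, and the real registry likely scans connector modules rather than relying on `__subclasses__`:

```python
from abc import ABC, abstractmethod


class BaseConnector(ABC):
    """Plugin base class: new connectors subclass this and implement fetch_posts."""
    source_name: str

    @abstractmethod
    def fetch_posts(self, limit: int) -> list:
        ...


def get_available_connectors() -> dict:
    # The registry discovers every concrete subclass of BaseConnector,
    # so new connectors are picked up without touching the Flask endpoints.
    return {
        cls.source_name: cls
        for cls in BaseConnector.__subclasses__()
        if getattr(cls, "source_name", None)
    }


class ExampleConnector(BaseConnector):
    source_name = "example"

    def fetch_posts(self, limit: int) -> list:
        return []
```

The Flask `/datasets/sources` endpoint can then simply serialise whatever the registry returns, which is what makes the system open-ended.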
262a70dbf3 refactor(api): rename /upload endpoint
Ensures consistency with the other dataset-based endpoints and follows the REST-API rules more cleanly.
2026-03-09 20:55:12 +00:00
ca444e9cb0 refactor: move connectors to backend dir
They will now be used primarily by the backend.
2026-03-09 20:53:13 +00:00
20 changed files with 885 additions and 203 deletions

View File

@@ -1,84 +0,0 @@
import os
import datetime
from dotenv import load_dotenv
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from dto.post import Post
from dto.comment import Comment

load_dotenv()
API_KEY = os.getenv("YOUTUBE_API_KEY")


class YouTubeAPI:
    def __init__(self):
        self.youtube = build('youtube', 'v3', developerKey=API_KEY)

    def search_videos(self, query, limit):
        request = self.youtube.search().list(
            q=query,
            part='snippet',
            type='video',
            maxResults=limit
        )
        response = request.execute()
        return response.get('items', [])

    def get_video_comments(self, video_id, limit):
        request = self.youtube.commentThreads().list(
            part='snippet',
            videoId=video_id,
            maxResults=limit,
            textFormat='plainText'
        )
        try:
            response = request.execute()
        except HttpError as e:
            print(f"Error fetching comments for video {video_id}: {e}")
            return []
        return response.get('items', [])

    def fetch_videos(self, query, video_limit, comment_limit) -> list[Post]:
        videos = self.search_videos(query, video_limit)
        posts = []
        for video in videos:
            video_id = video['id']['videoId']
            snippet = video['snippet']
            title = snippet['title']
            description = snippet['description']
            published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
            channel_title = snippet['channelTitle']
            comments = []
            comments_data = self.get_video_comments(video_id, comment_limit)
            for comment_thread in comments_data:
                comment_snippet = comment_thread['snippet']['topLevelComment']['snippet']
                comment = Comment(
                    id=comment_thread['id'],
                    post_id=video_id,
                    content=comment_snippet['textDisplay'],
                    author=comment_snippet['authorDisplayName'],
                    timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(),
                    reply_to=None,
                    source="YouTube"
                )
                comments.append(comment)
            post = Post(
                id=video_id,
                content=f"{title}\n\n{description}",
                author=channel_title,
                timestamp=published_at,
                url=f"https://www.youtube.com/watch?v={video_id}",
                title=title,
                source="YouTube",
                comments=comments
            )
            posts.append(post)
        return posts

View File

@@ -1,43 +0,0 @@
import json
import logging
from connectors.reddit_api import RedditAPI
from connectors.boards_api import BoardsAPI
from connectors.youtube_api import YouTubeAPI

posts_file = 'posts_test.jsonl'
reddit_connector = RedditAPI()
boards_connector = BoardsAPI()
youtube_connector = YouTubeAPI()

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.WARNING)


def remove_empty_posts(posts):
    return [post for post in posts if post.content.strip() != ""]


def save_to_jsonl(filename, posts):
    with open(filename, 'a', encoding='utf-8') as f:
        for post in posts:
            # Convert post object to dict if it's a dataclass
            data = post.to_dict()
            f.write(json.dumps(data) + '\n')


def main():
    boards_posts = boards_connector.get_new_category_posts('cork-city', 1200, 1200)
    save_to_jsonl(posts_file, boards_posts)
    reddit_posts = reddit_connector.get_new_subreddit_posts('cork', 1200)
    reddit_posts = remove_empty_posts(reddit_posts)
    save_to_jsonl(posts_file, reddit_posts)
    ireland_posts = reddit_connector.search_new_subreddit_posts('cork', 'ireland', 1200)
    ireland_posts = remove_empty_posts(ireland_posts)
    save_to_jsonl(posts_file, ireland_posts)
    youtube_videos = youtube_connector.fetch_videos('cork city', 1200, 1200)
    save_to_jsonl(posts_file, youtube_videos)


if __name__ == "__main__":
    main()

View File

@@ -43,7 +43,7 @@ services:
       - .env
     command: >
       celery -A server.queue.celery_app.celery worker
-      --loglevel=info
+      --loglevel=debug
       --pool=solo
     depends_on:
       - postgres

View File

@@ -5,6 +5,7 @@ import DatasetsPage from "./pages/Datasets";
 import DatasetStatusPage from "./pages/DatasetStatus";
 import LoginPage from "./pages/Login";
 import UploadPage from "./pages/Upload";
+import AutoScrapePage from "./pages/AutoScrape";
 import StatPage from "./pages/Stats";
 import { getDocumentTitle } from "./utils/documentTitle";
 import DatasetEditPage from "./pages/DatasetEdit";
@@ -22,6 +23,7 @@ function App() {
         <Route path="/" element={<Navigate to="/login" replace />} />
         <Route path="/login" element={<LoginPage />} />
         <Route path="/upload" element={<UploadPage />} />
+        <Route path="/auto-scrape" element={<AutoScrapePage />} />
         <Route path="/datasets" element={<DatasetsPage />} />
         <Route path="/dataset/:datasetId/status" element={<DatasetStatusPage />} />
         <Route path="/dataset/:datasetId/stats" element={<StatPage />} />

View File

@@ -0,0 +1,338 @@
import axios from "axios";
import { useEffect, useState } from "react";
import { useNavigate } from "react-router-dom";
import StatsStyling from "../styles/stats_styling";
const styles = StatsStyling;
const API_BASE_URL = import.meta.env.VITE_BACKEND_URL;
type SourceOption = {
id: string;
label: string;
search_enabled?: boolean;
categories_enabled?: boolean;
searchEnabled?: boolean;
categoriesEnabled?: boolean;
};
type SourceConfig = {
sourceName: string;
limit: string;
search: string;
category: string;
};
const buildEmptySourceConfig = (sourceName = ""): SourceConfig => ({
sourceName,
limit: "100",
search: "",
category: "",
});
const supportsSearch = (source?: SourceOption): boolean =>
Boolean(source?.search_enabled ?? source?.searchEnabled);
const supportsCategories = (source?: SourceOption): boolean =>
Boolean(source?.categories_enabled ?? source?.categoriesEnabled);
const AutoScrapePage = () => {
const navigate = useNavigate();
const [datasetName, setDatasetName] = useState("");
const [sourceOptions, setSourceOptions] = useState<SourceOption[]>([]);
const [sourceConfigs, setSourceConfigs] = useState<SourceConfig[]>([]);
const [returnMessage, setReturnMessage] = useState("");
const [isLoadingSources, setIsLoadingSources] = useState(true);
const [isSubmitting, setIsSubmitting] = useState(false);
const [hasError, setHasError] = useState(false);
useEffect(() => {
axios
.get<SourceOption[]>(`${API_BASE_URL}/datasets/sources`)
.then((response) => {
const options = response.data || [];
setSourceOptions(options);
setSourceConfigs([buildEmptySourceConfig(options[0]?.id || "")]);
})
.catch((requestError: unknown) => {
setHasError(true);
if (axios.isAxiosError(requestError)) {
setReturnMessage(
`Failed to load available sources: ${String(
requestError.response?.data?.error || requestError.message
)}`
);
} else {
setReturnMessage("Failed to load available sources.");
}
})
.finally(() => {
setIsLoadingSources(false);
});
}, []);
const updateSourceConfig = (index: number, field: keyof SourceConfig, value: string) => {
setSourceConfigs((previous) =>
previous.map((config, configIndex) =>
configIndex === index
? field === "sourceName"
? { ...config, sourceName: value, search: "", category: "" }
: { ...config, [field]: value }
: config
)
);
};
const getSourceOption = (sourceName: string) =>
sourceOptions.find((option) => option.id === sourceName);
const addSourceConfig = () => {
setSourceConfigs((previous) => [
...previous,
buildEmptySourceConfig(sourceOptions[0]?.id || ""),
]);
};
const removeSourceConfig = (index: number) => {
setSourceConfigs((previous) => previous.filter((_, configIndex) => configIndex !== index));
};
const autoScrape = async () => {
const token = localStorage.getItem("access_token");
if (!token) {
setHasError(true);
setReturnMessage("You must be signed in to auto scrape a dataset.");
return;
}
const normalizedDatasetName = datasetName.trim();
if (!normalizedDatasetName) {
setHasError(true);
setReturnMessage("Please add a dataset name before continuing.");
return;
}
if (sourceConfigs.length === 0) {
setHasError(true);
setReturnMessage("Please add at least one source.");
return;
}
const normalizedSources = sourceConfigs.map((source) => {
const sourceOption = getSourceOption(source.sourceName);
return {
name: source.sourceName,
limit: Number(source.limit || 100),
search: supportsSearch(sourceOption) ? source.search.trim() || undefined : undefined,
category: supportsCategories(sourceOption)
? source.category.trim() || undefined
: undefined,
};
});
const invalidSource = normalizedSources.find(
(source) => !source.name || !Number.isFinite(source.limit) || source.limit <= 0
);
if (invalidSource) {
setHasError(true);
setReturnMessage("Every source needs a name and a limit greater than zero.");
return;
}
try {
setIsSubmitting(true);
setHasError(false);
setReturnMessage("");
const response = await axios.post(
`${API_BASE_URL}/datasets/scrape`,
{
name: normalizedDatasetName,
sources: normalizedSources,
},
{
headers: {
Authorization: `Bearer ${token}`,
},
}
);
const datasetId = Number(response.data.dataset_id);
setReturnMessage(
`Auto scrape queued successfully (dataset #${datasetId}). Redirecting to processing status...`
);
setTimeout(() => {
navigate(`/dataset/${datasetId}/status`);
}, 400);
} catch (requestError: unknown) {
setHasError(true);
if (axios.isAxiosError(requestError)) {
const message = String(
requestError.response?.data?.error || requestError.message || "Auto scrape failed."
);
setReturnMessage(`Auto scrape failed: ${message}`);
} else {
setReturnMessage("Auto scrape failed due to an unexpected error.");
}
} finally {
setIsSubmitting(false);
}
};
return (
<div style={styles.page}>
<div style={styles.containerWide}>
<div style={{ ...styles.card, ...styles.headerBar }}>
<div>
<h1 style={styles.sectionHeaderTitle}>Auto Scrape Dataset</h1>
<p style={styles.sectionHeaderSubtitle}>
Select sources and scrape settings, then queue processing automatically.
</p>
</div>
<button
type="button"
style={{ ...styles.buttonPrimary, opacity: isSubmitting || isLoadingSources ? 0.75 : 1 }}
onClick={autoScrape}
disabled={isSubmitting || isLoadingSources}
>
{isSubmitting ? "Queueing..." : "Auto Scrape and Analyze"}
</button>
</div>
<div
style={{
...styles.grid,
marginTop: 14,
gridTemplateColumns: "repeat(auto-fit, minmax(280px, 1fr))",
}}
>
<div style={{ ...styles.card, gridColumn: "auto" }}>
<h2 style={{ ...styles.sectionTitle, color: "#24292f" }}>Dataset Name</h2>
<p style={styles.sectionSubtitle}>Use a clear label so you can identify this run later.</p>
<input
style={{ ...styles.input, ...styles.inputFullWidth }}
type="text"
placeholder="Example: r/cork subreddit - Jan 2026"
value={datasetName}
onChange={(event) => setDatasetName(event.target.value)}
/>
</div>
<div style={{ ...styles.card, gridColumn: "auto" }}>
<h2 style={{ ...styles.sectionTitle, color: "#24292f" }}>Sources</h2>
<p style={styles.sectionSubtitle}>
Configure source, limit, optional search, and optional category.
</p>
{isLoadingSources && <p style={styles.subtleBodyText}>Loading sources...</p>}
{!isLoadingSources && sourceOptions.length === 0 && (
<p style={styles.subtleBodyText}>No source connectors are currently available.</p>
)}
{!isLoadingSources && sourceOptions.length > 0 && (
<div style={{ display: "flex", flexDirection: "column", gap: 10 }}>
{sourceConfigs.map((source, index) => {
const sourceOption = getSourceOption(source.sourceName);
const searchEnabled = supportsSearch(sourceOption);
const categoriesEnabled = supportsCategories(sourceOption);
return (
<div
key={`source-${index}`}
style={{
border: "1px solid #d0d7de",
borderRadius: 8,
padding: 12,
background: "#f6f8fa",
display: "grid",
gap: 8,
}}
>
<select
value={source.sourceName}
style={{ ...styles.input, ...styles.inputFullWidth }}
onChange={(event) => updateSourceConfig(index, "sourceName", event.target.value)}
>
{sourceOptions.map((option) => (
<option key={option.id} value={option.id}>
{option.label}
</option>
))}
</select>
<input
type="number"
min={1}
value={source.limit}
placeholder="Limit"
style={{ ...styles.input, ...styles.inputFullWidth }}
onChange={(event) => updateSourceConfig(index, "limit", event.target.value)}
/>
<input
type="text"
value={source.search}
placeholder={
searchEnabled
? "Search term (optional)"
: "Search not supported for this source"
}
style={{ ...styles.input, ...styles.inputFullWidth }}
disabled={!searchEnabled}
onChange={(event) => updateSourceConfig(index, "search", event.target.value)}
/>
<input
type="text"
value={source.category}
placeholder={
categoriesEnabled
? "Category (optional)"
: "Categories not supported for this source"
}
style={{ ...styles.input, ...styles.inputFullWidth }}
disabled={!categoriesEnabled}
onChange={(event) => updateSourceConfig(index, "category", event.target.value)}
/>
{sourceConfigs.length > 1 && (
<button
type="button"
style={styles.buttonSecondary}
onClick={() => removeSourceConfig(index)}
>
Remove source
</button>
)}
</div>
);
})}
<button type="button" style={styles.buttonSecondary} onClick={addSourceConfig}>
Add another source
</button>
</div>
)}
</div>
</div>
<div
style={{
...styles.card,
marginTop: 14,
...(hasError ? styles.alertCardError : styles.alertCardInfo),
}}
>
{returnMessage ||
"After queueing, your dataset is fetched and processed in the background automatically."}
</div>
</div>
</div>
);
};
export default AutoScrapePage;

View File

@@ -9,7 +9,7 @@ const API_BASE_URL = import.meta.env.VITE_BACKEND_URL;
 type DatasetItem = {
   id: number;
   name?: string;
-  status?: "processing" | "complete" | "error" | string;
+  status?: "processing" | "complete" | "error" | "fetching" | string;
   status_message?: string | null;
   completed_at?: string | null;
   created_at?: string | null;
@@ -50,7 +50,24 @@ const DatasetsPage = () => {
   }, []);

   if (loading) {
-    return <p style={{ ...styles.page, minHeight: "100vh" }}>Loading datasets...</p>;
+    return (
+      <div style={styles.loadingPage}>
+        <div style={{ ...styles.loadingCard, transform: "translateY(-100px)" }}>
+          <div style={styles.loadingHeader}>
+            <div style={styles.loadingSpinner} />
+            <div>
+              <h2 style={styles.loadingTitle}>Loading datasets</h2>
+            </div>
+          </div>
+          <div style={styles.loadingSkeleton}>
+            <div style={{ ...styles.loadingSkeletonLine, ...styles.loadingSkeletonLineLong }} />
+            <div style={{ ...styles.loadingSkeletonLine, ...styles.loadingSkeletonLineMed }} />
+            <div style={{ ...styles.loadingSkeletonLine, ...styles.loadingSkeletonLineShort }} />
+          </div>
+        </div>
+      </div>
+    )
   }

   return (
@@ -63,9 +80,18 @@ const DatasetsPage = () => {
             View and reopen datasets you previously uploaded.
           </p>
         </div>
-        <button type="button" style={styles.buttonPrimary} onClick={() => navigate("/upload")}>
-          Upload New Dataset
-        </button>
+        <div style={styles.controlsWrapped}>
+          <button type="button" style={styles.buttonPrimary} onClick={() => navigate("/upload")}>
+            Upload New Dataset
+          </button>
+          <button
+            type="button"
+            style={styles.buttonSecondary}
+            onClick={() => navigate("/auto-scrape")}
+          >
+            Auto Scrape Dataset
+          </button>
+        </div>
       </div>

       {error && (
@@ -93,7 +119,7 @@ const DatasetsPage = () => {
       <div style={{ ...styles.card, marginTop: 14, padding: 0, overflow: "hidden" }}>
         <ul style={styles.listNoBullets}>
           {datasets.map((dataset) => {
-            const isComplete = dataset.status === "complete";
+            const isComplete = dataset.status === "complete" || dataset.status === "error";
             const editPath = `/dataset/${dataset.id}/edit`;
             const targetPath = isComplete
               ? `/dataset/${dataset.id}/stats`

View File

@@ -40,7 +40,7 @@ const UploadPage = () => {
     setHasError(false);
     setReturnMessage("");

-    const response = await axios.post(`${API_BASE_URL}/upload`, formData, {
+    const response = await axios.post(`${API_BASE_URL}/datasets/upload`, formData, {
       headers: {
         "Content-Type": "multipart/form-data",
       },

View File

@@ -3,6 +3,7 @@ const DEFAULT_TITLE = "Ethnograph View";
 const STATIC_TITLES: Record<string, string> = {
   "/login": "Sign In",
   "/upload": "Upload Dataset",
+  "/auto-scrape": "Auto Scrape Dataset",
   "/datasets": "My Datasets",
 };

View File

@@ -19,32 +19,38 @@ from server.exceptions import NotAuthorisedException, NonExistentDatasetExceptio
 from server.db.database import PostgresConnector
 from server.core.auth import AuthManager
 from server.core.datasets import DatasetManager
-from server.utils import get_request_filters
-from server.queue.tasks import process_dataset
+from server.utils import get_request_filters, get_env
+from server.queue.tasks import process_dataset, fetch_and_process_dataset
+from server.connectors.registry import get_available_connectors, get_connector_metadata

 app = Flask(__name__)

 # Env Variables
 load_dotenv()
-frontend_url = os.getenv("FRONTEND_URL", "http://localhost:5173")
-jwt_secret_key = os.getenv("JWT_SECRET_KEY", "super-secret-change-this")
-jwt_access_token_expires = int(
-    os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200)
-)  # Default to 20 minutes
+max_fetch_limit = int(get_env("MAX_FETCH_LIMIT"))
+frontend_url = get_env("FRONTEND_URL")
+jwt_secret_key = get_env("JWT_SECRET_KEY")
+jwt_access_token_expires = int(os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200))  # Default to 20 minutes

 # Flask Configuration
 CORS(app, resources={r"/*": {"origins": frontend_url}})
 app.config["JWT_SECRET_KEY"] = jwt_secret_key
 app.config["JWT_ACCESS_TOKEN_EXPIRES"] = jwt_access_token_expires

+# Security
 bcrypt = Bcrypt(app)
 jwt = JWTManager(app)

+# Helper Objects
 db = PostgresConnector()
 auth_manager = AuthManager(db, bcrypt)
 dataset_manager = DatasetManager(db)
 stat_gen = StatGen()
+connectors = get_available_connectors()
+
+# Default Files
+with open("server/topics.json") as f:
+    default_topic_list = json.load(f)

 @app.route("/register", methods=["POST"])
 def register_user():
@@ -68,7 +74,7 @@ def register_user():
         return jsonify({"error": str(e)}), 400
     except Exception as e:
         print(traceback.format_exc())
-        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+        return jsonify({"error": f"An unexpected error occurred"}), 500

     print(f"Registered new user: {username}")
     return jsonify({"message": f"User '{username}' registered successfully"}), 200
@@ -93,7 +99,7 @@ def login_user():
         return jsonify({"error": "Invalid username or password"}), 401
     except Exception as e:
         print(traceback.format_exc())
-        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+        return jsonify({"error": f"An unexpected error occurred"}), 500

 @app.route("/profile", methods=["GET"])
@@ -111,7 +117,95 @@ def get_user_datasets():
     current_user = int(get_jwt_identity())
     return jsonify(dataset_manager.get_user_datasets(current_user)), 200

-@app.route("/upload", methods=["POST"])
+@app.route("/datasets/sources", methods=["GET"])
+def get_dataset_sources():
+    list_metadata = list(get_connector_metadata().values())
+    return jsonify(list_metadata)
+
+@app.route("/datasets/scrape", methods=["POST"])
+@jwt_required()
+def scrape_data():
+    data = request.get_json()
+    connector_metadata = get_connector_metadata()
+
+    # Strong validation needed, otherwise data goes to Celery and crashes silently
+    if not data or "sources" not in data:
+        return jsonify({"error": "Sources must be provided"}), 400
+    if "name" not in data or not str(data["name"]).strip():
+        return jsonify({"error": "Dataset name is required"}), 400
+
+    dataset_name = data["name"].strip()
+    user_id = int(get_jwt_identity())
+    source_configs = data["sources"]
+
+    if not isinstance(source_configs, list) or len(source_configs) == 0:
+        return jsonify({"error": "Sources must be a non-empty list"}), 400
+
+    for source in source_configs:
+        if not isinstance(source, dict):
+            return jsonify({"error": "Each source must be an object"}), 400
+        if "name" not in source:
+            return jsonify({"error": "Each source must contain a name"}), 400
+
+        name = source["name"]
+        limit = source.get("limit", 1000)
+        category = source.get("category")
+        search = source.get("search")
+
+        if limit:
+            try:
+                limit = int(limit)
+            except (ValueError, TypeError):
+                return jsonify({"error": "Limit must be an integer"}), 400
+            if limit > 1000:
+                limit = 1000
+
+        if name not in connector_metadata:
+            return jsonify({"error": "Source not supported"}), 400
+        if search and not connector_metadata[name]["search_enabled"]:
+            return jsonify({"error": f"Source {name} does not support search"}), 400
+        if category and not connector_metadata[name]["categories_enabled"]:
+            return jsonify({"error": f"Source {name} does not support categories"}), 400
+        if category and not connectors[name]().category_exists(category):
+            return jsonify({"error": f"Category does not exist for {name}"}), 400
+
+    try:
+        dataset_id = dataset_manager.save_dataset_info(
+            user_id,
+            dataset_name,
+            default_topic_list
+        )
+        dataset_manager.set_dataset_status(
+            dataset_id,
+            "fetching",
+            f"Data is being fetched from {', '.join(source['name'] for source in source_configs)}"
+        )
+        fetch_and_process_dataset.delay(
+            dataset_id,
+            source_configs,
+            default_topic_list
+        )
+    except Exception:
+        print(traceback.format_exc())
+        return jsonify({"error": "Failed to queue dataset processing"}), 500
+
+    return jsonify({
+        "message": "Dataset queued for processing",
+        "dataset_id": dataset_id,
+        "status": "processing"
+    }), 202
+
+@app.route("/datasets/upload", methods=["POST"])
 @jwt_required()
 def upload_data():
     if "posts" not in request.files or "topics" not in request.files:
@@ -151,9 +245,9 @@ def upload_data():
             }
         ), 202
     except ValueError as e:
-        return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
+        return jsonify({"error": f"Failed to read JSONL file"}), 400
     except Exception as e:
-        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+        return jsonify({"error": f"An unexpected error occurred"}), 500

 @app.route("/dataset/<int:dataset_id>", methods=["GET"])
 @jwt_required()
@@ -256,10 +350,10 @@ def content_endpoint(dataset_id):
     except NonExistentDatasetException:
         return jsonify({"error": "Dataset does not exist"}), 404
     except ValueError as e:
-        return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
+        return jsonify({"error": f"Malformed or missing data"}), 400
     except Exception as e:
         print(traceback.format_exc())
-        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+        return jsonify({"error": f"An unexpected error occurred"}), 500

 @app.route("/dataset/<int:dataset_id>/summary", methods=["GET"])
@@ -278,10 +372,10 @@ def get_summary(dataset_id):
     except NonExistentDatasetException:
         return jsonify({"error": "Dataset does not exist"}), 404
     except ValueError as e:
-        return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
+        return jsonify({"error": f"Malformed or missing data"}), 400
     except Exception as e:
         print(traceback.format_exc())
-        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+        return jsonify({"error": f"An unexpected error occurred"}), 500

 @app.route("/dataset/<int:dataset_id>/time", methods=["GET"])
@@ -300,10 +394,10 @@ def get_time_analysis(dataset_id):
     except NonExistentDatasetException:
         return jsonify({"error": "Dataset does not exist"}), 404
     except ValueError as e:
-        return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
+        return jsonify({"error": f"Malformed or missing data"}), 400
     except Exception as e:
         print(traceback.format_exc())
-        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+        return jsonify({"error": f"An unexpected error occurred"}), 500

 @app.route("/dataset/<int:dataset_id>/user", methods=["GET"])
@@ -322,10 +416,10 @@ def get_user_analysis(dataset_id):
     except NonExistentDatasetException:
         return jsonify({"error": "Dataset does not exist"}), 404
     except ValueError as e:
-        return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
+        return jsonify({"error": f"Malformed or missing data"}), 400
     except Exception as e:
         print(traceback.format_exc())
-        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
+        return jsonify({"error": f"An unexpected error occurred"}), 500

 @app.route("/dataset/<int:dataset_id>/cultural", methods=["GET"])
@@ -344,10 +438,10 @@ def get_cultural_analysis(dataset_id):
except NonExistentDatasetException: except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404 return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e: except ValueError as e:
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400 return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e: except Exception as e:
print(traceback.format_exc()) print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>/interaction", methods=["GET"]) @app.route("/dataset/<int:dataset_id>/interaction", methods=["GET"])
@@ -366,10 +460,10 @@ def get_interaction_analysis(dataset_id):
except NonExistentDatasetException: except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404 return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e: except ValueError as e:
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400 return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e: except Exception as e:
print(traceback.format_exc()) print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 return jsonify({"error": f"An unexpected error occurred"}), 500
if __name__ == "__main__": if __name__ == "__main__":

server/connectors/base.py Normal file

@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from dto.post import Post


class BaseConnector(ABC):
    # Each subclass declares these at the class level
    source_name: str              # machine-readable: "reddit", "youtube"
    display_name: str             # human-readable: "Reddit", "YouTube"
    required_env: list[str] = []  # env vars needed to activate
    search_enabled: bool
    categories_enabled: bool

    @classmethod
    def is_available(cls) -> bool:
        """Returns True if all required env vars are set."""
        import os
        return all(os.getenv(var) for var in cls.required_env)

    @abstractmethod
    def get_new_posts_by_search(self,
                                search: str | None = None,
                                category: str | None = None,
                                post_limit: int = 10
                                ) -> list[Post]:
        ...

    @abstractmethod
    def category_exists(self, category: str) -> bool:
        ...
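To illustrate the contract, a hypothetical minimal subclass (self-contained sketch: `Post` and the base class are stubbed locally since the real ones live in the server package, and `DummyConnector` is invented for illustration):

```python
from abc import ABC, abstractmethod
import os

class Post:  # stand-in for dto.post.Post
    pass

class BaseConnector(ABC):
    source_name: str
    display_name: str
    required_env: list[str] = []
    search_enabled: bool
    categories_enabled: bool

    @classmethod
    def is_available(cls) -> bool:
        # Connector is usable only when every required env var is set
        return all(os.getenv(var) for var in cls.required_env)

    @abstractmethod
    def get_new_posts_by_search(self, search=None, category=None, post_limit=10) -> list[Post]:
        ...

    @abstractmethod
    def category_exists(self, category: str) -> bool:
        ...

class DummyConnector(BaseConnector):  # hypothetical source
    source_name = "dummy"
    display_name = "Dummy"
    required_env = ["DUMMY_TOKEN"]
    search_enabled = True
    categories_enabled = False

    def get_new_posts_by_search(self, search=None, category=None, post_limit=10) -> list[Post]:
        return []

    def category_exists(self, category: str) -> bool:
        return False

os.environ.pop("DUMMY_TOKEN", None)
print(DummyConnector.is_available())  # → False (credential missing)
os.environ["DUMMY_TOKEN"] = "secret"
print(DummyConnector.is_available())  # → True
```

Declaring the credentials in `required_env` rather than checking inside each connector is what lets the registry filter out unconfigured sources without instantiating them.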


@@ -7,6 +7,7 @@ from dto.post import Post
from dto.comment import Comment
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from server.connectors.base import BaseConnector

logger = logging.getLogger(__name__)

@@ -14,25 +15,64 @@ HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; ForumScraper/1.0)"
}


class BoardsAPI(BaseConnector):
    source_name: str = "boards.ie"
    display_name: str = "Boards.ie"
    categories_enabled: bool = True
    search_enabled: bool = False

    def __init__(self):
        self.base_url = "https://www.boards.ie"

    def get_new_posts_by_search(self,
                                search: str,
                                category: str,
                                post_limit: int
                                ) -> list[Post]:
        if search:
            raise NotImplementedError("Search not compatible with boards.ie")
        if category:
            return self._get_posts(f"{self.base_url}/categories/{category}", post_limit)
        else:
            return self._get_posts(f"{self.base_url}/discussions", post_limit)

    def category_exists(self, category: str) -> bool:
        if not category:
            return False
        url = f"{self.base_url}/categories/{category}"
        try:
            response = requests.head(url, headers=HEADERS, allow_redirects=True)
            if response.status_code == 200:
                return True
            if response.status_code == 404:
                return False
            # fallback if HEAD not supported
            response = requests.get(url, headers=HEADERS)
            return response.status_code == 200
        except requests.RequestException as e:
            logger.error(f"Error checking category '{category}': {e}")
            return False

    ## Private

    def _get_posts(self, url, limit) -> list[Post]:
        urls = []
        current_page = 1

        while len(urls) < limit:
            # Build the page URL from the base each iteration, so page 2
            # is ".../p2" rather than ".../p1/p2"
            page_url = f"{url}/p{current_page}"
            html = self._fetch_page(page_url)
            soup = BeautifulSoup(html, "html.parser")
            logger.debug(f"Processing page {current_page} for link: {page_url}")

            for a in soup.select("a.threadbit-threadlink"):
                if len(urls) >= limit:
                    break
                href = a.get("href")
@@ -41,14 +81,14 @@ class BoardsAPI:
            current_page += 1

        logger.debug(f"Fetched {len(urls)} post URLs")

        # Fetch post details for each URL and create Post objects
        posts = []

        def fetch_and_parse(post_url):
            html = self._fetch_page(post_url)
            post = self._parse_thread(html, post_url)
            return post

        with ThreadPoolExecutor(max_workers=30) as executor:
@@ -71,7 +111,7 @@ class BoardsAPI:
            response.raise_for_status()
            return response.text

    def _parse_thread(self, html: str, post_url: str) -> Post:
        soup = BeautifulSoup(html, "html.parser")

        # Author
@@ -100,7 +140,7 @@ class BoardsAPI:
        title = title_tag.text.strip() if title_tag else None

        # Comments
        comments = self._parse_comments(post_url, post_num)

        post = Post(
            id=post_num,
@@ -115,11 +155,11 @@ class BoardsAPI:
        return post

    def _parse_comments(self, url: str, post_id: str) -> list[Comment]:
        comments = []
        current_url = url

        while current_url:
            html = self._fetch_page(current_url)
            page_comments = self._parse_page_comments(html, post_id)
            comments.extend(page_comments)
@@ -130,7 +170,7 @@ class BoardsAPI:
            if next_link and next_link.get('href'):
                href = next_link.get('href')
                current_url = href if href.startswith('http') else self.base_url + href
            else:
                current_url = None


@@ -5,44 +5,63 @@ import time
from dto.post import Post
from dto.user import User
from dto.comment import Comment
from server.connectors.base import BaseConnector

logger = logging.getLogger(__name__)


class RedditAPI(BaseConnector):
    source_name: str = "reddit"
    display_name: str = "Reddit"
    search_enabled: bool = True
    categories_enabled: bool = True

    def __init__(self):
        self.url = "https://www.reddit.com/"

    # Public Methods #
    def get_new_posts_by_search(self,
                                search: str,
                                category: str,
                                post_limit: int
                                ) -> list[Post]:
        prefix = f"r/{category}/" if category else ""
        params = {'limit': post_limit}
        if search:
            endpoint = f"{prefix}search.json"
            params.update({
                'q': search,
                'sort': 'new',
                'restrict_sr': 'on' if category else 'off'
            })
        else:
            endpoint = f"{prefix}new.json"

        posts = []
        after = None
        while len(posts) < post_limit:
            batch_limit = min(100, post_limit - len(posts))
            params['limit'] = batch_limit
            if after:
                params['after'] = after

            data = self._fetch_post_overviews(endpoint, params)

            if not data or 'data' not in data or not data['data'].get('children'):
                break

            batch_posts = self._parse_posts(data)
            posts.extend(batch_posts)

            after = data['data'].get('after')
            if not after:
                break

        return posts[:post_limit]

    def _get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]:
        posts = []
        after = None
        url = f"r/{subreddit}/new.json"
@@ -75,6 +94,17 @@ class RedditAPI:
        data = self._fetch_post_overviews(f"user/{username}/about.json", {})
        return self._parse_user(data)

    def category_exists(self, category: str) -> bool:
        try:
            data = self._fetch_post_overviews(f"r/{category}/about.json", {})
            return (
                data is not None
                and 'data' in data
                and data['data'].get('id') is not None
            )
        except Exception:
            return False

    ## Private Methods ##
    def _parse_posts(self, data) -> list[Post]:
        posts = []


@@ -0,0 +1,30 @@
import pkgutil
import importlib
import server.connectors
from server.connectors.base import BaseConnector
def _discover_connectors() -> list[type[BaseConnector]]:
"""Walk the connectors package and collect all BaseConnector subclasses."""
for _, module_name, _ in pkgutil.iter_modules(server.connectors.__path__):
if module_name in ("base", "registry"):
continue
importlib.import_module(f"server.connectors.{module_name}")
return [
cls for cls in BaseConnector.__subclasses__()
if cls.source_name # guard against abstract intermediaries
]
def get_available_connectors() -> dict[str, type[BaseConnector]]:
return {c.source_name: c for c in _discover_connectors() if c.is_available()}
def get_connector_metadata() -> dict[str, dict]:
res = {}
for id, obj in get_available_connectors().items():
res[id] = {"id": id,
"label": obj.display_name,
"search_enabled": obj.search_enabled,
"categories_enabled": obj.categories_enabled
}
return res
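The registry leans on `__subclasses__()` being populated as a side effect of importing each connector module. A standalone sketch of that pattern (stub class names are hypothetical):

```python
# Defining a subclass (normally a side effect of importlib.import_module)
# is enough for it to appear in Base.__subclasses__().
class Base:
    source_name: str  # annotation only; concrete classes assign a value

class RedditStub(Base):
    source_name = "reddit"

class YouTubeStub(Base):
    source_name = "youtube"

class AbstractIntermediary(Base):
    pass  # no source_name: filtered out by the getattr guard below

registry = {
    cls.source_name: cls
    for cls in Base.__subclasses__()
    if getattr(cls, "source_name", None)
}
print(sorted(registry))  # → ['reddit', 'youtube']
```

Note that `getattr` is needed for the guard: `source_name: str` on the base is only an annotation, so a bare `cls.source_name` on the intermediary would raise `AttributeError` instead of filtering it.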


@@ -0,0 +1,96 @@
import os
import datetime

from dotenv import load_dotenv
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from dto.post import Post
from dto.comment import Comment
from server.connectors.base import BaseConnector

load_dotenv()
API_KEY = os.getenv("YOUTUBE_API_KEY")


class YouTubeAPI(BaseConnector):
    source_name: str = "youtube"
    display_name: str = "YouTube"
    required_env: list[str] = ["YOUTUBE_API_KEY"]  # lets is_available() gate this connector
    search_enabled: bool = True
    categories_enabled: bool = False

    def __init__(self):
        self.youtube = build('youtube', 'v3', developerKey=API_KEY)

    def get_new_posts_by_search(self,
                                search: str,
                                category: str,
                                post_limit: int
                                ) -> list[Post]:
        videos = self._search_videos(search, post_limit)
        posts = []
        for video in videos:
            video_id = video['id']['videoId']
            snippet = video['snippet']
            title = snippet['title']
            description = snippet['description']
            published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
            channel_title = snippet['channelTitle']

            comments = []
            comments_data = self._get_video_comments(video_id)
            for comment_thread in comments_data:
                comment_snippet = comment_thread['snippet']['topLevelComment']['snippet']
                comment = Comment(
                    id=comment_thread['id'],
                    post_id=video_id,
                    content=comment_snippet['textDisplay'],
                    author=comment_snippet['authorDisplayName'],
                    timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(),
                    reply_to=None,
                    source=self.source_name
                )
                comments.append(comment)

            post = Post(
                id=video_id,
                content=f"{title}\n\n{description}",
                author=channel_title,
                timestamp=published_at,
                url=f"https://www.youtube.com/watch?v={video_id}",
                title=title,
                source=self.source_name,
                comments=comments
            )
            posts.append(post)
        return posts

    def category_exists(self, category: str) -> bool:
        return True

    def _search_videos(self, query, limit):
        request = self.youtube.search().list(
            q=query,
            part='snippet',
            type='video',
            maxResults=limit
        )
        response = request.execute()
        return response.get('items', [])

    def _get_video_comments(self, video_id):
        request = self.youtube.commentThreads().list(
            part='snippet',
            videoId=video_id,
            textFormat='plainText'
        )
        try:
            response = request.execute()
        except HttpError as e:
            print(f"Error fetching comments for video {video_id}: {e}")
            return []
        return response.get('items', [])
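One caveat in the connector above: `strptime` with a hard-coded `Z` yields a naive datetime, so `.timestamp()` interprets it in the worker's local timezone rather than UTC. A small sketch of the skew and the fix (sample timestamp is made up):

```python
import datetime

published_at = "2026-03-14T17:05:46Z"

# Naive parse, as in the connector; .timestamp() here assumes local time
naive = datetime.datetime.strptime(published_at, "%Y-%m-%dT%H:%M:%SZ")

# Attaching UTC explicitly makes the epoch value timezone-independent
aware = naive.replace(tzinfo=datetime.timezone.utc)

print(naive.timestamp() == aware.timestamp())  # only True when local time is UTC
print(aware.isoformat())
```

On a UTC worker the two agree, which is why the bug is easy to miss in deployment and only shows up as shifted timestamps elsewhere.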


@@ -1,6 +1,10 @@
import re

from server.db.database import PostgresConnector
from flask_bcrypt import Bcrypt

EMAIL_REGEX = re.compile(r"[^@]+@[^@]+\.[^@]+")


class AuthManager:
    def __init__(self, db: PostgresConnector, bcrypt: Bcrypt):
        self.db = db
@@ -18,6 +22,12 @@ class AuthManager:
    def register_user(self, username, email, password):
        hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8")

        if len(username) < 3:
            raise ValueError("Username must be at least 3 characters long")
        if not EMAIL_REGEX.match(email):
            raise ValueError("Please enter a valid email address")

        if self.get_user_by_email(email):
            raise ValueError("Email already registered")
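The `EMAIL_REGEX` added here is the common permissive any@any.any pattern; a quick check of what it accepts and rejects:

```python
import re

EMAIL_REGEX = re.compile(r"[^@]+@[^@]+\.[^@]+")

print(bool(EMAIL_REGEX.match("user@example.com")))  # → True
print(bool(EMAIL_REGEX.match("no-at-sign")))        # → False (no @)
print(bool(EMAIL_REGEX.match("user@nodot")))        # → False (no dot after @)
```

It deliberately doesn't try to validate full RFC 5322 syntax; it just rejects obvious non-addresses and leaves real verification to the email itself.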


@@ -1,7 +1,7 @@
import pandas as pd

from server.db.database import PostgresConnector
from psycopg2.extras import Json
from server.exceptions import NonExistentDatasetException


class DatasetManager:
    def __init__(self, db: PostgresConnector):
@@ -114,7 +114,7 @@ class DatasetManager:
        self.db.execute_batch(query, values)

    def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None):
        if status not in ["fetching", "processing", "complete", "error"]:
            raise ValueError("Invalid status")

        query = """

@@ -23,7 +23,7 @@ CREATE TABLE datasets (
    -- Enforce valid states
    CONSTRAINT datasets_status_check
        CHECK (status IN ('fetching', 'processing', 'complete', 'error'))
);

CREATE TABLE events (


@@ -1,9 +1,13 @@
import pandas as pd
import logging

from server.queue.celery_app import celery
from server.analysis.enrichment import DatasetEnrichment
from server.db.database import PostgresConnector
from server.core.datasets import DatasetManager
from server.connectors.registry import get_available_connectors

logger = logging.getLogger(__name__)


@celery.task(bind=True, max_retries=3)
def process_dataset(self, dataset_id: int, posts: list, topics: dict):
@@ -16,6 +20,41 @@ def process_dataset(self, dataset_id: int, posts: list, topics: dict):
        processor = DatasetEnrichment(df, topics)
        enriched_df = processor.enrich()

        dataset_manager.save_dataset_content(dataset_id, enriched_df)
        dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
    except Exception as e:
        dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")


@celery.task(bind=True, max_retries=3)
def fetch_and_process_dataset(self,
                              dataset_id: int,
                              source_info: list[dict],
                              topics: dict):
    connectors = get_available_connectors()
    db = PostgresConnector()
    dataset_manager = DatasetManager(db)
    posts = []
    try:
        for metadata in source_info:
            name = metadata["name"]
            search = metadata.get("search")
            category = metadata.get("category")
            limit = metadata.get("limit", 100)

            connector = connectors[name]()
            raw_posts = connector.get_new_posts_by_search(
                search=search,
                category=category,
                post_limit=limit
            )
            posts.extend(post.to_dict() for post in raw_posts)

        df = pd.DataFrame(posts)
        processor = DatasetEnrichment(df, topics)
        enriched_df = processor.enrich()

        dataset_manager.save_dataset_content(dataset_id, enriched_df)
        dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
    except Exception as e:
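The new task reads each `source_info` entry by key; a hypothetical payload (values invented, keys taken from the task body, `name` matching the connectors' `source_name`):

```python
# Hypothetical payload; keys mirror what fetch_and_process_dataset reads:
# "name" (required), plus optional "search", "category", "limit".
source_info = [
    {"name": "reddit", "search": "cost of living", "category": "ireland", "limit": 200},
    {"name": "boards.ie", "category": "current-affairs"},  # boards.ie: category only, no search
]

# A missing "limit" falls back to 100, exactly as in the task body
limits = [metadata.get("limit", 100) for metadata in source_info]
print(limits)  # → [200, 100]
```

This is why the API-side validation the commits mention matters: a misspelled key here would only surface later as a silent `KeyError` inside the Celery worker.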

server/topics.json Normal file

@@ -0,0 +1,67 @@
{
"Personal Life": "daily life, life updates, what happened today, personal stories, life events, reflections",
"Relationships": "dating, relationships, breakups, friendships, family relationships, marriage, relationship advice",
"Family & Parenting": "parents, parenting, children, raising kids, family dynamics, family stories",
"Work & Careers": "jobs, workplaces, office life, promotions, quitting jobs, career advice, workplace drama",
"Education": "school, studying, exams, university, homework, academic pressure, learning experiences",
"Money & Finance": "saving money, debt, budgeting, cost of living, financial advice, personal finance",
"Health & Fitness": "exercise, gym, workouts, running, diet, fitness routines, weight loss",
"Mental Health": "stress, anxiety, depression, burnout, therapy, emotional wellbeing",
"Food & Cooking": "meals, cooking, recipes, restaurants, snacks, food opinions",
"Travel": "holidays, trips, tourism, travel experiences, airports, flights, travel tips",
"Entertainment": "movies, TV shows, streaming services, celebrities, pop culture",
"Music": "songs, albums, artists, concerts, music opinions",
"Gaming": "video games, gaming culture, consoles, PC gaming, esports",
"Sports": "sports matches, teams, players, competitions, sports opinions",
"Technology": "phones, gadgets, apps, AI, software, tech trends",
"Internet Culture": "memes, viral trends, online jokes, internet drama, trending topics",
"Social Media": "platforms, influencers, content creators, algorithms, online communities",
"News & Current Events": "breaking news, world events, major incidents, public discussions",
"Politics": "political debates, elections, government policies, ideology",
"Culture & Society": "social issues, cultural trends, generational debates, societal changes",
"Identity & Lifestyle": "personal identity, lifestyle choices, values, self-expression",
"Hobbies & Interests": "art, photography, crafts, collecting, hobbies",
"Fashion & Beauty": "clothing, style, makeup, skincare, fashion trends",
"Animals & Pets": "pets, animal videos, pet care, wildlife",
"Humour": "jokes, funny stories, sarcasm, memes",
"Opinions & Debates": "hot takes, controversial opinions, arguments, discussions",
"Advice & Tips": "life advice, tutorials, how-to tips, recommendations",
"Product Reviews": "reviews, recommendations, experiences with products",
"Complaints & Rants": "frustrations, complaining, venting about things",
"Motivation & Inspiration": "motivational quotes, success stories, encouragement",
"Questions & Curiosity": "asking questions, seeking opinions, curiosity posts",
"Celebrations & Achievements": "birthdays, milestones, achievements, good news",
"Random Thoughts": "shower thoughts, observations, random ideas"
}


@@ -1,4 +1,5 @@
import datetime
import os

from flask import request


def parse_datetime_filter(value):
@@ -48,3 +49,9 @@ def get_request_filters() -> dict:
        filters["data_sources"] = data_sources

    return filters


def get_env(name: str) -> str:
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value