Compare commits
37 Commits
738af5415b
...
94befb61c5
| Author | SHA1 | Date | |
|---|---|---|---|
| 94befb61c5 | |||
| 12f5953146 | |||
| 5b0441c34b | |||
| d2b919cd66 | |||
| 062937ec3c | |||
| 2a00795cc2 | |||
| c990f29645 | |||
| 8a423b2a29 | |||
| d96f459104 | |||
| 162a4de64e | |||
| 6684780d23 | |||
| c12f1b4371 | |||
| 01d6bd0164 | |||
| 12cbc24074 | |||
| 0658713f42 | |||
| b2ae1a9f70 | |||
| eff416c34e | |||
| 524c9c50a0 | |||
| 2ab74d922a | |||
| d520e2af98 | |||
| 8fe84a30f6 | |||
| dc330b87b9 | |||
| 7ccc934f71 | |||
| a3dbe04a57 | |||
| a65c4a461c | |||
| 15704a0782 | |||
| 6ec47256d0 | |||
| 2572664e26 | |||
| 17bd4702b2 | |||
| 53cb5c2ea5 | |||
| 0866dda8b3 | |||
| 5ccb2e73cd | |||
| 2a8d7c7972 | |||
| e7a8c17be4 | |||
| cc799f7368 | |||
| 262a70dbf3 | |||
| ca444e9cb0 |
@@ -1,84 +0,0 @@
|
||||
import os
|
||||
import datetime
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from googleapiclient.discovery import build
|
||||
from googleapiclient.errors import HttpError
|
||||
from dto.post import Post
|
||||
from dto.comment import Comment
|
||||
|
||||
load_dotenv()
|
||||
|
||||
API_KEY = os.getenv("YOUTUBE_API_KEY")
|
||||
|
||||
class YouTubeAPI:
|
||||
def __init__(self):
|
||||
self.youtube = build('youtube', 'v3', developerKey=API_KEY)
|
||||
|
||||
def search_videos(self, query, limit):
|
||||
request = self.youtube.search().list(
|
||||
q=query,
|
||||
part='snippet',
|
||||
type='video',
|
||||
maxResults=limit
|
||||
)
|
||||
response = request.execute()
|
||||
return response.get('items', [])
|
||||
|
||||
def get_video_comments(self, video_id, limit):
|
||||
request = self.youtube.commentThreads().list(
|
||||
part='snippet',
|
||||
videoId=video_id,
|
||||
maxResults=limit,
|
||||
textFormat='plainText'
|
||||
)
|
||||
|
||||
try:
|
||||
response = request.execute()
|
||||
except HttpError as e:
|
||||
print(f"Error fetching comments for video {video_id}: {e}")
|
||||
return []
|
||||
return response.get('items', [])
|
||||
|
||||
def fetch_videos(self, query, video_limit, comment_limit) -> list[Post]:
|
||||
videos = self.search_videos(query, video_limit)
|
||||
posts = []
|
||||
|
||||
for video in videos:
|
||||
video_id = video['id']['videoId']
|
||||
snippet = video['snippet']
|
||||
title = snippet['title']
|
||||
description = snippet['description']
|
||||
published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
|
||||
channel_title = snippet['channelTitle']
|
||||
|
||||
comments = []
|
||||
comments_data = self.get_video_comments(video_id, comment_limit)
|
||||
for comment_thread in comments_data:
|
||||
comment_snippet = comment_thread['snippet']['topLevelComment']['snippet']
|
||||
comment = Comment(
|
||||
id=comment_thread['id'],
|
||||
post_id=video_id,
|
||||
content=comment_snippet['textDisplay'],
|
||||
author=comment_snippet['authorDisplayName'],
|
||||
timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(),
|
||||
reply_to=None,
|
||||
source="YouTube"
|
||||
)
|
||||
|
||||
comments.append(comment)
|
||||
|
||||
post = Post(
|
||||
id=video_id,
|
||||
content=f"{title}\n\n{description}",
|
||||
author=channel_title,
|
||||
timestamp=published_at,
|
||||
url=f"https://www.youtube.com/watch?v={video_id}",
|
||||
title=title,
|
||||
source="YouTube",
|
||||
comments=comments
|
||||
)
|
||||
|
||||
posts.append(post)
|
||||
|
||||
return posts
|
||||
@@ -1,43 +0,0 @@
|
||||
import json
|
||||
import logging
|
||||
from connectors.reddit_api import RedditAPI
|
||||
from connectors.boards_api import BoardsAPI
|
||||
from connectors.youtube_api import YouTubeAPI
|
||||
|
||||
posts_file = 'posts_test.jsonl'
|
||||
|
||||
reddit_connector = RedditAPI()
|
||||
boards_connector = BoardsAPI()
|
||||
youtube_connector = YouTubeAPI()
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||
|
||||
def remove_empty_posts(posts):
|
||||
return [post for post in posts if post.content.strip() != ""]
|
||||
|
||||
def save_to_jsonl(filename, posts):
|
||||
with open(filename, 'a', encoding='utf-8') as f:
|
||||
for post in posts:
|
||||
# Convert post object to dict if it's a dataclass
|
||||
data = post.to_dict()
|
||||
f.write(json.dumps(data) + '\n')
|
||||
|
||||
|
||||
def main():
|
||||
boards_posts = boards_connector.get_new_category_posts('cork-city', 1200, 1200)
|
||||
save_to_jsonl(posts_file, boards_posts)
|
||||
|
||||
reddit_posts = reddit_connector.get_new_subreddit_posts('cork', 1200)
|
||||
reddit_posts = remove_empty_posts(reddit_posts)
|
||||
save_to_jsonl(posts_file, reddit_posts)
|
||||
|
||||
ireland_posts = reddit_connector.search_new_subreddit_posts('cork', 'ireland', 1200)
|
||||
ireland_posts = remove_empty_posts(ireland_posts)
|
||||
save_to_jsonl(posts_file, ireland_posts)
|
||||
|
||||
youtube_videos = youtube_connector.fetch_videos('cork city', 1200, 1200)
|
||||
save_to_jsonl(posts_file, youtube_videos)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -43,7 +43,7 @@ services:
|
||||
- .env
|
||||
command: >
|
||||
celery -A server.queue.celery_app.celery worker
|
||||
--loglevel=info
|
||||
--loglevel=debug
|
||||
--pool=solo
|
||||
depends_on:
|
||||
- postgres
|
||||
|
||||
@@ -5,6 +5,7 @@ import DatasetsPage from "./pages/Datasets";
|
||||
import DatasetStatusPage from "./pages/DatasetStatus";
|
||||
import LoginPage from "./pages/Login";
|
||||
import UploadPage from "./pages/Upload";
|
||||
import AutoScrapePage from "./pages/AutoScrape";
|
||||
import StatPage from "./pages/Stats";
|
||||
import { getDocumentTitle } from "./utils/documentTitle";
|
||||
import DatasetEditPage from "./pages/DatasetEdit";
|
||||
@@ -22,6 +23,7 @@ function App() {
|
||||
<Route path="/" element={<Navigate to="/login" replace />} />
|
||||
<Route path="/login" element={<LoginPage />} />
|
||||
<Route path="/upload" element={<UploadPage />} />
|
||||
<Route path="/auto-scrape" element={<AutoScrapePage />} />
|
||||
<Route path="/datasets" element={<DatasetsPage />} />
|
||||
<Route path="/dataset/:datasetId/status" element={<DatasetStatusPage />} />
|
||||
<Route path="/dataset/:datasetId/stats" element={<StatPage />} />
|
||||
|
||||
338
frontend/src/pages/AutoScrape.tsx
Normal file
338
frontend/src/pages/AutoScrape.tsx
Normal file
@@ -0,0 +1,338 @@
|
||||
import axios from "axios";
|
||||
import { useEffect, useState } from "react";
|
||||
import { useNavigate } from "react-router-dom";
|
||||
import StatsStyling from "../styles/stats_styling";
|
||||
|
||||
const styles = StatsStyling;
|
||||
const API_BASE_URL = import.meta.env.VITE_BACKEND_URL;
|
||||
|
||||
type SourceOption = {
|
||||
id: string;
|
||||
label: string;
|
||||
search_enabled?: boolean;
|
||||
categories_enabled?: boolean;
|
||||
searchEnabled?: boolean;
|
||||
categoriesEnabled?: boolean;
|
||||
};
|
||||
|
||||
type SourceConfig = {
|
||||
sourceName: string;
|
||||
limit: string;
|
||||
search: string;
|
||||
category: string;
|
||||
};
|
||||
|
||||
const buildEmptySourceConfig = (sourceName = ""): SourceConfig => ({
|
||||
sourceName,
|
||||
limit: "100",
|
||||
search: "",
|
||||
category: "",
|
||||
});
|
||||
|
||||
const supportsSearch = (source?: SourceOption): boolean =>
|
||||
Boolean(source?.search_enabled ?? source?.searchEnabled);
|
||||
|
||||
const supportsCategories = (source?: SourceOption): boolean =>
|
||||
Boolean(source?.categories_enabled ?? source?.categoriesEnabled);
|
||||
|
||||
const AutoScrapePage = () => {
|
||||
const navigate = useNavigate();
|
||||
const [datasetName, setDatasetName] = useState("");
|
||||
const [sourceOptions, setSourceOptions] = useState<SourceOption[]>([]);
|
||||
const [sourceConfigs, setSourceConfigs] = useState<SourceConfig[]>([]);
|
||||
const [returnMessage, setReturnMessage] = useState("");
|
||||
const [isLoadingSources, setIsLoadingSources] = useState(true);
|
||||
const [isSubmitting, setIsSubmitting] = useState(false);
|
||||
const [hasError, setHasError] = useState(false);
|
||||
|
||||
useEffect(() => {
|
||||
axios
|
||||
.get<SourceOption[]>(`${API_BASE_URL}/datasets/sources`)
|
||||
.then((response) => {
|
||||
const options = response.data || [];
|
||||
setSourceOptions(options);
|
||||
setSourceConfigs([buildEmptySourceConfig(options[0]?.id || "")]);
|
||||
})
|
||||
.catch((requestError: unknown) => {
|
||||
setHasError(true);
|
||||
if (axios.isAxiosError(requestError)) {
|
||||
setReturnMessage(
|
||||
`Failed to load available sources: ${String(
|
||||
requestError.response?.data?.error || requestError.message
|
||||
)}`
|
||||
);
|
||||
} else {
|
||||
setReturnMessage("Failed to load available sources.");
|
||||
}
|
||||
})
|
||||
.finally(() => {
|
||||
setIsLoadingSources(false);
|
||||
});
|
||||
}, []);
|
||||
|
||||
const updateSourceConfig = (index: number, field: keyof SourceConfig, value: string) => {
|
||||
setSourceConfigs((previous) =>
|
||||
previous.map((config, configIndex) =>
|
||||
configIndex === index
|
||||
? field === "sourceName"
|
||||
? { ...config, sourceName: value, search: "", category: "" }
|
||||
: { ...config, [field]: value }
|
||||
: config
|
||||
)
|
||||
);
|
||||
};
|
||||
|
||||
const getSourceOption = (sourceName: string) =>
|
||||
sourceOptions.find((option) => option.id === sourceName);
|
||||
|
||||
const addSourceConfig = () => {
|
||||
setSourceConfigs((previous) => [
|
||||
...previous,
|
||||
buildEmptySourceConfig(sourceOptions[0]?.id || ""),
|
||||
]);
|
||||
};
|
||||
|
||||
const removeSourceConfig = (index: number) => {
|
||||
setSourceConfigs((previous) => previous.filter((_, configIndex) => configIndex !== index));
|
||||
};
|
||||
|
||||
const autoScrape = async () => {
|
||||
const token = localStorage.getItem("access_token");
|
||||
if (!token) {
|
||||
setHasError(true);
|
||||
setReturnMessage("You must be signed in to auto scrape a dataset.");
|
||||
return;
|
||||
}
|
||||
|
||||
const normalizedDatasetName = datasetName.trim();
|
||||
if (!normalizedDatasetName) {
|
||||
setHasError(true);
|
||||
setReturnMessage("Please add a dataset name before continuing.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (sourceConfigs.length === 0) {
|
||||
setHasError(true);
|
||||
setReturnMessage("Please add at least one source.");
|
||||
return;
|
||||
}
|
||||
|
||||
const normalizedSources = sourceConfigs.map((source) => {
|
||||
const sourceOption = getSourceOption(source.sourceName);
|
||||
|
||||
return {
|
||||
name: source.sourceName,
|
||||
limit: Number(source.limit || 100),
|
||||
search: supportsSearch(sourceOption) ? source.search.trim() || undefined : undefined,
|
||||
category: supportsCategories(sourceOption)
|
||||
? source.category.trim() || undefined
|
||||
: undefined,
|
||||
};
|
||||
});
|
||||
|
||||
const invalidSource = normalizedSources.find(
|
||||
(source) => !source.name || !Number.isFinite(source.limit) || source.limit <= 0
|
||||
);
|
||||
|
||||
if (invalidSource) {
|
||||
setHasError(true);
|
||||
setReturnMessage("Every source needs a name and a limit greater than zero.");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
setIsSubmitting(true);
|
||||
setHasError(false);
|
||||
setReturnMessage("");
|
||||
|
||||
const response = await axios.post(
|
||||
`${API_BASE_URL}/datasets/scrape`,
|
||||
{
|
||||
name: normalizedDatasetName,
|
||||
sources: normalizedSources,
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
const datasetId = Number(response.data.dataset_id);
|
||||
|
||||
setReturnMessage(
|
||||
`Auto scrape queued successfully (dataset #${datasetId}). Redirecting to processing status...`
|
||||
);
|
||||
|
||||
setTimeout(() => {
|
||||
navigate(`/dataset/${datasetId}/status`);
|
||||
}, 400);
|
||||
} catch (requestError: unknown) {
|
||||
setHasError(true);
|
||||
if (axios.isAxiosError(requestError)) {
|
||||
const message = String(
|
||||
requestError.response?.data?.error || requestError.message || "Auto scrape failed."
|
||||
);
|
||||
setReturnMessage(`Auto scrape failed: ${message}`);
|
||||
} else {
|
||||
setReturnMessage("Auto scrape failed due to an unexpected error.");
|
||||
}
|
||||
} finally {
|
||||
setIsSubmitting(false);
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<div style={styles.page}>
|
||||
<div style={styles.containerWide}>
|
||||
<div style={{ ...styles.card, ...styles.headerBar }}>
|
||||
<div>
|
||||
<h1 style={styles.sectionHeaderTitle}>Auto Scrape Dataset</h1>
|
||||
<p style={styles.sectionHeaderSubtitle}>
|
||||
Select sources and scrape settings, then queue processing automatically.
|
||||
</p>
|
||||
</div>
|
||||
<button
|
||||
type="button"
|
||||
style={{ ...styles.buttonPrimary, opacity: isSubmitting || isLoadingSources ? 0.75 : 1 }}
|
||||
onClick={autoScrape}
|
||||
disabled={isSubmitting || isLoadingSources}
|
||||
>
|
||||
{isSubmitting ? "Queueing..." : "Auto Scrape and Analyze"}
|
||||
</button>
|
||||
</div>
|
||||
|
||||
<div
|
||||
style={{
|
||||
...styles.grid,
|
||||
marginTop: 14,
|
||||
gridTemplateColumns: "repeat(auto-fit, minmax(280px, 1fr))",
|
||||
}}
|
||||
>
|
||||
<div style={{ ...styles.card, gridColumn: "auto" }}>
|
||||
<h2 style={{ ...styles.sectionTitle, color: "#24292f" }}>Dataset Name</h2>
|
||||
<p style={styles.sectionSubtitle}>Use a clear label so you can identify this run later.</p>
|
||||
<input
|
||||
style={{ ...styles.input, ...styles.inputFullWidth }}
|
||||
type="text"
|
||||
placeholder="Example: r/cork subreddit - Jan 2026"
|
||||
value={datasetName}
|
||||
onChange={(event) => setDatasetName(event.target.value)}
|
||||
/>
|
||||
</div>
|
||||
|
||||
<div style={{ ...styles.card, gridColumn: "auto" }}>
|
||||
<h2 style={{ ...styles.sectionTitle, color: "#24292f" }}>Sources</h2>
|
||||
<p style={styles.sectionSubtitle}>
|
||||
Configure source, limit, optional search, and optional category.
|
||||
</p>
|
||||
|
||||
{isLoadingSources && <p style={styles.subtleBodyText}>Loading sources...</p>}
|
||||
|
||||
{!isLoadingSources && sourceOptions.length === 0 && (
|
||||
<p style={styles.subtleBodyText}>No source connectors are currently available.</p>
|
||||
)}
|
||||
|
||||
{!isLoadingSources && sourceOptions.length > 0 && (
|
||||
<div style={{ display: "flex", flexDirection: "column", gap: 10 }}>
|
||||
{sourceConfigs.map((source, index) => {
|
||||
const sourceOption = getSourceOption(source.sourceName);
|
||||
const searchEnabled = supportsSearch(sourceOption);
|
||||
const categoriesEnabled = supportsCategories(sourceOption);
|
||||
|
||||
return (
|
||||
<div
|
||||
key={`source-${index}`}
|
||||
style={{
|
||||
border: "1px solid #d0d7de",
|
||||
borderRadius: 8,
|
||||
padding: 12,
|
||||
background: "#f6f8fa",
|
||||
display: "grid",
|
||||
gap: 8,
|
||||
}}
|
||||
>
|
||||
<select
|
||||
value={source.sourceName}
|
||||
style={{ ...styles.input, ...styles.inputFullWidth }}
|
||||
onChange={(event) => updateSourceConfig(index, "sourceName", event.target.value)}
|
||||
>
|
||||
{sourceOptions.map((option) => (
|
||||
<option key={option.id} value={option.id}>
|
||||
{option.label}
|
||||
</option>
|
||||
))}
|
||||
</select>
|
||||
|
||||
<input
|
||||
type="number"
|
||||
min={1}
|
||||
value={source.limit}
|
||||
placeholder="Limit"
|
||||
style={{ ...styles.input, ...styles.inputFullWidth }}
|
||||
onChange={(event) => updateSourceConfig(index, "limit", event.target.value)}
|
||||
/>
|
||||
|
||||
<input
|
||||
type="text"
|
||||
value={source.search}
|
||||
placeholder={
|
||||
searchEnabled
|
||||
? "Search term (optional)"
|
||||
: "Search not supported for this source"
|
||||
}
|
||||
style={{ ...styles.input, ...styles.inputFullWidth }}
|
||||
disabled={!searchEnabled}
|
||||
onChange={(event) => updateSourceConfig(index, "search", event.target.value)}
|
||||
/>
|
||||
|
||||
<input
|
||||
type="text"
|
||||
value={source.category}
|
||||
placeholder={
|
||||
categoriesEnabled
|
||||
? "Category (optional)"
|
||||
: "Categories not supported for this source"
|
||||
}
|
||||
style={{ ...styles.input, ...styles.inputFullWidth }}
|
||||
disabled={!categoriesEnabled}
|
||||
onChange={(event) => updateSourceConfig(index, "category", event.target.value)}
|
||||
/>
|
||||
|
||||
{sourceConfigs.length > 1 && (
|
||||
<button
|
||||
type="button"
|
||||
style={styles.buttonSecondary}
|
||||
onClick={() => removeSourceConfig(index)}
|
||||
>
|
||||
Remove source
|
||||
</button>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
|
||||
<button type="button" style={styles.buttonSecondary} onClick={addSourceConfig}>
|
||||
Add another source
|
||||
</button>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div
|
||||
style={{
|
||||
...styles.card,
|
||||
marginTop: 14,
|
||||
...(hasError ? styles.alertCardError : styles.alertCardInfo),
|
||||
}}
|
||||
>
|
||||
{returnMessage ||
|
||||
"After queueing, your dataset is fetched and processed in the background automatically."}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
};
|
||||
|
||||
export default AutoScrapePage;
|
||||
@@ -9,7 +9,7 @@ const API_BASE_URL = import.meta.env.VITE_BACKEND_URL;
|
||||
type DatasetItem = {
|
||||
id: number;
|
||||
name?: string;
|
||||
status?: "processing" | "complete" | "error" | string;
|
||||
status?: "processing" | "complete" | "error" | "fetching" | string;
|
||||
status_message?: string | null;
|
||||
completed_at?: string | null;
|
||||
created_at?: string | null;
|
||||
@@ -50,7 +50,24 @@ const DatasetsPage = () => {
|
||||
}, []);
|
||||
|
||||
if (loading) {
|
||||
return <p style={{ ...styles.page, minHeight: "100vh" }}>Loading datasets...</p>;
|
||||
return (
|
||||
<div style={styles.loadingPage}>
|
||||
<div style={{ ...styles.loadingCard, transform: "translateY(-100px)" }}>
|
||||
<div style={styles.loadingHeader}>
|
||||
<div style={styles.loadingSpinner} />
|
||||
<div>
|
||||
<h2 style={styles.loadingTitle}>Loading datasets</h2>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div style={styles.loadingSkeleton}>
|
||||
<div style={{ ...styles.loadingSkeletonLine, ...styles.loadingSkeletonLineLong }} />
|
||||
<div style={{ ...styles.loadingSkeletonLine, ...styles.loadingSkeletonLineMed }} />
|
||||
<div style={{ ...styles.loadingSkeletonLine, ...styles.loadingSkeletonLineShort }} />
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
return (
|
||||
@@ -63,9 +80,18 @@ const DatasetsPage = () => {
|
||||
View and reopen datasets you previously uploaded.
|
||||
</p>
|
||||
</div>
|
||||
<div style={styles.controlsWrapped}>
|
||||
<button type="button" style={styles.buttonPrimary} onClick={() => navigate("/upload")}>
|
||||
Upload New Dataset
|
||||
</button>
|
||||
<button
|
||||
type="button"
|
||||
style={styles.buttonSecondary}
|
||||
onClick={() => navigate("/auto-scrape")}
|
||||
>
|
||||
Auto Scrape Dataset
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{error && (
|
||||
@@ -93,7 +119,7 @@ const DatasetsPage = () => {
|
||||
<div style={{ ...styles.card, marginTop: 14, padding: 0, overflow: "hidden" }}>
|
||||
<ul style={styles.listNoBullets}>
|
||||
{datasets.map((dataset) => {
|
||||
const isComplete = dataset.status === "complete";
|
||||
const isComplete = dataset.status === "complete" || dataset.status === "error";
|
||||
const editPath = `/dataset/${dataset.id}/edit`;
|
||||
const targetPath = isComplete
|
||||
? `/dataset/${dataset.id}/stats`
|
||||
|
||||
@@ -40,7 +40,7 @@ const UploadPage = () => {
|
||||
setHasError(false);
|
||||
setReturnMessage("");
|
||||
|
||||
const response = await axios.post(`${API_BASE_URL}/upload`, formData, {
|
||||
const response = await axios.post(`${API_BASE_URL}/datasets/upload`, formData, {
|
||||
headers: {
|
||||
"Content-Type": "multipart/form-data",
|
||||
},
|
||||
|
||||
@@ -3,6 +3,7 @@ const DEFAULT_TITLE = "Ethnograph View";
|
||||
const STATIC_TITLES: Record<string, string> = {
|
||||
"/login": "Sign In",
|
||||
"/upload": "Upload Dataset",
|
||||
"/auto-scrape": "Auto Scrape Dataset",
|
||||
"/datasets": "My Datasets",
|
||||
};
|
||||
|
||||
|
||||
142
server/app.py
142
server/app.py
@@ -19,32 +19,38 @@ from server.exceptions import NotAuthorisedException, NonExistentDatasetExceptio
|
||||
from server.db.database import PostgresConnector
|
||||
from server.core.auth import AuthManager
|
||||
from server.core.datasets import DatasetManager
|
||||
from server.utils import get_request_filters
|
||||
from server.queue.tasks import process_dataset
|
||||
from server.utils import get_request_filters, get_env
|
||||
from server.queue.tasks import process_dataset, fetch_and_process_dataset
|
||||
from server.connectors.registry import get_available_connectors, get_connector_metadata
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
# Env Variables
|
||||
load_dotenv()
|
||||
frontend_url = os.getenv("FRONTEND_URL", "http://localhost:5173")
|
||||
jwt_secret_key = os.getenv("JWT_SECRET_KEY", "super-secret-change-this")
|
||||
jwt_access_token_expires = int(
|
||||
os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200)
|
||||
) # Default to 20 minutes
|
||||
max_fetch_limit = int(get_env("MAX_FETCH_LIMIT"))
|
||||
frontend_url = get_env("FRONTEND_URL")
|
||||
jwt_secret_key = get_env("JWT_SECRET_KEY")
|
||||
jwt_access_token_expires = int(os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 1200)) # Default to 20 minutes
|
||||
|
||||
# Flask Configuration
|
||||
CORS(app, resources={r"/*": {"origins": frontend_url}})
|
||||
app.config["JWT_SECRET_KEY"] = jwt_secret_key
|
||||
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = jwt_access_token_expires
|
||||
|
||||
# Security
|
||||
bcrypt = Bcrypt(app)
|
||||
jwt = JWTManager(app)
|
||||
|
||||
# Helper Objects
|
||||
db = PostgresConnector()
|
||||
auth_manager = AuthManager(db, bcrypt)
|
||||
dataset_manager = DatasetManager(db)
|
||||
stat_gen = StatGen()
|
||||
connectors = get_available_connectors()
|
||||
|
||||
# Default Files
|
||||
with open("server/topics.json") as f:
|
||||
default_topic_list = json.load(f)
|
||||
|
||||
@app.route("/register", methods=["POST"])
|
||||
def register_user():
|
||||
@@ -68,7 +74,7 @@ def register_user():
|
||||
return jsonify({"error": str(e)}), 400
|
||||
except Exception as e:
|
||||
print(traceback.format_exc())
|
||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||
return jsonify({"error": f"An unexpected error occurred"}), 500
|
||||
|
||||
print(f"Registered new user: {username}")
|
||||
return jsonify({"message": f"User '{username}' registered successfully"}), 200
|
||||
@@ -93,7 +99,7 @@ def login_user():
|
||||
return jsonify({"error": "Invalid username or password"}), 401
|
||||
except Exception as e:
|
||||
print(traceback.format_exc())
|
||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||
return jsonify({"error": f"An unexpected error occurred"}), 500
|
||||
|
||||
|
||||
@app.route("/profile", methods=["GET"])
|
||||
@@ -111,7 +117,95 @@ def get_user_datasets():
|
||||
current_user = int(get_jwt_identity())
|
||||
return jsonify(dataset_manager.get_user_datasets(current_user)), 200
|
||||
|
||||
@app.route("/upload", methods=["POST"])
|
||||
@app.route("/datasets/sources", methods=["GET"])
|
||||
def get_dataset_sources():
|
||||
list_metadata = list(get_connector_metadata().values())
|
||||
return jsonify(list_metadata)
|
||||
|
||||
@app.route("/datasets/scrape", methods=["POST"])
|
||||
@jwt_required()
|
||||
def scrape_data():
|
||||
data = request.get_json()
|
||||
connector_metadata = get_connector_metadata()
|
||||
|
||||
# Strong validation needed, otherwise data goes to Celery and crashes silently
|
||||
if not data or "sources" not in data:
|
||||
return jsonify({"error": "Sources must be provided"}), 400
|
||||
|
||||
if "name" not in data or not str(data["name"]).strip():
|
||||
return jsonify({"error": "Dataset name is required"}), 400
|
||||
|
||||
dataset_name = data["name"].strip()
|
||||
user_id = int(get_jwt_identity())
|
||||
|
||||
source_configs = data["sources"]
|
||||
|
||||
if not isinstance(source_configs, list) or len(source_configs) == 0:
|
||||
return jsonify({"error": "Sources must be a non-empty list"}), 400
|
||||
|
||||
for source in source_configs:
|
||||
if not isinstance(source, dict):
|
||||
return jsonify({"error": "Each source must be an object"}), 400
|
||||
|
||||
if "name" not in source:
|
||||
return jsonify({"error": "Each source must contain a name"}), 400
|
||||
|
||||
name = source["name"]
|
||||
limit = source.get("limit", 1000)
|
||||
category = source.get("category")
|
||||
search = source.get("search")
|
||||
|
||||
if limit:
|
||||
try:
|
||||
limit = int(limit)
|
||||
except (ValueError, TypeError):
|
||||
return jsonify({"error": "Limit must be an integer"}), 400
|
||||
|
||||
if limit > 1000:
|
||||
limit = 1000
|
||||
|
||||
if name not in connector_metadata:
|
||||
return jsonify({"error": "Source not supported"}), 400
|
||||
|
||||
if search and not connector_metadata[name]["search_enabled"]:
|
||||
return jsonify({"error": f"Source {name} does not support search"}), 400
|
||||
|
||||
if category and not connector_metadata[name]["categories_enabled"]:
|
||||
return jsonify({"error": f"Source {name} does not support categories"}), 400
|
||||
|
||||
if category and not connectors[name]().category_exists(category):
|
||||
return jsonify({"error": f"Category does not exist for {name}"}), 400
|
||||
|
||||
try:
|
||||
dataset_id = dataset_manager.save_dataset_info(
|
||||
user_id,
|
||||
dataset_name,
|
||||
default_topic_list
|
||||
)
|
||||
|
||||
dataset_manager.set_dataset_status(
|
||||
dataset_id,
|
||||
"fetching",
|
||||
f"Data is being fetched from {', '.join(source['name'] for source in source_configs)}"
|
||||
)
|
||||
|
||||
fetch_and_process_dataset.delay(
|
||||
dataset_id,
|
||||
source_configs,
|
||||
default_topic_list
|
||||
)
|
||||
except Exception:
|
||||
print(traceback.format_exc())
|
||||
return jsonify({"error": "Failed to queue dataset processing"}), 500
|
||||
|
||||
|
||||
return jsonify({
|
||||
"message": "Dataset queued for processing",
|
||||
"dataset_id": dataset_id,
|
||||
"status": "processing"
|
||||
}), 202
|
||||
|
||||
@app.route("/datasets/upload", methods=["POST"])
|
||||
@jwt_required()
|
||||
def upload_data():
|
||||
if "posts" not in request.files or "topics" not in request.files:
|
||||
@@ -151,9 +245,9 @@ def upload_data():
|
||||
}
|
||||
), 202
|
||||
except ValueError as e:
|
||||
return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
|
||||
return jsonify({"error": f"Failed to read JSONL file"}), 400
|
||||
except Exception as e:
|
||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||
return jsonify({"error": f"An unexpected error occurred"}), 500
|
||||
|
||||
@app.route("/dataset/<int:dataset_id>", methods=["GET"])
|
||||
@jwt_required()
|
||||
@@ -256,10 +350,10 @@ def content_endpoint(dataset_id):
|
||||
except NonExistentDatasetException:
|
||||
return jsonify({"error": "Dataset does not exist"}), 404
|
||||
except ValueError as e:
|
||||
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
|
||||
return jsonify({"error": f"Malformed or missing data"}), 400
|
||||
except Exception as e:
|
||||
print(traceback.format_exc())
|
||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||
return jsonify({"error": f"An unexpected error occurred"}), 500
|
||||
|
||||
|
||||
@app.route("/dataset/<int:dataset_id>/summary", methods=["GET"])
|
||||
@@ -278,10 +372,10 @@ def get_summary(dataset_id):
|
||||
except NonExistentDatasetException:
|
||||
return jsonify({"error": "Dataset does not exist"}), 404
|
||||
except ValueError as e:
|
||||
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
|
||||
return jsonify({"error": f"Malformed or missing data"}), 400
|
||||
except Exception as e:
|
||||
print(traceback.format_exc())
|
||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||
return jsonify({"error": f"An unexpected error occurred"}), 500
|
||||
|
||||
|
||||
@app.route("/dataset/<int:dataset_id>/time", methods=["GET"])
|
||||
@@ -300,10 +394,10 @@ def get_time_analysis(dataset_id):
|
||||
except NonExistentDatasetException:
|
||||
return jsonify({"error": "Dataset does not exist"}), 404
|
||||
except ValueError as e:
|
||||
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
|
||||
return jsonify({"error": f"Malformed or missing data"}), 400
|
||||
except Exception as e:
|
||||
print(traceback.format_exc())
|
||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||
return jsonify({"error": f"An unexpected error occurred"}), 500
|
||||
|
||||
|
||||
@app.route("/dataset/<int:dataset_id>/user", methods=["GET"])
|
||||
@@ -322,10 +416,10 @@ def get_user_analysis(dataset_id):
|
||||
except NonExistentDatasetException:
|
||||
return jsonify({"error": "Dataset does not exist"}), 404
|
||||
except ValueError as e:
|
||||
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
|
||||
return jsonify({"error": f"Malformed or missing data"}), 400
|
||||
except Exception as e:
|
||||
print(traceback.format_exc())
|
||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||
return jsonify({"error": f"An unexpected error occurred"}), 500
|
||||
|
||||
|
||||
@app.route("/dataset/<int:dataset_id>/cultural", methods=["GET"])
|
||||
@@ -344,10 +438,10 @@ def get_cultural_analysis(dataset_id):
|
||||
except NonExistentDatasetException:
|
||||
return jsonify({"error": "Dataset does not exist"}), 404
|
||||
except ValueError as e:
|
||||
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
|
||||
return jsonify({"error": f"Malformed or missing data"}), 400
|
||||
except Exception as e:
|
||||
print(traceback.format_exc())
|
||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||
return jsonify({"error": f"An unexpected error occurred"}), 500
|
||||
|
||||
|
||||
@app.route("/dataset/<int:dataset_id>/interaction", methods=["GET"])
|
||||
@@ -366,10 +460,10 @@ def get_interaction_analysis(dataset_id):
|
||||
except NonExistentDatasetException:
|
||||
return jsonify({"error": "Dataset does not exist"}), 404
|
||||
except ValueError as e:
|
||||
return jsonify({"error": f"Malformed or missing data: {str(e)}"}), 400
|
||||
return jsonify({"error": f"Malformed or missing data"}), 400
|
||||
except Exception as e:
|
||||
print(traceback.format_exc())
|
||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||
return jsonify({"error": f"An unexpected error occurred"}), 500
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
29
server/connectors/base.py
Normal file
29
server/connectors/base.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from dto.post import Post
|
||||
|
||||
class BaseConnector(ABC):
|
||||
# Each subclass declares these at the class level
|
||||
source_name: str # machine-readable: "reddit", "youtube"
|
||||
display_name: str # human-readable: "Reddit", "YouTube"
|
||||
required_env: list[str] = [] # env vars needed to activate
|
||||
|
||||
search_enabled: bool
|
||||
categories_enabled: bool
|
||||
|
||||
@classmethod
|
||||
def is_available(cls) -> bool:
|
||||
"""Returns True if all required env vars are set."""
|
||||
import os
|
||||
return all(os.getenv(var) for var in cls.required_env)
|
||||
|
||||
@abstractmethod
|
||||
def get_new_posts_by_search(self,
|
||||
search: str = None,
|
||||
category: str = None,
|
||||
post_limit: int = 10
|
||||
) -> list[Post]:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def category_exists(self, category: str) -> bool:
|
||||
...
|
||||
@@ -7,6 +7,7 @@ from dto.post import Post
|
||||
from dto.comment import Comment
|
||||
from bs4 import BeautifulSoup
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from server.connectors.base import BaseConnector
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -14,25 +15,64 @@ HEADERS = {
|
||||
"User-Agent": "Mozilla/5.0 (compatible; ForumScraper/1.0)"
|
||||
}
|
||||
|
||||
class BoardsAPI:
|
||||
def __init__(self):
|
||||
self.url = "https://www.boards.ie"
|
||||
self.source_name = "Boards.ie"
|
||||
class BoardsAPI(BaseConnector):
|
||||
source_name: str = "boards.ie"
|
||||
display_name: str = "Boards.ie"
|
||||
|
||||
def get_new_category_posts(self, category: str, post_limit: int, comment_limit: int) -> list[Post]:
|
||||
categories_enabled: bool = True
|
||||
search_enabled: bool = False
|
||||
|
||||
def __init__(self):
|
||||
self.base_url = "https://www.boards.ie"
|
||||
|
||||
def get_new_posts_by_search(self,
|
||||
search: str,
|
||||
category: str,
|
||||
post_limit: int
|
||||
) -> list[Post]:
|
||||
if search:
|
||||
raise NotImplementedError("Search not compatible with boards.ie")
|
||||
|
||||
if category:
|
||||
return self._get_posts(f"{self.base_url}/categories/{category}", post_limit)
|
||||
else:
|
||||
return self._get_posts(f"{self.base_url}/discussions", post_limit)
|
||||
|
||||
def category_exists(self, category: str) -> bool:
|
||||
if not category:
|
||||
return False
|
||||
|
||||
url = f"{self.base_url}/categories/{category}"
|
||||
|
||||
try:
|
||||
response = requests.head(url, headers=HEADERS, allow_redirects=True)
|
||||
|
||||
if response.status_code == 200:
|
||||
return True
|
||||
if response.status_code == 404:
|
||||
return False
|
||||
|
||||
# fallback if HEAD not supported
|
||||
response = requests.get(url, headers=HEADERS)
|
||||
return response.status_code == 200
|
||||
|
||||
except requests.RequestException as e:
|
||||
logger.error(f"Error checking category '{category}': {e}")
|
||||
return False
|
||||
|
||||
## Private
|
||||
def _get_posts(self, url, limit) -> list[Post]:
|
||||
urls = []
|
||||
current_page = 1
|
||||
|
||||
logger.info(f"Fetching posts from category: {category}")
|
||||
|
||||
while len(urls) < post_limit:
|
||||
url = f"{self.url}/categories/{category}/p{current_page}"
|
||||
while len(urls) < limit:
|
||||
url = f"{url}/p{current_page}"
|
||||
html = self._fetch_page(url)
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
|
||||
logger.debug(f"Processing page {current_page} for category {category}")
|
||||
logger.debug(f"Processing page {current_page} for link: {url}")
|
||||
for a in soup.select("a.threadbit-threadlink"):
|
||||
if len(urls) >= post_limit:
|
||||
if len(urls) >= limit:
|
||||
break
|
||||
|
||||
href = a.get("href")
|
||||
@@ -41,14 +81,14 @@ class BoardsAPI:
|
||||
|
||||
current_page += 1
|
||||
|
||||
logger.debug(f"Fetched {len(urls)} post URLs from category {category}")
|
||||
logger.debug(f"Fetched {len(urls)} post URLs")
|
||||
|
||||
# Fetch post details for each URL and create Post objects
|
||||
posts = []
|
||||
|
||||
def fetch_and_parse(post_url):
|
||||
html = self._fetch_page(post_url)
|
||||
post = self._parse_thread(html, post_url, comment_limit)
|
||||
post = self._parse_thread(html, post_url)
|
||||
return post
|
||||
|
||||
with ThreadPoolExecutor(max_workers=30) as executor:
|
||||
@@ -71,7 +111,7 @@ class BoardsAPI:
|
||||
response.raise_for_status()
|
||||
return response.text
|
||||
|
||||
def _parse_thread(self, html: str, post_url: str, comment_limit: int) -> Post:
|
||||
def _parse_thread(self, html: str, post_url: str) -> Post:
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
|
||||
# Author
|
||||
@@ -100,7 +140,7 @@ class BoardsAPI:
|
||||
title = title_tag.text.strip() if title_tag else None
|
||||
|
||||
# Comments
|
||||
comments = self._parse_comments(post_url, post_num, comment_limit)
|
||||
comments = self._parse_comments(post_url, post_num)
|
||||
|
||||
post = Post(
|
||||
id=post_num,
|
||||
@@ -115,11 +155,11 @@ class BoardsAPI:
|
||||
|
||||
return post
|
||||
|
||||
def _parse_comments(self, url: str, post_id: str, comment_limit: int) -> list[Comment]:
|
||||
def _parse_comments(self, url: str, post_id: str) -> list[Comment]:
|
||||
comments = []
|
||||
current_url = url
|
||||
|
||||
while current_url and len(comments) < comment_limit:
|
||||
while current_url:
|
||||
html = self._fetch_page(current_url)
|
||||
page_comments = self._parse_page_comments(html, post_id)
|
||||
comments.extend(page_comments)
|
||||
@@ -130,7 +170,7 @@ class BoardsAPI:
|
||||
|
||||
if next_link and next_link.get('href'):
|
||||
href = next_link.get('href')
|
||||
current_url = href if href.startswith('http') else self.url + href
|
||||
current_url = href if href.startswith('http') else url + href
|
||||
else:
|
||||
current_url = None
|
||||
|
||||
@@ -5,44 +5,63 @@ import time
|
||||
from dto.post import Post
|
||||
from dto.user import User
|
||||
from dto.comment import Comment
|
||||
from server.connectors.base import BaseConnector
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class RedditAPI:
|
||||
class RedditAPI(BaseConnector):
|
||||
source_name: str = "reddit"
|
||||
display_name: str = "Reddit"
|
||||
search_enabled: bool = True
|
||||
categories_enabled: bool = True
|
||||
|
||||
def __init__(self):
|
||||
self.url = "https://www.reddit.com/"
|
||||
self.source_name = "Reddit"
|
||||
|
||||
# Public Methods #
|
||||
def search_new_subreddit_posts(self, search: str, subreddit: str, limit: int) -> list[Post]:
|
||||
params = {
|
||||
def get_new_posts_by_search(self,
|
||||
search: str,
|
||||
category: str,
|
||||
post_limit: int
|
||||
) -> list[Post]:
|
||||
|
||||
prefix = f"r/{category}/" if category else ""
|
||||
params = {'limit': post_limit}
|
||||
|
||||
if search:
|
||||
endpoint = f"{prefix}search.json"
|
||||
params.update({
|
||||
'q': search,
|
||||
'limit': limit,
|
||||
'restrict_sr': 'on',
|
||||
'sort': 'new'
|
||||
}
|
||||
'sort': 'new',
|
||||
'restrict_sr': 'on' if category else 'off'
|
||||
})
|
||||
else:
|
||||
endpoint = f"{prefix}new.json"
|
||||
|
||||
logger.info(f"Searching subreddit '{subreddit}' for '{search}' with limit {limit}")
|
||||
url = f"r/{subreddit}/search.json"
|
||||
posts = []
|
||||
after = None
|
||||
|
||||
while len(posts) < limit:
|
||||
batch_limit = min(100, limit - len(posts))
|
||||
while len(posts) < post_limit:
|
||||
batch_limit = min(100, post_limit - len(posts))
|
||||
params['limit'] = batch_limit
|
||||
if after:
|
||||
params['after'] = after
|
||||
|
||||
data = self._fetch_post_overviews(url, params)
|
||||
batch_posts = self._parse_posts(data)
|
||||
data = self._fetch_post_overviews(endpoint, params)
|
||||
|
||||
logger.debug(f"Fetched {len(batch_posts)} posts from search in subreddit {subreddit}")
|
||||
|
||||
if not batch_posts:
|
||||
if not data or 'data' not in data or not data['data'].get('children'):
|
||||
break
|
||||
|
||||
batch_posts = self._parse_posts(data)
|
||||
posts.extend(batch_posts)
|
||||
|
||||
return posts
|
||||
after = data['data'].get('after')
|
||||
if not after:
|
||||
break
|
||||
|
||||
def get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]:
|
||||
return posts[:post_limit]
|
||||
|
||||
def _get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]:
|
||||
posts = []
|
||||
after = None
|
||||
url = f"r/{subreddit}/new.json"
|
||||
@@ -75,6 +94,17 @@ class RedditAPI:
|
||||
data = self._fetch_post_overviews(f"user/{username}/about.json", {})
|
||||
return self._parse_user(data)
|
||||
|
||||
def category_exists(self, category: str) -> bool:
|
||||
try:
|
||||
data = self._fetch_post_overviews(f"r/{category}/about.json", {})
|
||||
return (
|
||||
data is not None
|
||||
and 'data' in data
|
||||
and data['data'].get('id') is not None
|
||||
)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
## Private Methods ##
|
||||
def _parse_posts(self, data) -> list[Post]:
|
||||
posts = []
|
||||
30
server/connectors/registry.py
Normal file
30
server/connectors/registry.py
Normal file
@@ -0,0 +1,30 @@
|
||||
import pkgutil
|
||||
import importlib
|
||||
import server.connectors
|
||||
from server.connectors.base import BaseConnector
|
||||
|
||||
def _discover_connectors() -> list[type[BaseConnector]]:
|
||||
"""Walk the connectors package and collect all BaseConnector subclasses."""
|
||||
for _, module_name, _ in pkgutil.iter_modules(server.connectors.__path__):
|
||||
if module_name in ("base", "registry"):
|
||||
continue
|
||||
importlib.import_module(f"server.connectors.{module_name}")
|
||||
|
||||
return [
|
||||
cls for cls in BaseConnector.__subclasses__()
|
||||
if cls.source_name # guard against abstract intermediaries
|
||||
]
|
||||
|
||||
def get_available_connectors() -> dict[str, type[BaseConnector]]:
|
||||
return {c.source_name: c for c in _discover_connectors() if c.is_available()}
|
||||
|
||||
def get_connector_metadata() -> dict[str, dict]:
|
||||
res = {}
|
||||
for id, obj in get_available_connectors().items():
|
||||
res[id] = {"id": id,
|
||||
"label": obj.display_name,
|
||||
"search_enabled": obj.search_enabled,
|
||||
"categories_enabled": obj.categories_enabled
|
||||
}
|
||||
|
||||
return res
|
||||
96
server/connectors/youtube_api.py
Normal file
96
server/connectors/youtube_api.py
Normal file
@@ -0,0 +1,96 @@
|
||||
import os
|
||||
import datetime
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from googleapiclient.discovery import build
|
||||
from googleapiclient.errors import HttpError
|
||||
from dto.post import Post
|
||||
from dto.comment import Comment
|
||||
from server.connectors.base import BaseConnector
|
||||
|
||||
load_dotenv()
|
||||
|
||||
API_KEY = os.getenv("YOUTUBE_API_KEY")
|
||||
|
||||
class YouTubeAPI(BaseConnector):
|
||||
source_name: str = "youtube"
|
||||
display_name: str = "YouTube"
|
||||
search_enabled: bool = True
|
||||
categories_enabled: bool = False
|
||||
|
||||
def __init__(self):
|
||||
self.youtube = build('youtube', 'v3', developerKey=API_KEY)
|
||||
|
||||
def get_new_posts_by_search(self,
|
||||
search: str,
|
||||
category: str,
|
||||
post_limit: int
|
||||
) -> list[Post]:
|
||||
videos = self._search_videos(search, post_limit)
|
||||
posts = []
|
||||
|
||||
for video in videos:
|
||||
video_id = video['id']['videoId']
|
||||
snippet = video['snippet']
|
||||
title = snippet['title']
|
||||
description = snippet['description']
|
||||
published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
|
||||
channel_title = snippet['channelTitle']
|
||||
|
||||
comments = []
|
||||
comments_data = self._get_video_comments(video_id)
|
||||
for comment_thread in comments_data:
|
||||
comment_snippet = comment_thread['snippet']['topLevelComment']['snippet']
|
||||
comment = Comment(
|
||||
id=comment_thread['id'],
|
||||
post_id=video_id,
|
||||
content=comment_snippet['textDisplay'],
|
||||
author=comment_snippet['authorDisplayName'],
|
||||
timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").timestamp(),
|
||||
reply_to=None,
|
||||
source=self.source_name
|
||||
)
|
||||
|
||||
comments.append(comment)
|
||||
|
||||
post = Post(
|
||||
id=video_id,
|
||||
content=f"{title}\n\n{description}",
|
||||
author=channel_title,
|
||||
timestamp=published_at,
|
||||
url=f"https://www.youtube.com/watch?v={video_id}",
|
||||
title=title,
|
||||
source=self.source_name,
|
||||
comments=comments
|
||||
)
|
||||
|
||||
posts.append(post)
|
||||
|
||||
return posts
|
||||
|
||||
def category_exists(self, category):
|
||||
return True
|
||||
|
||||
def _search_videos(self, query, limit):
|
||||
request = self.youtube.search().list(
|
||||
q=query,
|
||||
part='snippet',
|
||||
type='video',
|
||||
maxResults=limit
|
||||
)
|
||||
response = request.execute()
|
||||
return response.get('items', [])
|
||||
|
||||
def _get_video_comments(self, video_id):
|
||||
request = self.youtube.commentThreads().list(
|
||||
part='snippet',
|
||||
videoId=video_id,
|
||||
textFormat='plainText'
|
||||
)
|
||||
|
||||
try:
|
||||
response = request.execute()
|
||||
except HttpError as e:
|
||||
print(f"Error fetching comments for video {video_id}: {e}")
|
||||
return []
|
||||
return response.get('items', [])
|
||||
@@ -1,6 +1,10 @@
|
||||
import re
|
||||
|
||||
from server.db.database import PostgresConnector
|
||||
from flask_bcrypt import Bcrypt
|
||||
|
||||
EMAIL_REGEX = re.compile(r"[^@]+@[^@]+\.[^@]+")
|
||||
|
||||
class AuthManager:
|
||||
def __init__(self, db: PostgresConnector, bcrypt: Bcrypt):
|
||||
self.db = db
|
||||
@@ -18,6 +22,12 @@ class AuthManager:
|
||||
def register_user(self, username, email, password):
|
||||
hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8")
|
||||
|
||||
if len(username) < 3:
|
||||
raise ValueError("Username must be longer than 3 characters")
|
||||
|
||||
if not EMAIL_REGEX.match(email):
|
||||
raise ValueError("Please enter a valid email address")
|
||||
|
||||
if self.get_user_by_email(email):
|
||||
raise ValueError("Email already registered")
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import pandas as pd
|
||||
from server.db.database import PostgresConnector
|
||||
from psycopg2.extras import Json
|
||||
from server.exceptions import NotAuthorisedException, NonExistentDatasetException
|
||||
from server.exceptions import NonExistentDatasetException
|
||||
|
||||
class DatasetManager:
|
||||
def __init__(self, db: PostgresConnector):
|
||||
@@ -114,7 +114,7 @@ class DatasetManager:
|
||||
self.db.execute_batch(query, values)
|
||||
|
||||
def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None):
|
||||
if status not in ["processing", "complete", "error"]:
|
||||
if status not in ["fetching", "processing", "complete", "error"]:
|
||||
raise ValueError("Invalid status")
|
||||
|
||||
query = """
|
||||
|
||||
@@ -23,7 +23,7 @@ CREATE TABLE datasets (
|
||||
|
||||
-- Enforce valid states
|
||||
CONSTRAINT datasets_status_check
|
||||
CHECK (status IN ('processing', 'complete', 'error'))
|
||||
CHECK (status IN ('fetching', 'processing', 'complete', 'error'))
|
||||
);
|
||||
|
||||
CREATE TABLE events (
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
import pandas as pd
|
||||
import logging
|
||||
|
||||
from server.queue.celery_app import celery
|
||||
from server.analysis.enrichment import DatasetEnrichment
|
||||
from server.db.database import PostgresConnector
|
||||
from server.core.datasets import DatasetManager
|
||||
from server.connectors.registry import get_available_connectors
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@celery.task(bind=True, max_retries=3)
|
||||
def process_dataset(self, dataset_id: int, posts: list, topics: dict):
|
||||
@@ -20,3 +24,38 @@ def process_dataset(self, dataset_id: int, posts: list, topics: dict):
|
||||
dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
|
||||
except Exception as e:
|
||||
dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
|
||||
|
||||
@celery.task(bind=True, max_retries=3)
|
||||
def fetch_and_process_dataset(self,
|
||||
dataset_id: int,
|
||||
source_info: list[dict],
|
||||
topics: dict):
|
||||
connectors = get_available_connectors()
|
||||
db = PostgresConnector()
|
||||
dataset_manager = DatasetManager(db)
|
||||
posts = []
|
||||
|
||||
try:
|
||||
for metadata in source_info:
|
||||
name = metadata["name"]
|
||||
search = metadata.get("search")
|
||||
category = metadata.get("category")
|
||||
limit = metadata.get("limit", 100)
|
||||
|
||||
connector = connectors[name]()
|
||||
raw_posts = connector.get_new_posts_by_search(
|
||||
search=search,
|
||||
category=category,
|
||||
post_limit=limit
|
||||
)
|
||||
posts.extend(post.to_dict() for post in raw_posts)
|
||||
|
||||
df = pd.DataFrame(posts)
|
||||
|
||||
processor = DatasetEnrichment(df, topics)
|
||||
enriched_df = processor.enrich()
|
||||
|
||||
dataset_manager.save_dataset_content(dataset_id, enriched_df)
|
||||
dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
|
||||
except Exception as e:
|
||||
dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
|
||||
67
server/topics.json
Normal file
67
server/topics.json
Normal file
@@ -0,0 +1,67 @@
|
||||
{
|
||||
"Personal Life": "daily life, life updates, what happened today, personal stories, life events, reflections",
|
||||
|
||||
"Relationships": "dating, relationships, breakups, friendships, family relationships, marriage, relationship advice",
|
||||
|
||||
"Family & Parenting": "parents, parenting, children, raising kids, family dynamics, family stories",
|
||||
|
||||
"Work & Careers": "jobs, workplaces, office life, promotions, quitting jobs, career advice, workplace drama",
|
||||
|
||||
"Education": "school, studying, exams, university, homework, academic pressure, learning experiences",
|
||||
|
||||
"Money & Finance": "saving money, debt, budgeting, cost of living, financial advice, personal finance",
|
||||
|
||||
"Health & Fitness": "exercise, gym, workouts, running, diet, fitness routines, weight loss",
|
||||
|
||||
"Mental Health": "stress, anxiety, depression, burnout, therapy, emotional wellbeing",
|
||||
|
||||
"Food & Cooking": "meals, cooking, recipes, restaurants, snacks, food opinions",
|
||||
|
||||
"Travel": "holidays, trips, tourism, travel experiences, airports, flights, travel tips",
|
||||
|
||||
"Entertainment": "movies, TV shows, streaming services, celebrities, pop culture",
|
||||
|
||||
"Music": "songs, albums, artists, concerts, music opinions",
|
||||
|
||||
"Gaming": "video games, gaming culture, consoles, PC gaming, esports",
|
||||
|
||||
"Sports": "sports matches, teams, players, competitions, sports opinions",
|
||||
|
||||
"Technology": "phones, gadgets, apps, AI, software, tech trends",
|
||||
|
||||
"Internet Culture": "memes, viral trends, online jokes, internet drama, trending topics",
|
||||
|
||||
"Social Media": "platforms, influencers, content creators, algorithms, online communities",
|
||||
|
||||
"News & Current Events": "breaking news, world events, major incidents, public discussions",
|
||||
|
||||
"Politics": "political debates, elections, government policies, ideology",
|
||||
|
||||
"Culture & Society": "social issues, cultural trends, generational debates, societal changes",
|
||||
|
||||
"Identity & Lifestyle": "personal identity, lifestyle choices, values, self-expression",
|
||||
|
||||
"Hobbies & Interests": "art, photography, crafts, collecting, hobbies",
|
||||
|
||||
"Fashion & Beauty": "clothing, style, makeup, skincare, fashion trends",
|
||||
|
||||
"Animals & Pets": "pets, animal videos, pet care, wildlife",
|
||||
|
||||
"Humour": "jokes, funny stories, sarcasm, memes",
|
||||
|
||||
"Opinions & Debates": "hot takes, controversial opinions, arguments, discussions",
|
||||
|
||||
"Advice & Tips": "life advice, tutorials, how-to tips, recommendations",
|
||||
|
||||
"Product Reviews": "reviews, recommendations, experiences with products",
|
||||
|
||||
"Complaints & Rants": "frustrations, complaining, venting about things",
|
||||
|
||||
"Motivation & Inspiration": "motivational quotes, success stories, encouragement",
|
||||
|
||||
"Questions & Curiosity": "asking questions, seeking opinions, curiosity posts",
|
||||
|
||||
"Celebrations & Achievements": "birthdays, milestones, achievements, good news",
|
||||
|
||||
"Random Thoughts": "shower thoughts, observations, random ideas"
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
import datetime
|
||||
import os
|
||||
from flask import request
|
||||
|
||||
def parse_datetime_filter(value):
|
||||
@@ -48,3 +49,9 @@ def get_request_filters() -> dict:
|
||||
filters["data_sources"] = data_sources
|
||||
|
||||
return filters
|
||||
|
||||
def get_env(name: str) -> str:
|
||||
value = os.getenv(name)
|
||||
if not value:
|
||||
raise RuntimeError(f"Missing required environment variable: {name}")
|
||||
return value
|
||||
|
||||
Reference in New Issue
Block a user