Automatic Scraping of dataset options #9

Merged
dylan merged 36 commits from feat/automatic-scraping-datasets into main 2026-03-14 21:58:49 +00:00
2 changed files with 33 additions and 26 deletions
Showing only changes of commit 2ab74d922a

View File

@@ -124,31 +124,33 @@ def get_dataset_sources():
 @app.route("/datasets/scrape", methods=["POST"])
 @jwt_required()
 def scrape_data():
-    if "sources" not in request.form:
-        return jsonify({"error": "Data source names are required."}), 400
+    data = request.get_json()
+    if not data or "sources" not in data:
+        return jsonify({"error": "Sources must be provided"}), 400
     user_id = int(get_jwt_identity())
-    sources = request.form.getlist("sources")
-    limit = int(request.form.get("limit", max_fetch_limit))
-    dataset_name = request.form.get("name", "").strip()
-    search = request.form.get("search")
-    category = request.form.get("category")
-    if limit > max_fetch_limit:
-        return jsonify({"error": f"Due to API limitations, we cannot receive more than ${max_fetch_limit} posts"}), 400
-    for source in sources:
-        if source not in connectors.keys():
-            return jsonify({"error": "Source must exist"}), 400
-    limits = split_limit(limit, len(sources))
-    per_source = dict(zip(sources, limits))
+    dataset_name = data["name"].strip()
+    source_configs = data["sources"]
+    if not isinstance(source_configs, list) or len(source_configs) == 0:
+        return jsonify({"error": "Sources must be a non-empty list"}), 400
+
+    # Light Validation
+    for source in source_configs:
+        if "name" not in source:
+            return jsonify({"error": "Each source must contain a name"}), 400
+        if "limit" in source:
+            source["limit"] = int(source["limit"])
+
     dataset_id = dataset_manager.save_dataset_info(user_id, dataset_name, default_topic_list)
-    dataset_manager.set_dataset_status(dataset_id, "fetching", f"Data is being fetched from {str(sources)}")
+    dataset_manager.set_dataset_status(dataset_id,
+                                       "fetching",
+                                       f"Data is being fetched from {', '.join(source['name'] for source in source_configs)}"
+                                       )
     try:
-        fetch_and_process_dataset.delay(dataset_id, per_source, search, category, default_topic_list)
+        fetch_and_process_dataset.delay(dataset_id, source_configs, default_topic_list)
         return jsonify(
             {
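
With this commit the endpoint reads a JSON body instead of form fields, and each source now carries its own limit, search, and category instead of one shared limit divided by split_limit. A request against the new contract might look like the sketch below; the payload values, host, and port are illustrative, and the Bearer-token header is an assumption based on @jwt_required():

# Minimal sketch of a call to the reworked endpoint (illustrative values only).
import requests

token = "..."  # JWT obtained from the app's login flow (assumed, not shown in this diff)
payload = {
    "name": "march-snapshot",                 # dataset name; stripped server-side
    "sources": [                              # must be a non-empty list of source configs
        {"name": "reddit", "limit": 50, "search": "python", "category": "tech"},
        {"name": "hackernews"},               # limit, search and category are optional
    ],
}

resp = requests.post(
    "http://localhost:5000/datasets/scrape",  # host/port assumed
    json=payload,
    headers={"Authorization": f"Bearer {token}"},
)
print(resp.status_code, resp.json())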

View File

@@ -1,5 +1,5 @@
 import pandas as pd
-import json
+import logging
 
 from server.queue.celery_app import celery
 from server.analysis.enrichment import DatasetEnrichment
@@ -7,6 +7,8 @@ from server.db.database import PostgresConnector
 from server.core.datasets import DatasetManager
 from server.connectors.registry import get_available_connectors
 
+logger = logging.getLogger(__name__)
+
 @celery.task(bind=True, max_retries=3)
 def process_dataset(self, dataset_id: int, posts: list, topics: dict):
     db = PostgresConnector()
@@ -26,9 +28,7 @@ def process_dataset(self, dataset_id: int, posts: list, topics: dict):
 @celery.task(bind=True, max_retries=3)
 def fetch_and_process_dataset(self,
                               dataset_id: int,
-                              per_source: dict[str, int],
-                              search: str,
-                              category: str,
+                              source_info: list[dict],
                               topics: dict):
     connectors = get_available_connectors()
     db = PostgresConnector()
@@ -36,13 +36,18 @@ def fetch_and_process_dataset(self,
     posts = []
     try:
-        for source_name, source_limit in per_source.items():
-            connector = connectors[source_name]()
+        for metadata in source_info:
+            name = metadata["name"]
+            search = metadata.get("search")
+            category = metadata.get("category")
+            limit = metadata.get("limit", 100)
+            connector = connectors[name]()
             raw_posts = connector.get_new_posts_by_search(
                 search=search,
                 category=category,
-                post_limit=source_limit,
-                comment_limit=source_limit
+                post_limit=limit,
+                comment_limit=limit
            )
             posts.extend(post.to_dict() for post in raw_posts)
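
On the worker side, fetch_and_process_dataset now receives the same per-source list the route enqueued, and falls back to 100 posts and comments when a source omits its limit. A minimal sketch of enqueueing it, assuming dataset_id and default_topic_list already exist, the task is imported from its module, and using illustrative connector names:

# Sketch of enqueueing the reworked task (structure mirrors the diff above).
source_info = [
    {"name": "reddit", "search": "python", "category": "tech", "limit": 50},
    {"name": "hackernews"},  # no "limit" key, so the task falls back to 100
]
fetch_and_process_dataset.delay(dataset_id, source_info, default_topic_list)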