diff --git a/server/app.py b/server/app.py
index d896ac2..16d6f39 100644
--- a/server/app.py
+++ b/server/app.py
@@ -20,7 +20,7 @@ from server.db.database import PostgresConnector
 from server.core.auth import AuthManager
 from server.core.datasets import DatasetManager
 from server.utils import get_request_filters, split_limit, get_env
-from server.queue.tasks import process_dataset
+from server.queue.tasks import process_dataset, fetch_and_process_dataset
 from server.connectors.registry import get_available_connectors, get_connector_metadata
 
 app = Flask(__name__)
@@ -130,8 +130,6 @@ def scrape_data():
     search = request.form.get("search")
     category = request.form.get("category")
 
-    print(sources)
-
     if limit > max_fetch_limit:
         return jsonify({"error": f"Due to API limitations, we cannot retrieve more than {max_fetch_limit} posts"}), 400
 
@@ -141,20 +139,11 @@ def scrape_data():
     limits = split_limit(limit, len(sources))
     per_source = dict(zip(sources, limits))
 
+    dataset_id = dataset_manager.save_dataset_info(user_id, dataset_name, default_topic_list)
+    dataset_manager.set_dataset_status(dataset_id, "fetching", f"Data is being fetched from {sources}")
     try:
-        posts = []
-        for source_name, source_limit in per_source.items():
-            connector = connectors[source_name]()
-            posts.extend(connector.get_new_posts_by_search(
-                search=search,
-                category=category,
-                post_limit=source_limit,
-                comment_limit=source_limit
-            ))
-
-        dataset_id = dataset_manager.save_dataset_info(user_id, dataset_name, {})
-        process_dataset.delay(dataset_id, [p.to_dict() for p in posts], default_topic_list)
+        fetch_and_process_dataset.delay(dataset_id, per_source, search, category, default_topic_list)
 
         return jsonify(
             {
diff --git a/server/queue/tasks.py b/server/queue/tasks.py
index a089596..8a71680 100644
--- a/server/queue/tasks.py
+++ b/server/queue/tasks.py
@@ -4,6 +4,7 @@ from server.queue.celery_app import celery
 from server.analysis.enrichment import DatasetEnrichment
 from server.db.database import PostgresConnector
 from server.core.datasets import DatasetManager
+from server.connectors.registry import get_available_connectors
 
 @celery.task(bind=True, max_retries=3)
 def process_dataset(self, dataset_id: int, posts: list, topics: dict):
@@ -18,5 +19,31 @@ def process_dataset(self, dataset_id: int, posts: list, topics: dict):
         dataset_manager.save_dataset_content(dataset_id, enriched_df)
         dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
+    except Exception as e:
+        dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
+
+@celery.task(bind=True, max_retries=3)
+def fetch_and_process_dataset(self,
+                              dataset_id: int,
+                              per_source: dict[str, int],
+                              search: str,
+                              category: str,
+                              topics: dict):
+    connectors = get_available_connectors()
+    db = PostgresConnector()
+    dataset_manager = DatasetManager(db)
+    posts = []
+
+    try:
+        for source_name, source_limit in per_source.items():
+            connector = connectors[source_name]()
+            posts.extend(connector.get_new_posts_by_search(
+                search=search,
+                category=category,
+                post_limit=source_limit,
+                comment_limit=source_limit
+            ))
+
+        process_dataset.delay(dataset_id, [p.to_dict() for p in posts], topics)
+    except Exception as e:
+        dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
\ No newline at end of file
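
A note on the retry semantics of the new task: both tasks are declared with `bind=True, max_retries=3`, but the blanket `except Exception` swallows every failure after writing the error status, so `self.retry` is never invoked and the retry budget goes unused. If transient connector failures (rate limits, timeouts) should be retried before the dataset is marked as failed, one option is the sketch below. It is a minimal illustration rather than part of this patch; the `countdown=30` delay and the choice to re-raise via `self.retry` are assumptions, and `process_dataset` is taken to live in the same module as in the diff.

```python
from server.queue.celery_app import celery
from server.db.database import PostgresConnector
from server.core.datasets import DatasetManager
from server.connectors.registry import get_available_connectors


@celery.task(bind=True, max_retries=3)
def fetch_and_process_dataset(self, dataset_id: int, per_source: dict[str, int],
                              search: str, category: str, topics: dict):
    connectors = get_available_connectors()
    dataset_manager = DatasetManager(PostgresConnector())
    posts = []

    try:
        for source_name, source_limit in per_source.items():
            connector = connectors[source_name]()
            posts.extend(connector.get_new_posts_by_search(
                search=search,
                category=category,
                post_limit=source_limit,
                comment_limit=source_limit
            ))
        process_dataset.delay(dataset_id, [p.to_dict() for p in posts], topics)
    except Exception as e:
        # Record the failure so the UI can surface it, then hand the exception
        # back to Celery: self.retry() re-raises, which is what actually makes
        # max_retries=3 take effect. countdown=30 is an arbitrary example delay.
        dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
        raise self.retry(exc=e, countdown=30)
```

With this shape a transient outage re-queues the task up to three times; once the retries are exhausted, Celery raises `MaxRetriesExceededError` and the dataset stays in the error status written on the final attempt. If flapping between "fetching" and "error" on intermediate attempts is undesirable, the status write could instead be moved into the task's `on_failure` handler, which Celery calls only after retries are exhausted.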