fix(api): flask delegates dataset fetch to celery
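In short, the scrape route no longer runs the connector fetch loop inside the request: it saves the dataset record, marks it as "fetching", and hands the per-source limits to a new Celery task that fetches the posts and then chains into the existing process_dataset task. A minimal, self-contained sketch of that hand-off pattern follows; the broker URL, route path, and stub bodies are assumptions, and only the task names and the .delay() chaining mirror the diff below.

    from celery import Celery
    from flask import Flask, jsonify

    celery = Celery("sketch", broker="redis://localhost:6379/0")  # broker URL is assumed
    app = Flask(__name__)


    @celery.task
    def process_dataset(dataset_id: int, posts: list) -> None:
        # Stand-in for the existing enrichment task defined in the tasks module.
        print(f"enriching {len(posts)} posts for dataset {dataset_id}")


    @celery.task(bind=True, max_retries=3)
    def fetch_and_process_dataset(self, dataset_id: int, per_source: dict) -> None:
        # The blocking fetch now runs in a worker and then chains into
        # process_dataset, so the HTTP request never waits on external sources.
        posts = [{"source": name, "limit": limit} for name, limit in per_source.items()]
        process_dataset.delay(dataset_id, posts)


    @app.route("/scrape", methods=["POST"])  # route path is assumed
    def scrape_data():
        dataset_id = 1  # stand-in for dataset_manager.save_dataset_info(...)
        fetch_and_process_dataset.delay(dataset_id, {"example_source": 50})
        # Respond immediately; progress is tracked via the dataset status updates.
        return jsonify({"dataset_id": dataset_id, "status": "fetching"}), 200

A Celery worker pointed at the project's app module (for example `celery -A server.queue.celery_app worker`, assuming that is how the project starts its workers) is what actually executes the queued fetch.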
@@ -20,7 +20,7 @@ from server.db.database import PostgresConnector
 from server.core.auth import AuthManager
 from server.core.datasets import DatasetManager
 from server.utils import get_request_filters, split_limit, get_env
-from server.queue.tasks import process_dataset
+from server.queue.tasks import process_dataset, fetch_and_process_dataset
 from server.connectors.registry import get_available_connectors, get_connector_metadata
 
 app = Flask(__name__)
@@ -130,8 +130,6 @@ def scrape_data():
     search = request.form.get("search")
     category = request.form.get("category")
 
-    print(sources)
-
     if limit > max_fetch_limit:
         return jsonify({"error": f"Due to API limitations, we cannot receive more than ${max_fetch_limit} posts"}), 400
 
@@ -141,20 +139,11 @@ def scrape_data():
 
     limits = split_limit(limit, len(sources))
     per_source = dict(zip(sources, limits))
+    dataset_id = dataset_manager.save_dataset_info(user_id, dataset_name, default_topic_list)
+    dataset_manager.set_dataset_status(dataset_id, "fetching", f"Data is being fetched from {str(sources)}")
 
     try:
-        posts = []
-        for source_name, source_limit in per_source.items():
-            connector = connectors[source_name]()
-            posts.extend(connector.get_new_posts_by_search(
-                search=search,
-                category=category,
-                post_limit=source_limit,
-                comment_limit=source_limit
-            ))
-
-        dataset_id = dataset_manager.save_dataset_info(user_id, dataset_name, {})
-        process_dataset.delay(dataset_id, [p.to_dict() for p in posts], default_topic_list)
+        fetch_and_process_dataset.delay(dataset_id, per_source, search, category, default_topic_list)
 
         return jsonify(
             {
@@ -4,6 +4,7 @@ from server.queue.celery_app import celery
 from server.analysis.enrichment import DatasetEnrichment
 from server.db.database import PostgresConnector
 from server.core.datasets import DatasetManager
+from server.connectors.registry import get_available_connectors
 
 @celery.task(bind=True, max_retries=3)
 def process_dataset(self, dataset_id: int, posts: list, topics: dict):
@@ -18,5 +19,31 @@ def process_dataset(self, dataset_id: int, posts: list, topics: dict):
 
         dataset_manager.save_dataset_content(dataset_id, enriched_df)
         dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
+    except Exception as e:
+        dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
+
+@celery.task(bind=True, max_retries=3)
+def fetch_and_process_dataset(self,
+                              dataset_id: int,
+                              per_source: dict[str, int],
+                              search: str,
+                              category: str,
+                              topics: dict):
+    connectors = get_available_connectors()
+    db = PostgresConnector()
+    dataset_manager = DatasetManager(db)
+    posts = []
+
+    try:
+        for source_name, source_limit in per_source.items():
+            connector = connectors[source_name]()
+            posts.extend(connector.get_new_posts_by_search(
+                search=search,
+                category=category,
+                post_limit=source_limit,
+                comment_limit=source_limit
+            ))
+
+        process_dataset.delay(dataset_id, [p.to_dict() for p in posts], topics)
     except Exception as e:
         dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")