From 3a5870563550f618fffbb04f28475c6478d57087 Mon Sep 17 00:00:00 2001 From: Dylan De Faoite Date: Tue, 3 Mar 2026 12:27:14 +0000 Subject: [PATCH] feat: add celery & redis for background data processing --- server/app.py | 17 ++++++++++------- server/queue/celery_app.py | 16 ++++++++++++++++ server/queue/tasks.py | 19 +++++++++++++++++++ 3 files changed, 45 insertions(+), 7 deletions(-) create mode 100644 server/queue/celery_app.py create mode 100644 server/queue/tasks.py diff --git a/server/app.py b/server/app.py index be48607..d2cdce1 100644 --- a/server/app.py +++ b/server/app.py @@ -21,6 +21,7 @@ from server.db.database import PostgresConnector from server.core.auth import AuthManager from server.core.datasets import DatasetManager from server.utils import get_request_filters +from server.queue.tasks import process_dataset app = Flask(__name__) @@ -129,19 +130,21 @@ def upload_data(): posts_df = pd.read_json(post_file, lines=True, convert_dates=False) topics = json.load(topic_file) - - processor = DatasetEnrichment(posts_df, topics) - enriched_df = processor.enrich() dataset_id = dataset_manager.save_dataset_info(current_user, f"dataset_{current_user}", topics) - dataset_manager.save_dataset_content(dataset_id, enriched_df) + + process_dataset.delay( + dataset_id, + posts_df.to_dict(orient="records"), + topics + ) return jsonify( { - "message": "File uploaded successfully", - "event_count": len(enriched_df), + "message": "Dataset queued for processing", "dataset_id": dataset_id, + "status": "processing" } - ), 200 + ), 202 except ValueError as e: return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400 except Exception as e: diff --git a/server/queue/celery_app.py b/server/queue/celery_app.py new file mode 100644 index 0000000..0bcd58a --- /dev/null +++ b/server/queue/celery_app.py @@ -0,0 +1,16 @@ +from celery import Celery + +def create_celery(): + celery = Celery( + "ethnograph", + broker="redis://redis:6379/0", + backend="redis://redis:6379/0", + ) + celery.conf.task_serializer = "json" + celery.conf.result_serializer = "json" + celery.conf.accept_content = ["json"] + return celery + +celery = create_celery() + +from server.queue import tasks \ No newline at end of file diff --git a/server/queue/tasks.py b/server/queue/tasks.py new file mode 100644 index 0000000..6076581 --- /dev/null +++ b/server/queue/tasks.py @@ -0,0 +1,19 @@ +import pandas as pd + +from server.queue.celery_app import celery +from server.analysis.enrichment import DatasetEnrichment + +@celery.task(bind=True, max_retries=3) +def process_dataset(self, dataset_id: int, posts: list, topics: dict): + from server.db.database import PostgresConnector + from server.core.datasets import DatasetManager + + db = PostgresConnector() + dataset_manager = DatasetManager(db) + + df = pd.DataFrame(posts) + + processor = DatasetEnrichment(df, topics) + enriched_df = processor.enrich() + + dataset_manager.save_dataset_content(dataset_id, enriched_df) \ No newline at end of file