Implement job queue for asynchronous NLP #6

Merged
dylan merged 14 commits from feat/implement-job-queue into main 2026-03-03 14:26:38 +00:00
3 changed files with 45 additions and 7 deletions
Showing only changes of commit 3a58705635 - Show all commits

View File

@@ -21,6 +21,7 @@ from server.db.database import PostgresConnector
from server.core.auth import AuthManager
from server.core.datasets import DatasetManager
from server.utils import get_request_filters
from server.queue.tasks import process_dataset
app = Flask(__name__)
@@ -129,19 +130,21 @@ def upload_data():
posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
topics = json.load(topic_file)
processor = DatasetEnrichment(posts_df, topics)
enriched_df = processor.enrich()
dataset_id = dataset_manager.save_dataset_info(current_user, f"dataset_{current_user}", topics)
dataset_manager.save_dataset_content(dataset_id, enriched_df)
process_dataset.delay(
dataset_id,
posts_df.to_dict(orient="records"),
topics
)
return jsonify(
{
"message": "File uploaded successfully",
"event_count": len(enriched_df),
"message": "Dataset queued for processing",
"dataset_id": dataset_id,
"status": "processing"
}
), 200
), 202
except ValueError as e:
return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
except Exception as e:

View File

@@ -0,0 +1,16 @@
import os

from celery import Celery
def create_celery():
    """Build and configure the Celery application for the job queue.

    Broker/backend URLs are read from the environment
    (``CELERY_BROKER_URL`` / ``CELERY_RESULT_BACKEND``) so deployments
    outside the docker-compose network can point at a different Redis;
    the previously hard-coded ``redis://redis:6379/0`` is kept as the
    default, so existing setups are unaffected.

    Returns:
        A configured :class:`celery.Celery` instance.
    """
    default_url = "redis://redis:6379/0"
    celery = Celery(
        "ethnograph",
        broker=os.environ.get("CELERY_BROKER_URL", default_url),
        backend=os.environ.get("CELERY_RESULT_BACKEND", default_url),
    )
    # JSON-only serialization: task payloads are plain dicts/lists
    # (see process_dataset), and refusing pickle avoids arbitrary code
    # execution via a compromised broker.
    celery.conf.task_serializer = "json"
    celery.conf.result_serializer = "json"
    celery.conf.accept_content = ["json"]
    return celery
# Module-level app instance; task modules import this to register themselves.
celery = create_celery()
# Imported last, after `celery` exists, because server.queue.tasks does
# `from server.queue.celery_app import celery` — importing it earlier would
# be a circular import. Importing here also ensures the tasks are registered
# with the app whenever the app module is loaded.
from server.queue import tasks

19
server/queue/tasks.py Normal file
View File

@@ -0,0 +1,19 @@
import pandas as pd
from server.queue.celery_app import celery
from server.analysis.enrichment import DatasetEnrichment
@celery.task(bind=True, max_retries=3)
def process_dataset(self, dataset_id: int, posts: list, topics: dict):
    """Enrich a dataset's posts with topic data and persist the result.

    Runs in a Celery worker; the upload endpoint queues this with
    ``process_dataset.delay(...)`` after creating the dataset row.

    Args:
        dataset_id: Primary key of the dataset row created by the upload
            endpoint.
        posts: Post records as produced by
            ``DataFrame.to_dict(orient="records")`` (JSON-serializable).
        topics: Topic definitions parsed from the uploaded topic file.

    Raises:
        celery.exceptions.Retry: re-queues the task on failure, up to
            ``max_retries`` attempts.
    """
    # Imported lazily so the web process can import this module (to call
    # .delay()) without constructing a DB connection at import time.
    from server.db.database import PostgresConnector
    from server.core.datasets import DatasetManager

    try:
        db = PostgresConnector()
        dataset_manager = DatasetManager(db)
        df = pd.DataFrame(posts)
        enriched_df = DatasetEnrichment(df, topics).enrich()
        dataset_manager.save_dataset_content(dataset_id, enriched_df)
    except Exception as exc:
        # Without an explicit self.retry() the max_retries=3 setting is
        # dead config: a transient DB/Redis failure would fail the task
        # permanently on the first attempt.
        raise self.retry(exc=exc, countdown=30)