Implement job queue for asynchronous NLP #6

Merged
dylan merged 14 commits from feat/implement-job-queue into main 2026-03-03 14:26:38 +00:00
3 changed files with 107 additions and 28 deletions
Showing only changes of commit eb4187c559 - Show all commits

View File

@@ -126,7 +126,7 @@ def upload_data():
), 400 ), 400
try: try:
current_user = get_jwt_identity() current_user = int(get_jwt_identity())
posts_df = pd.read_json(post_file, lines=True, convert_dates=False) posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
topics = json.load(topic_file) topics = json.load(topic_file)
@@ -150,13 +150,16 @@ def upload_data():
except Exception as e: except Exception as e:
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
@app.route("/dataset/<int:dataset_id>", methods=["GET"]) @app.route("/dataset/<int:dataset_id>", methods=["GET"])
@jwt_required() @jwt_required()
def get_dataset(dataset_id): def get_dataset(dataset_id):
try: try:
user_id = get_jwt_identity() user_id = int(get_jwt_identity())
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters() filters = get_request_filters()
filtered_dataset = stat_gen.filter_dataset(dataset_content, filters) filtered_dataset = stat_gen.filter_dataset(dataset_content, filters)
return jsonify(filtered_dataset), 200 return jsonify(filtered_dataset), 200
@@ -168,13 +171,34 @@ def get_dataset(dataset_id):
print(traceback.format_exc()) print(traceback.format_exc())
return jsonify({"error": "An unexpected error occured"}), 500 return jsonify({"error": "An unexpected error occured"}), 500
@app.route("/dataset/<int:dataset_id>/status", methods=["GET"])
@jwt_required()
def get_dataset_status(dataset_id):
    """Return the processing status of a dataset the caller owns.

    Responds 200 with the status row, 403 when the JWT identity does not
    own the dataset, 404 when the dataset does not exist, 500 otherwise.
    """
    try:
        requester_id = int(get_jwt_identity())
        # Ownership check first: never leak status of other users' datasets.
        if not dataset_manager.authorize_user_dataset(dataset_id, requester_id):
            raise NotAuthorisedException("This user is not authorised to access this dataset")
        return jsonify(dataset_manager.get_dataset_status(dataset_id)), 200
    except NotAuthorisedException:
        return jsonify({"error": "User is not authorised to access this content"}), 403
    except NonExistentDatasetException:
        return jsonify({"error": "Dataset does not exist"}), 404
    except Exception:
        print(traceback.format_exc())
        return jsonify({"error": "An unexpected error occured"}), 500
@app.route("/dataset/<int:dataset_id>/content", methods=["GET"]) @app.route("/dataset/<int:dataset_id>/content", methods=["GET"])
@jwt_required() @jwt_required()
def content_endpoint(dataset_id): def content_endpoint(dataset_id):
try: try:
user_id = get_jwt_identity() user_id = int(get_jwt_identity())
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200 return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:
@@ -190,8 +214,11 @@ def content_endpoint(dataset_id):
@jwt_required() @jwt_required()
def get_summary(dataset_id): def get_summary(dataset_id):
try: try:
user_id = get_jwt_identity() user_id = int(get_jwt_identity())
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.summary(dataset_content, filters)), 200 return jsonify(stat_gen.summary(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:
@@ -207,8 +234,11 @@ def get_summary(dataset_id):
@jwt_required() @jwt_required()
def get_time_analysis(dataset_id): def get_time_analysis(dataset_id):
try: try:
user_id = get_jwt_identity() user_id = int(get_jwt_identity())
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200 return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:
@@ -224,8 +254,11 @@ def get_time_analysis(dataset_id):
@jwt_required() @jwt_required()
def get_user_analysis(dataset_id): def get_user_analysis(dataset_id):
try: try:
user_id = get_jwt_identity() user_id = int(get_jwt_identity())
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200 return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:
@@ -241,8 +274,11 @@ def get_user_analysis(dataset_id):
@jwt_required() @jwt_required()
def get_cultural_analysis(dataset_id): def get_cultural_analysis(dataset_id):
try: try:
user_id = get_jwt_identity() user_id = int(get_jwt_identity())
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200 return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:
@@ -258,8 +294,11 @@ def get_cultural_analysis(dataset_id):
@jwt_required() @jwt_required()
def get_interaction_analysis(dataset_id): def get_interaction_analysis(dataset_id):
try: try:
user_id = get_jwt_identity() user_id = int(get_jwt_identity())
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200 return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:

View File

@@ -7,13 +7,16 @@ class DatasetManager:
def __init__(self, db: PostgresConnector): def __init__(self, db: PostgresConnector):
self.db = db self.db = db
def authorize_user_dataset(self, dataset_id: int, user_id: int) -> bool:
    """Return True iff *user_id* is the recorded owner of *dataset_id*.

    A dataset whose info row carries no "user_id" (or an unknown dataset,
    if get_dataset_info returns an empty mapping) is never authorised.
    """
    dataset_info = self.get_dataset_info(dataset_id)
    # Single lookup; `is None` instead of `== None` (PEP 8) — an absent
    # owner means nobody is authorised.
    owner_id = dataset_info.get("user_id")
    if owner_id is None:
        return False
    return owner_id == user_id
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame: def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s" query = "SELECT * FROM events WHERE dataset_id = %s"
@@ -103,3 +106,35 @@ class DatasetManager:
] ]
self.db.execute_batch(query, values) self.db.execute_batch(query, values)
def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None):
if status not in ["processing", "complete", "error"]:
raise ValueError("Invalid status")
query = """
UPDATE datasets
SET status = %s,
status_message = %s,
completed_at = CASE
WHEN %s = 'complete' THEN NOW()
ELSE NULL
END
WHERE id = %s
"""
self.db.execute(query, (status, status_message, status, dataset_id))
def get_dataset_status(self, dataset_id: int):
    """Return the status row (status, status_message, completed_at) for a dataset.

    Raises:
        NonExistentDatasetException: If no dataset with *dataset_id* exists.
    """
    query = """
        SELECT status, status_message, completed_at
        FROM datasets
        WHERE id = %s
    """
    result = self.db.execute(query, (dataset_id,), fetch=True)
    # Removed leftover debug print(result); an empty result set means the
    # dataset id is unknown.
    if not result:
        raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist")
    return result[0]

View File

@@ -5,6 +5,8 @@ from server.analysis.enrichment import DatasetEnrichment
@celery.task(bind=True, max_retries=3) @celery.task(bind=True, max_retries=3)
def process_dataset(self, dataset_id: int, posts: list, topics: dict): def process_dataset(self, dataset_id: int, posts: list, topics: dict):
try:
from server.db.database import PostgresConnector from server.db.database import PostgresConnector
from server.core.datasets import DatasetManager from server.core.datasets import DatasetManager
@@ -17,3 +19,6 @@ def process_dataset(self, dataset_id: int, posts: list, topics: dict):
enriched_df = processor.enrich() enriched_df = processor.enrich()
dataset_manager.save_dataset_content(dataset_id, enriched_df) dataset_manager.save_dataset_content(dataset_id, enriched_df)
dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
except Exception as e:
dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")