From c767f59b2674abf5d591d86fcbfabb9c52888e0e Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 11:27:01 +0000
Subject: [PATCH 01/14] feat: add redis to docker compose

---
 docker-compose.yml | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docker-compose.yml b/docker-compose.yml
index 3522b91..0462db1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -11,5 +11,11 @@ services:
       - ./server/db/postgres_vol:/var/lib/postgresql/data
       - ./server/db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
 
+  redis:
+    image: redis:7
+    container_name: redis
+    ports:
+      - "6379:6379"
+
 volumes:
   postgres_data:
\ No newline at end of file

From 14b472ea60a2207e313cc4858c580458acf00cbc Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 12:08:12 +0000
Subject: [PATCH 02/14] build: add dockerfile for constructing backend

---
 Dockerfile       | 19 +++++++++++++++++++
 requirements.txt |  6 +++++-
 2 files changed, 24 insertions(+), 1 deletion(-)
 create mode 100644 Dockerfile

diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..10a910d
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,19 @@
+# Use slim to reduce size
+FROM python:3.13-slim
+
+# Prevent Python from buffering stdout
+ENV PYTHONUNBUFFERED=1
+
+# System deps required for psycopg2 + torch
+RUN apt-get update && apt-get install -y \
+    build-essential \
+    libpq-dev \
+    gcc \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+WORKDIR /app
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+COPY . .
+CMD ["python", "main.py"]
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index f4ae5f2..b11b94e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,13 +1,17 @@
 beautifulsoup4==4.14.3
+celery==5.6.2
+redis==7.2.1
 Flask==3.1.3
+Flask_Bcrypt==1.0.1
 flask_cors==6.0.2
+Flask_JWT_Extended==4.7.1
 google_api_python_client==2.188.0
 nltk==3.9.2
 numpy==2.4.2
 pandas==3.0.1
 psycopg2==2.9.11
 psycopg2_binary==2.9.11
-python-dotenv==1.2.1
+python-dotenv==1.2.2
 Requests==2.32.5
 sentence_transformers==5.2.2
 torch==2.10.0
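
A quick smoke test for this image (a sketch only; the ethnograph-backend tag
and the one-liner are assumptions, not files in this series) is to build and
then import the heavyweight pins in one shot:

    docker build -t ethnograph-backend .
    docker run --rm ethnograph-backend \
        python -c "import celery, flask, psycopg2, redis, torch; print(torch.__version__)"

If the psycopg2 build deps in the RUN layer are ever trimmed, this is the
command that catches it before compose is involved.
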
From 2e0e842525f44e17ff500d03f43cb41d9e96920e Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 12:09:50 +0000
Subject: [PATCH 03/14] build: update reqs and docker compose

---
 docker-compose.yml | 27 +++++++++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 0462db1..e9d7fdc 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -14,8 +14,31 @@ services:
   redis:
     image: redis:7
     container_name: redis
+    restart: unless-stopped
     ports:
       - "6379:6379"
 
-volumes:
-  postgres_data:
\ No newline at end of file
+  backend:
+    build: .
+    container_name: flask_backend
+    env_file:
+      - .env
+    ports:
+      - "5000:5000"
+    command: flask --app server.app run --host=0.0.0.0
+    depends_on:
+      - postgres
+      - redis
+
+  worker:
+    build: .
+    container_name: celery_worker
+    env_file:
+      - .env
+    command: >
+      celery -A server.queue.celery_app.celery worker
+      --loglevel=info
+      --pool=solo
+    depends_on:
+      - postgres
+      - redis
\ No newline at end of file

From 3a5870563550f618fffbb04f28475c6478d57087 Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 12:27:14 +0000
Subject: [PATCH 04/14] feat: add celery & redis for background data processing

---
 server/app.py              | 17 ++++++++++-------
 server/queue/celery_app.py | 16 ++++++++++++++++
 server/queue/tasks.py      | 19 +++++++++++++++++++
 3 files changed, 45 insertions(+), 7 deletions(-)
 create mode 100644 server/queue/celery_app.py
 create mode 100644 server/queue/tasks.py

diff --git a/server/app.py b/server/app.py
index be48607..d2cdce1 100644
--- a/server/app.py
+++ b/server/app.py
@@ -21,6 +21,7 @@
 from server.db.database import PostgresConnector
 from server.core.auth import AuthManager
 from server.core.datasets import DatasetManager
 from server.utils import get_request_filters
+from server.queue.tasks import process_dataset
 
 app = Flask(__name__)
@@ -129,19 +130,21 @@ def upload_data():
 
         posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
         topics = json.load(topic_file)
-
-        processor = DatasetEnrichment(posts_df, topics)
-        enriched_df = processor.enrich()
         dataset_id = dataset_manager.save_dataset_info(current_user, f"dataset_{current_user}", topics)
-        dataset_manager.save_dataset_content(dataset_id, enriched_df)
+
+        process_dataset.delay(
+            dataset_id,
+            posts_df.to_dict(orient="records"),
+            topics
+        )
 
         return jsonify(
             {
-                "message": "File uploaded successfully",
-                "event_count": len(enriched_df),
+                "message": "Dataset queued for processing",
                 "dataset_id": dataset_id,
+                "status": "processing"
             }
-        ), 200
+        ), 202
     except ValueError as e:
         return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
     except Exception as e:
diff --git a/server/queue/celery_app.py b/server/queue/celery_app.py
new file mode 100644
index 0000000..0bcd58a
--- /dev/null
+++ b/server/queue/celery_app.py
@@ -0,0 +1,16 @@
+from celery import Celery
+
+def create_celery():
+    celery = Celery(
+        "ethnograph",
+        broker="redis://redis:6379/0",
+        backend="redis://redis:6379/0",
+    )
+    celery.conf.task_serializer = "json"
+    celery.conf.result_serializer = "json"
+    celery.conf.accept_content = ["json"]
+    return celery
+
+celery = create_celery()
+
+from server.queue import tasks
\ No newline at end of file
diff --git a/server/queue/tasks.py b/server/queue/tasks.py
new file mode 100644
index 0000000..6076581
--- /dev/null
+++ b/server/queue/tasks.py
@@ -0,0 +1,19 @@
+import pandas as pd
+
+from server.queue.celery_app import celery
+from server.analysis.enrichment import DatasetEnrichment
+
+@celery.task(bind=True, max_retries=3)
+def process_dataset(self, dataset_id: int, posts: list, topics: dict):
+    from server.db.database import PostgresConnector
+    from server.core.datasets import DatasetManager
+
+    db = PostgresConnector()
+    dataset_manager = DatasetManager(db)
+
+    df = pd.DataFrame(posts)
+
+    processor = DatasetEnrichment(df, topics)
+    enriched_df = processor.enrich()
+
+    dataset_manager.save_dataset_content(dataset_id, enriched_df)
\ No newline at end of file
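
With this wiring in place the upload endpoint answers 202 immediately and the
enrichment happens in the worker. A minimal sketch of exercising the hand-off
from a Python shell (it assumes the redis and worker containers are running;
the sample payload fields are invented for illustration):

    # Hypothetical smoke test for the queue wiring -- not part of the series.
    from server.queue.tasks import process_dataset

    posts = [{"id": "p1", "type": "post", "author": "alice", "content": "hello"}]
    result = process_dataset.delay(1, posts, {"greetings": ["hello"]})

    print(result.id)     # Celery task UUID, stored in the redis result backend
    print(result.state)  # PENDING -> STARTED -> SUCCESS / FAILURE

Note that app.py serialises posts with to_dict(orient="records") before
queueing: Celery's JSON serializer, configured in celery_app.py, cannot encode
a DataFrame directly.
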
From f4894759d741365897f490d9c0b5f7a33f48e687 Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 12:34:51 +0000
Subject: [PATCH 05/14] feat: add docker-compose dev

---
 docker-compose.dev.yml | 48 ++++++++++++++++++++++++++++++++++++++++++
 docker-compose.yml     |  8 ++++----
 2 files changed, 52 insertions(+), 4 deletions(-)
 create mode 100644 docker-compose.dev.yml

diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
new file mode 100644
index 0000000..17e4d1c
--- /dev/null
+++ b/docker-compose.dev.yml
@@ -0,0 +1,48 @@
+services:
+  postgres:
+    image: postgres:16
+    container_name: crosspost_db
+    restart: unless-stopped
+    env_file:
+      - .env
+    ports:
+      - "5432:5432"
+    volumes:
+      - ./server/db/postgres_vol:/var/lib/postgresql/data
+      - ./server/db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
+
+  redis:
+    image: redis:7
+    container_name: crosspost_redis
+    restart: unless-stopped
+    ports:
+      - "6379:6379"
+
+  backend:
+    build: .
+    container_name: crosspost_flask
+    volumes:
+      - .:/app
+    env_file:
+      - .env
+    ports:
+      - "5000:5000"
+    command: flask --app server.app run --host=0.0.0.0 --debug
+    depends_on:
+      - postgres
+      - redis
+
+  worker:
+    build: .
+    volumes:
+      - .:/app
+    container_name: crosspost_worker
+    env_file:
+      - .env
+    command: >
+      celery -A server.queue.celery_app.celery worker
+      --loglevel=info
+      --pool=solo
+    depends_on:
+      - postgres
+      - redis
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index e9d7fdc..baf8048 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,7 +1,7 @@
 services:
   postgres:
     image: postgres:16
-    container_name: postgres_db
+    container_name: crosspost_db
     restart: unless-stopped
     env_file:
       - .env
@@ -13,14 +13,14 @@ services:
 
   redis:
     image: redis:7
-    container_name: redis
+    container_name: crosspost_redis
     restart: unless-stopped
     ports:
       - "6379:6379"
 
   backend:
     build: .
-    container_name: flask_backend
+    container_name: crosspost_flask
     env_file:
       - .env
     ports:
@@ -27,12 +27,12 @@ services:
       - "5000:5000"
     command: flask --app server.app run --host=0.0.0.0
     depends_on:
       - postgres
       - redis
 
   worker:
     build: .
-    container_name: celery_worker
+    container_name: crosspost_worker
     env_file:
       - .env
     command: >

From 3772f83d116de210317dbbe9ac8adf4f5e1ed146 Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 12:41:02 +0000
Subject: [PATCH 06/14] fix: add title column to db

This was accidentally removed in a previous merge
---
 server/core/datasets.py | 5 ++++-
 server/db/schema.sql    | 5 ++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/server/core/datasets.py b/server/core/datasets.py
index 0d5dba5..f7081ec 100644
--- a/server/core/datasets.py
+++ b/server/core/datasets.py
@@ -49,6 +49,7 @@ class DatasetManager:
                 type,
                 parent_id,
                 author,
+                title,
                 content,
                 timestamp,
                 date,
@@ -70,7 +71,8 @@ class DatasetManager:
                 %s, %s, %s, %s, %s,
                 %s, %s, %s, %s, %s,
                 %s, %s, %s, %s, %s,
-                %s, %s, %s, %s, %s
+                %s, %s, %s, %s, %s,
+                %s
             )
         """
 
@@ -80,6 +82,7 @@ class DatasetManager:
                 row["type"],
                 row["parent_id"],
                 row["author"],
+                row.get("title"),
                 row["content"],
                 row["timestamp"],
                 row["date"],
diff --git a/server/db/schema.sql b/server/db/schema.sql
index 693f821..5a9eaee 100644
--- a/server/db/schema.sql
+++ b/server/db/schema.sql
@@ -30,7 +30,10 @@ CREATE TABLE events (
     hour INTEGER NOT NULL,
     weekday VARCHAR(255) NOT NULL,
 
-    /* Comments and Replies */
+    /* Posts Only */
+    title VARCHAR(255),
+
+    /* Comments Only */
     parent_id VARCHAR(255),
     reply_to VARCHAR(255),
     source VARCHAR(255) NOT NULL,
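
The underlying hazard this patch fixes: save_dataset_content pairs a long
column list with positional %s placeholders, so dropping one column silently
shifts every value after it. A hedged alternative (illustration only -- the
series keeps the positional style) is psycopg2's named pyformat placeholders,
which raise a KeyError instead of misaligning:

    # Hypothetical sketch with a shortened column list.
    query = """
        INSERT INTO events (dataset_id, event_id, type, author, title, content)
        VALUES (%(dataset_id)s, %(event_id)s, %(type)s, %(author)s, %(title)s, %(content)s)
    """
    cursor.execute(query, {
        "dataset_id": 1,
        "event_id": "p1",
        "type": "post",
        "author": "alice",
        "title": None,   # posts only; stays NULL for comments
        "content": "hello",
    })
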
From 6d6082080016638f8e23b192fbe617adc535ce9f Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 13:00:19 +0000
Subject: [PATCH 07/14] build: add persistent model caching

---
 docker-compose.dev.yml | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 17e4d1c..07a7085 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -23,6 +23,7 @@ services:
     container_name: crosspost_flask
     volumes:
       - .:/app
+      - model_cache:/models
     env_file:
       - .env
     ports:
@@ -36,6 +37,7 @@ services:
     build: .
     volumes:
       - .:/app
+      - model_cache:/models
     container_name: crosspost_worker
     env_file:
       - .env
@@ -45,4 +47,7 @@ services:
       --pool=solo
     depends_on:
       - postgres
-      - redis
\ No newline at end of file
+      - redis
+
+volumes:
+  model_cache:
\ No newline at end of file

From a4c527ce5bf0b601aae317e567de0d0279d79e2d Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 13:10:50 +0000
Subject: [PATCH 08/14] fix(db): execute not committing if fetch flag was set

---
 server/db/database.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/server/db/database.py b/server/db/database.py
index efbbfcd..346ef8b 100644
--- a/server/db/database.py
+++ b/server/db/database.py
@@ -27,19 +27,21 @@ class PostgresConnector:
         self.connection.autocommit = False
 
     def execute(self, query, params=None, fetch=False) -> list:
-        with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
-            cursor.execute(query, params)
-            if fetch:
-                return cursor.fetchall()
+        try:
+            with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
+                cursor.execute(query, params)
+                result = cursor.fetchall() if fetch else None
             self.connection.commit()
+            return result
+        except Exception:
+            self.connection.rollback()
+            raise
 
     def execute_batch(self, query, values):
         with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
             execute_batch(cursor, query, values)
             self.connection.commit()
-
-    ## User Management Methods
 
     def close(self):
         if self.connection:
             self.connection.close()
\ No newline at end of file

From 075e1fba8565da854d4deb9a7d7db80a0f4a9739 Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 13:12:28 +0000
Subject: [PATCH 09/14] fix: typo in exception naming

---
 server/app.py        | 4 ++--
 server/exceptions.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/server/app.py b/server/app.py
index d2cdce1..900469f 100644
--- a/server/app.py
+++ b/server/app.py
@@ -16,7 +16,7 @@ from flask_jwt_extended import (
 
 from server.analysis.stat_gen import StatGen
 from server.analysis.enrichment import DatasetEnrichment
-from server.exceptions import NotAuthorisedException, NotExistentDatasetException
+from server.exceptions import NotAuthorisedException, NonExistentDatasetException
 from server.db.database import PostgresConnector
 from server.core.auth import AuthManager
 from server.core.datasets import DatasetManager
@@ -162,7 +162,7 @@ def get_dataset(dataset_id):
         return jsonify(filtered_dataset), 200
     except NotAuthorisedException:
         return jsonify({"error": "User is not authorised to access this content"}), 403
-    except NotExistentDatasetException:
+    except NonExistentDatasetException:
         return jsonify({"error": "Dataset does not exist"}), 404
     except Exception:
         print(traceback.format_exc())
diff --git a/server/exceptions.py b/server/exceptions.py
index e63be49..10a9f9f 100644
--- a/server/exceptions.py
+++ b/server/exceptions.py
@@ -1,7 +1,7 @@
 class NotAuthorisedException(Exception):
     pass
 
-class NotExistentDatasetException(Exception):
+class NonExistentDatasetException(Exception):
     pass
 
 class DatabaseNotConfiguredException(Exception):
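
For context on the PATCH 08 fix above: with autocommit disabled, the old
execute() returned fetched rows before ever reaching
self.connection.commit(), so a writing query that also fetches -- exactly the
INSERT ... RETURNING id pattern in save_dataset_info -- was rolled back when
the connection closed. A sketch of the failure mode (hypothetical usage,
assuming a configured database):

    db = PostgresConnector()
    rows = db.execute(
        "INSERT INTO datasets (user_id, name) VALUES (%s, %s) RETURNING id",
        (1, "demo"),
        fetch=True,
    )
    db.close()
    # Old behaviour: rows[0]["id"] was returned, but the INSERT was never
    # committed, so the row vanished on close.
    # New behaviour: fetchall() runs inside the cursor block, commit()
    # always executes, and failures trigger an explicit rollback.
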
From f93e45b82712ab9abb5096854f6c3ded89830c3c Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 13:13:40 +0000
Subject: [PATCH 10/14] fix(dataset): silent errors if dataset did not exist

---
 server/analysis/enrichment.py |  2 +-
 server/core/datasets.py       | 27 +++++++++++++--------------
 2 files changed, 14 insertions(+), 15 deletions(-)

diff --git a/server/analysis/enrichment.py b/server/analysis/enrichment.py
index 065caea..cc9694c 100644
--- a/server/analysis/enrichment.py
+++ b/server/analysis/enrichment.py
@@ -3,7 +3,7 @@ import pandas as pd
 from server.analysis.nlp import NLP
 
 class DatasetEnrichment:
-    def __init__(self, df, topics):
+    def __init__(self, df: pd.DataFrame, topics: dict):
         self.df = self._explode_comments(df)
         self.topics = topics
         self.nlp = NLP(self.df, "title", "content", self.topics)
diff --git a/server/core/datasets.py b/server/core/datasets.py
index f7081ec..a1835e6 100644
--- a/server/core/datasets.py
+++ b/server/core/datasets.py
@@ -1,7 +1,7 @@
 import pandas as pd
 from server.db.database import PostgresConnector
 from psycopg2.extras import Json
-from server.exceptions import NotAuthorisedException
+from server.exceptions import NotAuthorisedException, NonExistentDatasetException
 
 class DatasetManager:
     def __init__(self, db: PostgresConnector):
@@ -23,21 +23,20 @@ class DatasetManager:
     def get_dataset_info(self, dataset_id: int) -> dict:
         query = "SELECT * FROM datasets WHERE id = %s"
         result = self.db.execute(query, (dataset_id,), fetch=True)
-        return result[0] if result else None
+
+        if not result:
+            raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist")
+
+        return result[0]
 
     def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
-        query = """
-            INSERT INTO datasets (user_id, name, topics)
-            VALUES (%s, %s, %s)
-            RETURNING id
-        """
-        result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
-        return result[0]["id"] if result else None
-
-    def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
-        query = "SELECT * FROM events WHERE dataset_id = %s"
-        result = self.db.execute(query, (dataset_id,), fetch=True)
-        return pd.DataFrame(result)
+        query = """
+            INSERT INTO datasets (user_id, name, topics)
+            VALUES (%s, %s, %s)
+            RETURNING id
+        """
+        result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
+        return result[0]["id"] if result else None
 
     def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
         if event_data.empty:

From 63cd4651890f3c2bcc9533fb53de61c81beb03b3 Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 13:46:06 +0000
Subject: [PATCH 11/14] feat(db): add status and constraints to the schema

---
 server/db/schema.sql | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/server/db/schema.sql b/server/db/schema.sql
index 5a9eaee..5379a95 100644
--- a/server/db/schema.sql
+++ b/server/db/schema.sql
@@ -11,9 +11,19 @@ CREATE TABLE datasets (
     user_id INTEGER NOT NULL,
     name VARCHAR(255) NOT NULL,
     description TEXT,
+
+    -- Job state machine
+    status TEXT NOT NULL DEFAULT 'processing',
+    status_message TEXT,
+    completed_at TIMESTAMP,
+
     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
     topics JSONB,
-    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
+    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
+
+    -- Enforce valid states
+    CONSTRAINT datasets_status_check
+        CHECK (status IN ('processing', 'complete', 'error'))
 );
 
 CREATE TABLE events (
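
Because the state machine is enforced in the schema rather than only in
application code, a bad status now fails loudly at the database. A
hypothetical check (reusing the PostgresConnector wrapper, which rolls back
and re-raises on error):

    import psycopg2

    try:
        db.execute("UPDATE datasets SET status = %s WHERE id = %s", ("done", 1))
    except psycopg2.errors.CheckViolation:
        # Only 'processing', 'complete' and 'error' pass datasets_status_check.
        print("rejected by the CHECK constraint")
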
From eb4187c559aec560f6d70834dbcb82ad4a12dbb8 Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 13:46:37 +0000
Subject: [PATCH 12/14] feat(api): add status returns for NLP processing

---
 server/app.py           | 71 +++++++++++++++++++++++++++++++++----------
 server/core/datasets.py | 43 ++++++++++++++++++++++---
 server/queue/tasks.py   | 21 +++++++++------
 3 files changed, 107 insertions(+), 28 deletions(-)

diff --git a/server/app.py b/server/app.py
index 900469f..a92f4a7 100644
--- a/server/app.py
+++ b/server/app.py
@@ -126,7 +126,7 @@ def upload_data():
         ), 400
 
     try:
-        current_user = get_jwt_identity()
+        current_user = int(get_jwt_identity())
 
         posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
         topics = json.load(topic_file)
@@ -150,13 +150,16 @@ def upload_data():
     except Exception as e:
         return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
 
-
 @app.route("/dataset/<int:dataset_id>", methods=["GET"])
 @jwt_required()
 def get_dataset(dataset_id):
     try:
-        user_id = get_jwt_identity()
-        dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
+        user_id = int(get_jwt_identity())
+
+        if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
+            raise NotAuthorisedException("This user is not authorised to access this dataset")
+
+        dataset_content = dataset_manager.get_dataset_content(dataset_id)
         filters = get_request_filters()
         filtered_dataset = stat_gen.filter_dataset(dataset_content, filters)
         return jsonify(filtered_dataset), 200
@@ -163,17 +166,38 @@ def get_dataset(dataset_id):
     except NotAuthorisedException:
         return jsonify({"error": "User is not authorised to access this content"}), 403
     except NonExistentDatasetException:
         return jsonify({"error": "Dataset does not exist"}), 404
     except Exception:
         print(traceback.format_exc())
         return jsonify({"error": "An unexpected error occured"}), 500
+@app.route("/dataset/<int:dataset_id>/status", methods=["GET"])
+@jwt_required()
+def get_dataset_status(dataset_id):
+    try:
+        user_id = int(get_jwt_identity())
+
+        if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
+            raise NotAuthorisedException("This user is not authorised to access this dataset")
+
+        dataset_status = dataset_manager.get_dataset_status(dataset_id)
+        return jsonify(dataset_status), 200
+    except NotAuthorisedException:
+        return jsonify({"error": "User is not authorised to access this content"}), 403
+    except NonExistentDatasetException:
+        return jsonify({"error": "Dataset does not exist"}), 404
+    except Exception:
+        print(traceback.format_exc())
+        return jsonify({"error": "An unexpected error occurred"}), 500
 
 @app.route("/dataset/<int:dataset_id>/content", methods=["GET"])
 @jwt_required()
 def content_endpoint(dataset_id):
     try:
-        user_id = get_jwt_identity()
-        dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
+        user_id = int(get_jwt_identity())
+        if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
+            raise NotAuthorisedException("This user is not authorised to access this dataset")
+
+        dataset_content = dataset_manager.get_dataset_content(dataset_id)
         filters = get_request_filters()
         return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200
     except NotAuthorisedException:
@@ -190,8 +214,11 @@ def get_summary(dataset_id):
 @jwt_required()
 def get_summary(dataset_id):
     try:
-        user_id = get_jwt_identity()
-        dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
+        user_id = int(get_jwt_identity())
+        if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
+            raise NotAuthorisedException("This user is not authorised to access this dataset")
+
+        dataset_content = dataset_manager.get_dataset_content(dataset_id)
         filters = get_request_filters()
         return jsonify(stat_gen.summary(dataset_content, filters)), 200
     except NotAuthorisedException:
@@ -207,8 +234,11 @@ def get_time_analysis(dataset_id):
 @jwt_required()
 def get_time_analysis(dataset_id):
     try:
-        user_id = get_jwt_identity()
-        dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
+        user_id = int(get_jwt_identity())
+        if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
+            raise NotAuthorisedException("This user is not authorised to access this dataset")
+
+        dataset_content = dataset_manager.get_dataset_content(dataset_id)
         filters = get_request_filters()
         return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200
     except NotAuthorisedException:
@@ -224,8 +254,11 @@ def get_user_analysis(dataset_id):
 @jwt_required()
 def get_user_analysis(dataset_id):
     try:
-        user_id = get_jwt_identity()
-        dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
+        user_id = int(get_jwt_identity())
+        if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
+            raise NotAuthorisedException("This user is not authorised to access this dataset")
+
+        dataset_content = dataset_manager.get_dataset_content(dataset_id)
         filters = get_request_filters()
         return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200
     except NotAuthorisedException:
@@ -241,8 +274,11 @@ def get_cultural_analysis(dataset_id):
 @jwt_required()
 def get_cultural_analysis(dataset_id):
     try:
-        user_id = get_jwt_identity()
-        dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
+        user_id = int(get_jwt_identity())
+        if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
+            raise NotAuthorisedException("This user is not authorised to access this dataset")
+
+        dataset_content = dataset_manager.get_dataset_content(dataset_id)
         filters = get_request_filters()
         return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200
     except NotAuthorisedException:
@@ -258,8 +294,11 @@ def get_interaction_analysis(dataset_id):
 @jwt_required()
 def get_interaction_analysis(dataset_id):
     try:
-        user_id = get_jwt_identity()
-        dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
+        user_id = int(get_jwt_identity())
+        if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
+            raise NotAuthorisedException("This user is not authorised to access this dataset")
+
+        dataset_content = dataset_manager.get_dataset_content(dataset_id)
         filters = get_request_filters()
         return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200
     except NotAuthorisedException:
diff --git a/server/core/datasets.py b/server/core/datasets.py
index a1835e6..541db5d 100644
--- a/server/core/datasets.py
+++ b/server/core/datasets.py
@@ -7,13 +7,16 @@ class DatasetManager:
     def __init__(self, db: PostgresConnector):
         self.db = db
 
-    def get_dataset_and_validate(self, dataset_id: int, user_id: int) -> pd.DataFrame:
+    def authorize_user_dataset(self, dataset_id: int, user_id: int) -> bool:
         dataset_info = self.get_dataset_info(dataset_id)
 
+        if dataset_info.get("user_id", None) is None:
+            return False
+
         if dataset_info.get("user_id") != user_id:
-            raise NotAuthorisedException("This user is not authorised to access this dataset")
+            return False
 
-        return self.get_dataset_content(dataset_id)
+        return True
 
     def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
         query = "SELECT * FROM events WHERE dataset_id = %s"
@@ -102,4 +105,36 @@ class DatasetManager:
             for _, row in event_data.iterrows()
         ]
 
-        self.db.execute_batch(query, values)
\ No newline at end of file
+        self.db.execute_batch(query, values)
+
+    def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None):
+        if status not in ["processing", "complete", "error"]:
+            raise ValueError("Invalid status")
+
+        query = """
+            UPDATE datasets
+            SET status = %s,
+                status_message = %s,
+                completed_at = CASE
+                    WHEN %s = 'complete' THEN NOW()
+                    ELSE NULL
+                END
+            WHERE id = %s
+        """
+
+        self.db.execute(query, (status, status_message, status, dataset_id))
+
+    def get_dataset_status(self, dataset_id: int):
+        query = """
+            SELECT status, status_message, completed_at
+            FROM datasets
+            WHERE id = %s
+        """
+
+        result = self.db.execute(query, (dataset_id, ), fetch=True)
+
+        if not result:
+            print(result)
+            raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist")
+
+        return result[0]
\ No newline at end of file
diff --git a/server/queue/tasks.py b/server/queue/tasks.py
index 6076581..f2f3268 100644
--- a/server/queue/tasks.py
+++ b/server/queue/tasks.py
@@ -5,15 +5,20 @@ from server.analysis.enrichment import DatasetEnrichment
 
 @celery.task(bind=True, max_retries=3)
 def process_dataset(self, dataset_id: int, posts: list, topics: dict):
-    from server.db.database import PostgresConnector
-    from server.core.datasets import DatasetManager
 
-    db = PostgresConnector()
-    dataset_manager = DatasetManager(db)
+    try:
+        from server.db.database import PostgresConnector
+        from server.core.datasets import DatasetManager
 
-    df = pd.DataFrame(posts)
+        db = PostgresConnector()
+        dataset_manager = DatasetManager(db)
 
-    processor = DatasetEnrichment(df, topics)
-    enriched_df = processor.enrich()
+        df = pd.DataFrame(posts)
 
-    dataset_manager.save_dataset_content(dataset_id, enriched_df)
+        processor = DatasetEnrichment(df, topics)
+        enriched_df = processor.enrich()
+
+        dataset_manager.save_dataset_content(dataset_id, enriched_df)
+        dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
+    except Exception as e:
+        dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
\ No newline at end of file

From 0ede7fe071c149d1b646d9b82207b1b52e9d7bbe Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 14:18:43 +0000
Subject: [PATCH 13/14] fix(compose): add GPU support to celery worker

---
 docker-compose.dev.yml | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 07a7085..04d435b 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -48,6 +48,13 @@ services:
     depends_on:
       - postgres
       - redis
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              count: 1
+              capabilities: [gpu]
 
 volumes:
   model_cache:
\ No newline at end of file
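
Taken together, the 202 response from PATCH 04 and the status endpoint from
PATCH 12 give clients a simple poll loop. A hedged sketch (the base URL, JWT
acquisition and sleep interval are assumptions, not part of the series):

    # Hypothetical client-side polling -- illustration only.
    import time
    import requests

    headers = {"Authorization": f"Bearer {token}"}  # token from the auth endpoints
    while True:
        status = requests.get(
            f"http://localhost:5000/dataset/{dataset_id}/status", headers=headers
        ).json()
        if status["status"] in ("complete", "error"):
            print(status["status"], status.get("status_message"))
            break
        time.sleep(5)

One caveat on the GPU reservation above: the deploy.resources.reservations
syntax only works when the host has the NVIDIA container toolkit installed,
and the worker will fail to start without it -- presumably why the
reservation lives only in docker-compose.dev.yml.
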
From 9d1e8960fcdcf46acee145e586f84f14b52a2c61 Mon Sep 17 00:00:00 2001
From: Dylan De Faoite
Date: Tue, 3 Mar 2026 14:25:25 +0000
Subject: [PATCH 14/14] perf: update cultural analysis to use regex instead of Counter

---
 server/analysis/cultural.py | 90 ++++++++++++----------------------------
 1 file changed, 28 insertions(+), 62 deletions(-)

diff --git a/server/analysis/cultural.py b/server/analysis/cultural.py
index 909233e..fc4a93a 100644
--- a/server/analysis/cultural.py
+++ b/server/analysis/cultural.py
@@ -1,7 +1,6 @@
 import pandas as pd
 import re
 
-from collections import Counter
 from typing import Any
 
 
@@ -14,9 +13,6 @@ class CulturalAnalysis:
         df = original_df.copy()
         s = df[self.content_col].fillna("").astype(str).str.lower()
 
-        in_group_words = {"we", "us", "our", "ourselves"}
-        out_group_words = {"they", "them", "their", "themselves"}
-
         emotion_exclusions = {"emotion_neutral", "emotion_surprise"}
         emotion_cols = [
             c for c in df.columns
@@ -24,11 +20,13 @@ class CulturalAnalysis:
         ]
 
         # Tokenize per row
-        tokens_per_row = s.apply(lambda txt: re.findall(r"\b[a-z]{2,}\b", txt))
+        in_pattern = re.compile(r"\b(we|us|our|ourselves)\b")
+        out_pattern = re.compile(r"\b(they|them|their|themselves)\b")
+        token_pattern = re.compile(r"\b[a-z]{2,}\b")
 
-        total_tokens = int(tokens_per_row.map(len).sum())
-        in_hits = tokens_per_row.map(lambda toks: sum(t in in_group_words for t in toks)).astype(int)
-        out_hits = tokens_per_row.map(lambda toks: sum(t in out_group_words for t in toks)).astype(int)
+        in_hits = s.str.count(in_pattern)
+        out_hits = s.str.count(out_pattern)
+        total_tokens = s.str.count(token_pattern).sum()
 
         in_count = int(in_hits.sum())
         out_count = int(out_hits.sum())
@@ -62,33 +60,15 @@ class CulturalAnalysis:
     def get_stance_markers(self, df: pd.DataFrame) -> dict[str, Any]:
         s = df[self.content_col].fillna("").astype(str)
 
-        hedges = {
-            "maybe", "perhaps", "possibly", "probably", "likely", "seems", "seem",
-            "i think", "i feel", "i guess", "kind of", "sort of", "somewhat"
-        }
-        certainty = {
-            "definitely", "certainly", "clearly", "obviously", "undeniably", "always", "never"
-        }
+        hedge_pattern = re.compile(r"\b(maybe|perhaps|possibly|probably|likely|seems|seem|i think|i feel|i guess|kind of|sort of|somewhat)\b")
+        certainty_pattern = re.compile(r"\b(definitely|certainly|clearly|obviously|undeniably|always|never)\b")
+        deontic_pattern = re.compile(r"\b(must|should|need|needs|have to|has to|ought|required|require)\b")
+        permission_pattern = re.compile(r"\b(can|allowed|okay|ok|permitted)\b")
 
-        deontic = {
-            "must", "should", "need", "needs", "have to", "has to", "ought", "required", "require"
-        }
-
-        permission = {"can", "allowed", "okay", "ok", "permitted"}
-
-        def count_phrases(text: str, phrases: set[str]) -> int:
-            c = 0
-            for p in phrases:
-                if " " in p:
-                    c += len(re.findall(r"\b" + re.escape(p) + r"\b", text))
-                else:
-                    c += len(re.findall(r"\b" + re.escape(p) + r"\b", text))
-            return c
-
-        hedge_counts = s.apply(lambda t: count_phrases(t, hedges))
-        certainty_counts = s.apply(lambda t: count_phrases(t, certainty))
-        deontic_counts = s.apply(lambda t: count_phrases(t, deontic))
-        perm_counts = s.apply(lambda t: count_phrases(t, permission))
+        hedge_counts = s.str.count(hedge_pattern)
+        certainty_counts = s.str.count(certainty_pattern)
+        deontic_counts = s.str.count(deontic_pattern)
+        perm_counts = s.str.count(permission_pattern)
 
         token_counts = s.apply(lambda t: len(re.findall(r"\b[a-z]{2,}\b", t))).replace(0, 1)
 
@@ -108,44 +88,30 @@ class CulturalAnalysis:
             return {"entity_emotion_avg": {}}
 
         emotion_cols = [c for c in df.columns if c.startswith("emotion_")]
 
-        entity_counter = Counter()
-        for row in df["entities"].dropna():
-            if isinstance(row, list):
-                for ent in row:
-                    if isinstance(ent, dict):
-                        text = ent.get("text")
-                        if isinstance(text, str):
-                            text = text.strip()
-                            if len(text) >= 3:  # filter short junk
-                                entity_counter[text] += 1
+        entity_df = df[["entities"] + emotion_cols].explode("entities")
 
-        top_entities = entity_counter.most_common(top_n)
+        entity_df["entity_text"] = entity_df["entities"].apply(
+            lambda e: e.get("text").strip()
+            if isinstance(e, dict) and isinstance(e.get("text"), str) and len(e.get("text")) >= 3
+            else None
+        )
+        entity_df = entity_df.dropna(subset=["entity_text"])
 
+        entity_counts = entity_df["entity_text"].value_counts().head(top_n)
         entity_emotion_avg = {}
 
-        for entity_text, _ in top_entities:
-            mask = df["entities"].apply(
-                lambda ents: isinstance(ents, list) and
-                any(isinstance(e, dict) and e.get("text") == entity_text for e in ents)
-            )
-
-            post_count = int(mask.sum())
-
-            if post_count >= min_posts:
+        for entity_text, count in entity_counts.items():
+            if count >= min_posts:
                 emo_means = (
-                    df.loc[mask, emotion_cols]
-                    .apply(pd.to_numeric, errors="coerce")
-                    .fillna(0.0)
+                    entity_df[entity_df["entity_text"] == entity_text][emotion_cols]
                     .mean()
                     .to_dict()
                 )
 
                 entity_emotion_avg[entity_text] = {
-                    "post_count": post_count,
-                    "emotion_avg": emo_means
+                    "post_count": int(count),
+                    "emotion_avg": emo_means,
                 }
 
-        return {
-            "entity_emotion_avg": entity_emotion_avg
-        }
\ No newline at end of file
+        return {"entity_emotion_avg": entity_emotion_avg}
\ No newline at end of file
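
The gain in this last patch comes from replacing per-row Python token lists
(and a Counter over every entity) with pandas' vectorised str.count over
precompiled patterns. A rough way to sanity-check the claim on synthetic data
(hypothetical micro-benchmark; absolute numbers will vary):

    # Not part of the series -- compares the old and new counting styles.
    import re
    import timeit

    import pandas as pd

    s = pd.Series(["we think they must go home"] * 10_000)
    in_group = {"we", "us", "our", "ourselves"}
    in_pattern = re.compile(r"\b(we|us|our|ourselves)\b")

    def old_way():
        toks = s.apply(lambda t: re.findall(r"\b[a-z]{2,}\b", t))
        return toks.map(lambda t: sum(w in in_group for w in t)).sum()

    def new_way():
        return s.str.count(in_pattern).sum()

    print("old:", timeit.timeit(old_way, number=10))
    print("new:", timeit.timeit(new_way, number=10))

The two styles agree here because every group word is at least two lowercase
letters, so the old tokeniser never filtered one out.
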