Compare commits

..

15 Commits

Author SHA1 Message Date
6e263cf30b Merge pull request 'Implement job queue for asynchronous NLP' (#6) from feat/implement-job-queue into main
Reviewed-on: #6
2026-03-03 14:26:37 +00:00
9d1e8960fc perf: update cultural analysis to use regex instead of Counter 2026-03-03 14:25:25 +00:00
0ede7fe071 fix(compose): add GPU support to celery worker 2026-03-03 14:18:43 +00:00
eb4187c559 feat(api): add status returns for NLP processing 2026-03-03 13:46:37 +00:00
63cd465189 feat(db): add status and constraints to the schema 2026-03-03 13:46:06 +00:00
f93e45b827 fix(dataset): silent errors if dataset did not exist 2026-03-03 13:13:40 +00:00
075e1fba85 fix: typo in exception naming 2026-03-03 13:12:28 +00:00
a4c527ce5b fix(db): execute not committing if fetch flag was set 2026-03-03 13:10:50 +00:00
6d60820800 build: add persistent model caching 2026-03-03 13:00:19 +00:00
3772f83d11 fix: add title column to db
This was accidentally removed in a previous merge
2026-03-03 12:41:02 +00:00
f4894759d7 feat: add docker-compose dev 2026-03-03 12:34:51 +00:00
3a58705635 feat: add celery & redis for background data processing 2026-03-03 12:27:14 +00:00
2e0e842525 build: update reqs and docker compose 2026-03-03 12:09:50 +00:00
14b472ea60 build: add dockerfile for constructing backend 2026-03-03 12:09:27 +00:00
c767f59b26 feat: add redis to docker compose 2026-03-03 11:27:01 +00:00
13 changed files with 332 additions and 120 deletions

19
Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
# Use slim to reduce size
FROM python:3.13-slim
# Prevent Python from buffering stdout
ENV PYTHONUNBUFFERED=1
# System deps required for psycopg2 + torch
# (build-essential/gcc for source builds, libpq-dev for psycopg2 headers;
# the apt cache is removed in the same layer to keep the image small)
RUN apt-get update && apt-get install -y \
build-essential \
libpq-dev \
gcc \
curl \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Copy requirements before the rest of the source so the pip layer is
# cached and only rebuilt when requirements.txt changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Default command; docker-compose overrides this per service (flask / celery)
CMD ["python", "main.py"]

60
docker-compose.dev.yml Normal file
View File

@@ -0,0 +1,60 @@
# Development stack: Postgres + Redis + Flask backend + Celery worker.
# NOTE(review): original indentation was lost in this rendering; nesting
# below reconstructed from standard docker-compose structure — confirm.
services:
  postgres:
    image: postgres:16
    container_name: crosspost_db
    restart: unless-stopped
    env_file:
      - .env
    ports:
      - "5432:5432"
    volumes:
      - ./server/db/postgres_vol:/var/lib/postgresql/data
      - ./server/db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
  redis:
    image: redis:7
    container_name: crosspost_redis
    restart: unless-stopped
    ports:
      - "6379:6379"
  backend:
    build: .
    container_name: crosspost_flask
    volumes:
      - .:/app
      - model_cache:/models
    env_file:
      - .env
    ports:
      - "5000:5000"
    command: flask --app server.app run --host=0.0.0.0 --debug
    depends_on:
      - postgres
      - redis
  worker:
    build: .
    volumes:
      - .:/app
      - model_cache:/models
    container_name: crosspost_worker
    env_file:
      - .env
    # --pool=solo: single-process worker (safer with GPU/torch workloads)
    command: >
      celery -A server.queue.celery_app.celery worker
      --loglevel=info
      --pool=solo
    depends_on:
      - postgres
      - redis
    # GPU passthrough for the NLP models
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
volumes:
  model_cache:

View File

@@ -1,7 +1,7 @@
services:
postgres:
image: postgres:16
container_name: postgres_db
container_name: crosspost_db
restart: unless-stopped
env_file:
- .env
@@ -11,5 +11,34 @@ services:
- ./server/db/postgres_vol:/var/lib/postgresql/data
- ./server/db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
volumes:
postgres_data:
redis:
image: redis:7
container_name: crosspost_redis
restart: unless-stopped
ports:
- "6379:6379"
backend:
build: .
container_name: crosspost_flask
env_file:
- .env
ports:
- "5000:5000"
command: flask --app server.app run --host=0.0.0.0
depends_on:
- postgres
- redis
worker:
build: .
container_name: crosspost_worker
env_file:
- .env
command: >
celery -A server.queue.celery_app.celery worker
--loglevel=info
--pool=solo
depends_on:
- postgres
- redis

View File

@@ -1,13 +1,17 @@
beautifulsoup4==4.14.3
celery==5.6.2
redis==7.2.1
Flask==3.1.3
Flask_Bcrypt==1.0.1
flask_cors==6.0.2
Flask_JWT_Extended==4.7.1
google_api_python_client==2.188.0
nltk==3.9.2
numpy==2.4.2
pandas==3.0.1
psycopg2==2.9.11
psycopg2_binary==2.9.11
python-dotenv==1.2.1
python-dotenv==1.2.2
Requests==2.32.5
sentence_transformers==5.2.2
torch==2.10.0

View File

@@ -1,7 +1,6 @@
import pandas as pd
import re
from collections import Counter
from typing import Any
@@ -14,9 +13,6 @@ class CulturalAnalysis:
df = original_df.copy()
s = df[self.content_col].fillna("").astype(str).str.lower()
in_group_words = {"we", "us", "our", "ourselves"}
out_group_words = {"they", "them", "their", "themselves"}
emotion_exclusions = {"emotion_neutral", "emotion_surprise"}
emotion_cols = [
c for c in df.columns
@@ -24,11 +20,13 @@ class CulturalAnalysis:
]
# Tokenize per row
tokens_per_row = s.apply(lambda txt: re.findall(r"\b[a-z]{2,}\b", txt))
in_pattern = re.compile(r"\b(we|us|our|ourselves)\b")
out_pattern = re.compile(r"\b(they|them|their|themselves)\b")
token_pattern = re.compile(r"\b[a-z]{2,}\b")
total_tokens = int(tokens_per_row.map(len).sum())
in_hits = tokens_per_row.map(lambda toks: sum(t in in_group_words for t in toks)).astype(int)
out_hits = tokens_per_row.map(lambda toks: sum(t in out_group_words for t in toks)).astype(int)
in_hits = s.str.count(in_pattern)
out_hits = s.str.count(out_pattern)
total_tokens = s.str.count(token_pattern).sum()
in_count = int(in_hits.sum())
out_count = int(out_hits.sum())
@@ -62,33 +60,15 @@ class CulturalAnalysis:
def get_stance_markers(self, df: pd.DataFrame) -> dict[str, Any]:
s = df[self.content_col].fillna("").astype(str)
hedges = {
"maybe", "perhaps", "possibly", "probably", "likely", "seems", "seem",
"i think", "i feel", "i guess", "kind of", "sort of", "somewhat"
}
certainty = {
"definitely", "certainly", "clearly", "obviously", "undeniably", "always", "never"
}
hedge_pattern = re.compile(r"\b(maybe|perhaps|possibly|probably|likely|seems|seem|i think|i feel|i guess|kind of|sort of|somewhat)\b")
certainty_pattern = re.compile(r"\b(definitely|certainly|clearly|obviously|undeniably|always|never)\b")
deontic_pattern = re.compile(r"\b(must|should|need|needs|have to|has to|ought|required|require)\b")
permission_pattern = re.compile(r"\b(can|allowed|okay|ok|permitted)\b")
deontic = {
"must", "should", "need", "needs", "have to", "has to", "ought", "required", "require"
}
permission = {"can", "allowed", "okay", "ok", "permitted"}
def count_phrases(text: str, phrases: set[str]) -> int:
c = 0
for p in phrases:
if " " in p:
c += len(re.findall(r"\b" + re.escape(p) + r"\b", text))
else:
c += len(re.findall(r"\b" + re.escape(p) + r"\b", text))
return c
hedge_counts = s.apply(lambda t: count_phrases(t, hedges))
certainty_counts = s.apply(lambda t: count_phrases(t, certainty))
deontic_counts = s.apply(lambda t: count_phrases(t, deontic))
perm_counts = s.apply(lambda t: count_phrases(t, permission))
hedge_counts = s.str.count(hedge_pattern)
certainty_counts = s.str.count(certainty_pattern)
deontic_counts = s.str.count(deontic_pattern)
perm_counts = s.str.count(permission_pattern)
token_counts = s.apply(lambda t: len(re.findall(r"\b[a-z]{2,}\b", t))).replace(0, 1)
@@ -108,44 +88,30 @@ class CulturalAnalysis:
return {"entity_emotion_avg": {}}
emotion_cols = [c for c in df.columns if c.startswith("emotion_")]
entity_counter = Counter()
for row in df["entities"].dropna():
if isinstance(row, list):
for ent in row:
if isinstance(ent, dict):
text = ent.get("text")
if isinstance(text, str):
text = text.strip()
if len(text) >= 3: # filter short junk
entity_counter[text] += 1
entity_df = df[["entities"] + emotion_cols].explode("entities")
top_entities = entity_counter.most_common(top_n)
entity_emotion_avg = {}
for entity_text, _ in top_entities:
mask = df["entities"].apply(
lambda ents: isinstance(ents, list) and
any(isinstance(e, dict) and e.get("text") == entity_text for e in ents)
entity_df["entity_text"] = entity_df["entities"].apply(
lambda e: e.get("text").strip()
if isinstance(e, dict) and isinstance(e.get("text"), str) and len(e.get("text")) >= 3
else None
)
post_count = int(mask.sum())
entity_df = entity_df.dropna(subset=["entity_text"])
entity_counts = entity_df["entity_text"].value_counts().head(top_n)
entity_emotion_avg = {}
if post_count >= min_posts:
for entity_text, count in entity_counts.items():
if count >= min_posts:
emo_means = (
df.loc[mask, emotion_cols]
.apply(pd.to_numeric, errors="coerce")
.fillna(0.0)
entity_df[entity_df["entity_text"] == entity_text][emotion_cols]
.mean()
.to_dict()
)
entity_emotion_avg[entity_text] = {
"post_count": post_count,
"emotion_avg": emo_means
"post_count": int(count),
"emotion_avg": emo_means,
}
return {
"entity_emotion_avg": entity_emotion_avg
}
return {"entity_emotion_avg": entity_emotion_avg}

View File

@@ -3,7 +3,7 @@ import pandas as pd
from server.analysis.nlp import NLP
class DatasetEnrichment:
def __init__(self, df, topics):
def __init__(self, df: pd.DataFrame, topics: dict):
self.df = self._explode_comments(df)
self.topics = topics
self.nlp = NLP(self.df, "title", "content", self.topics)

View File

@@ -16,11 +16,12 @@ from flask_jwt_extended import (
from server.analysis.stat_gen import StatGen
from server.analysis.enrichment import DatasetEnrichment
from server.exceptions import NotAuthorisedException, NotExistentDatasetException
from server.exceptions import NotAuthorisedException, NonExistentDatasetException
from server.db.database import PostgresConnector
from server.core.auth import AuthManager
from server.core.datasets import DatasetManager
from server.utils import get_request_filters
from server.queue.tasks import process_dataset
app = Flask(__name__)
@@ -125,53 +126,79 @@ def upload_data():
), 400
try:
current_user = get_jwt_identity()
current_user = int(get_jwt_identity())
posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
topics = json.load(topic_file)
processor = DatasetEnrichment(posts_df, topics)
enriched_df = processor.enrich()
dataset_id = dataset_manager.save_dataset_info(current_user, f"dataset_{current_user}", topics)
dataset_manager.save_dataset_content(dataset_id, enriched_df)
process_dataset.delay(
dataset_id,
posts_df.to_dict(orient="records"),
topics
)
return jsonify(
{
"message": "File uploaded successfully",
"event_count": len(enriched_df),
"message": "Dataset queued for processing",
"dataset_id": dataset_id,
"status": "processing"
}
), 200
), 202
except ValueError as e:
return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
except Exception as e:
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
@app.route("/dataset/<int:dataset_id>", methods=["GET"])
@jwt_required()
def get_dataset(dataset_id):
try:
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
filtered_dataset = stat_gen.filter_dataset(dataset_content, filters)
return jsonify(filtered_dataset), 200
except NotAuthorisedException:
return jsonify({"error": "User is not authorised to access this content"}), 403
except NotExistentDatasetException:
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except Exception:
print(traceback.format_exc())
return jsonify({"error": "An unexpected error occured"}), 500
@app.route("/dataset/<int:dataset_id>/status", methods=["GET"])
@jwt_required()
def get_dataset_status(dataset_id):
try:
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_status = dataset_manager.get_dataset_status(dataset_id)
return jsonify(dataset_status), 200
except NotAuthorisedException:
return jsonify({"error": "User is not authorised to access this content"}), 403
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except Exception:
print(traceback.format_exc())
return jsonify({"error": "An unexpected error occured"}), 500
@app.route("/dataset/<int:dataset_id>/content", methods=["GET"])
@jwt_required()
def content_endpoint(dataset_id):
try:
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200
except NotAuthorisedException:
@@ -187,8 +214,11 @@ def content_endpoint(dataset_id):
@jwt_required()
def get_summary(dataset_id):
try:
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.summary(dataset_content, filters)), 200
except NotAuthorisedException:
@@ -204,8 +234,11 @@ def get_summary(dataset_id):
@jwt_required()
def get_time_analysis(dataset_id):
try:
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200
except NotAuthorisedException:
@@ -221,8 +254,11 @@ def get_time_analysis(dataset_id):
@jwt_required()
def get_user_analysis(dataset_id):
try:
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200
except NotAuthorisedException:
@@ -238,8 +274,11 @@ def get_user_analysis(dataset_id):
@jwt_required()
def get_cultural_analysis(dataset_id):
try:
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200
except NotAuthorisedException:
@@ -255,8 +294,11 @@ def get_cultural_analysis(dataset_id):
@jwt_required()
def get_interaction_analysis(dataset_id):
try:
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200
except NotAuthorisedException:

View File

@@ -1,19 +1,22 @@
import pandas as pd
from server.db.database import PostgresConnector
from psycopg2.extras import Json
from server.exceptions import NotAuthorisedException
from server.exceptions import NotAuthorisedException, NonExistentDatasetException
class DatasetManager:
def __init__(self, db: PostgresConnector):
self.db = db
def get_dataset_and_validate(self, dataset_id: int, user_id: int) -> pd.DataFrame:
def authorize_user_dataset(self, dataset_id: int, user_id: int) -> bool:
dataset_info = self.get_dataset_info(dataset_id)
if dataset_info.get("user_id") != user_id:
raise NotAuthorisedException("This user is not authorised to access this dataset")
if dataset_info.get("user_id", None) == None:
return False
return self.get_dataset_content(dataset_id)
if dataset_info.get("user_id") != user_id:
return False
return True
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s"
@@ -23,7 +26,11 @@ class DatasetManager:
def get_dataset_info(self, dataset_id: int) -> dict:
query = "SELECT * FROM datasets WHERE id = %s"
result = self.db.execute(query, (dataset_id,), fetch=True)
return result[0] if result else None
if not result:
raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist")
return result[0]
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
query = """
@@ -34,11 +41,6 @@ class DatasetManager:
result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
return result[0]["id"] if result else None
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s"
result = self.db.execute(query, (dataset_id,), fetch=True)
return pd.DataFrame(result)
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
if event_data.empty:
return
@@ -49,6 +51,7 @@ class DatasetManager:
type,
parent_id,
author,
title,
content,
timestamp,
date,
@@ -70,7 +73,8 @@ class DatasetManager:
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s
%s, %s, %s, %s, %s,
%s
)
"""
@@ -80,6 +84,7 @@ class DatasetManager:
row["type"],
row["parent_id"],
row["author"],
row.get("title"),
row["content"],
row["timestamp"],
row["date"],
@@ -101,3 +106,35 @@ class DatasetManager:
]
self.db.execute_batch(query, values)
def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None):
if status not in ["processing", "complete", "error"]:
raise ValueError("Invalid status")
query = """
UPDATE datasets
SET status = %s,
status_message = %s,
completed_at = CASE
WHEN %s = 'complete' THEN NOW()
ELSE NULL
END
WHERE id = %s
"""
self.db.execute(query, (status, status_message, status, dataset_id))
def get_dataset_status(self, dataset_id: int):
query = """
SELECT status, status_message, completed_at
FROM datasets
WHERE id = %s
"""
result = self.db.execute(query, (dataset_id, ), fetch=True)
if not result:
print(result)
raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist")
return result[0]

View File

@@ -27,19 +27,21 @@ class PostgresConnector:
self.connection.autocommit = False
def execute(self, query, params=None, fetch=False) -> list:
try:
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
if fetch:
return cursor.fetchall()
result = cursor.fetchall() if fetch else None
self.connection.commit()
return result
except Exception:
self.connection.rollback()
raise
def execute_batch(self, query, values):
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
execute_batch(cursor, query, values)
self.connection.commit()
## User Management Methods
def close(self):
if self.connection:
self.connection.close()

View File

@@ -11,9 +11,19 @@ CREATE TABLE datasets (
user_id INTEGER NOT NULL,
name VARCHAR(255) NOT NULL,
description TEXT,
-- Job state machine
status TEXT NOT NULL DEFAULT 'processing',
status_message TEXT,
completed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
topics JSONB,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
-- Enforce valid states
CONSTRAINT datasets_status_check
CHECK (status IN ('processing', 'complete', 'error'))
);
CREATE TABLE events (
@@ -30,7 +40,10 @@ CREATE TABLE events (
hour INTEGER NOT NULL,
weekday VARCHAR(255) NOT NULL,
/* Comments and Replies */
/* Posts Only */
title VARCHAR(255),
/* Comments Only*/
parent_id VARCHAR(255),
reply_to VARCHAR(255),
source VARCHAR(255) NOT NULL,

View File

@@ -1,7 +1,7 @@
class NotAuthorisedException(Exception):
pass
class NotExistentDatasetException(Exception):
class NonExistentDatasetException(Exception):
pass
class DatabaseNotConfiguredException(Exception):

View File

@@ -0,0 +1,16 @@
from celery import Celery
def create_celery():
    """Build the application's Celery instance bound to the Redis broker.

    Returns a Celery app named "ethnograph" that uses Redis (service
    hostname ``redis`` from docker-compose) as both broker and result
    backend, restricted to JSON serialization.
    """
    app = Celery(
        "ethnograph",
        broker="redis://redis:6379/0",
        backend="redis://redis:6379/0",
    )
    # JSON-only: tasks exchange plain dicts/lists, never pickled objects.
    app.conf.update(
        task_serializer="json",
        result_serializer="json",
        accept_content=["json"],
    )
    return app


celery = create_celery()

# Imported for side effects: registers the task module with this app.
from server.queue import tasks

24
server/queue/tasks.py Normal file
View File

@@ -0,0 +1,24 @@
import pandas as pd
from server.queue.celery_app import celery
from server.analysis.enrichment import DatasetEnrichment
@celery.task(bind=True, max_retries=3)
def process_dataset(self, dataset_id: int, posts: list, topics: dict):
    """Run NLP enrichment for an uploaded dataset in the background.

    Args:
        dataset_id: Primary key of the ``datasets`` row to populate.
        posts: Raw post records (list of dicts) from the uploaded JSONL.
        topics: Topic configuration parsed from the uploaded topic file.

    Side effects:
        Writes enriched events via ``save_dataset_content`` and sets the
        dataset status to 'complete' on success or 'error' on failure.
    """
    # Imported lazily so the web process can import this module without
    # pulling in the DB driver at import time.
    from server.db.database import PostgresConnector
    from server.core.datasets import DatasetManager

    # Bind the connection/manager BEFORE the try: previously these were
    # created inside it, so a failure during setup made the except
    # handler raise NameError on `dataset_manager`.
    db = PostgresConnector()
    dataset_manager = DatasetManager(db)
    try:
        df = pd.DataFrame(posts)
        processor = DatasetEnrichment(df, topics)
        enriched_df = processor.enrich()
        dataset_manager.save_dataset_content(dataset_id, enriched_df)
        dataset_manager.set_dataset_status(
            dataset_id, "complete", "NLP Processing Completed Successfully"
        )
    except Exception as e:
        # Record the failure on the dataset row so /status can surface it.
        dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
    finally:
        # The original leaked one Postgres connection per task invocation.
        db.close()