Merge pull request 'Implement job queue for asynchronous NLP' (#6) from feat/implement-job-queue into main
Reviewed-on: #6
This commit was merged in pull request #6.
This commit is contained in:
19
Dockerfile
Normal file
19
Dockerfile
Normal file
# syntax=docker/dockerfile:1

# Use slim to reduce image size
FROM python:3.13-slim

# Prevent Python from buffering stdout so logs appear immediately
ENV PYTHONUNBUFFERED=1

# System deps required for psycopg2 + torch.
# --no-install-recommends keeps the layer minimal (hadolint DL3015);
# packages sorted alphabetically; build-essential already provides gcc,
# so a separate gcc install is redundant.
# apt list cleanup happens in the same layer so it actually shrinks the image.
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Copy the dependency manifest first so the pip layer is cached until
# requirements.txt itself changes.
COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "main.py"]
60
docker-compose.dev.yml
Normal file
60
docker-compose.dev.yml
Normal file
services:
  postgres:
    image: postgres:16
    container_name: crosspost_db
    restart: unless-stopped
    env_file:
      - .env
    ports:
      - "5432:5432"
    volumes:
      - ./server/db/postgres_vol:/var/lib/postgresql/data
      - ./server/db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
    # Readiness probe so dependents can wait for the DB to accept connections,
    # not merely for the container process to have started.
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U $${POSTGRES_USER:-postgres}"]
      interval: 5s
      timeout: 3s
      retries: 10

  redis:
    image: redis:7
    container_name: crosspost_redis
    restart: unless-stopped
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 10

  backend:
    build: .
    container_name: crosspost_flask
    volumes:
      # Bind-mount the source for live reload in development.
      - .:/app
      - model_cache:/models
    env_file:
      - .env
    ports:
      - "5000:5000"
    command: flask --app server.app run --host=0.0.0.0 --debug
    # condition-form depends_on avoids a startup race where Flask connects
    # before postgres/redis are actually ready to serve.
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy

  worker:
    build: .
    container_name: crosspost_worker
    volumes:
      - .:/app
      - model_cache:/models
    env_file:
      - .env
    command: >
      celery -A server.queue.celery_app.celery worker
      --loglevel=info
      --pool=solo
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    # GPU reservation for torch-based NLP enrichment.
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

volumes:
  # Shared cache so backend and worker reuse downloaded model weights.
  model_cache:
@@ -1,7 +1,7 @@
|
|||||||
services:
|
services:
|
||||||
postgres:
|
postgres:
|
||||||
image: postgres:16
|
image: postgres:16
|
||||||
container_name: postgres_db
|
container_name: crosspost_db
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
env_file:
|
env_file:
|
||||||
- .env
|
- .env
|
||||||
@@ -11,5 +11,34 @@ services:
|
|||||||
- ./server/db/postgres_vol:/var/lib/postgresql/data
|
- ./server/db/postgres_vol:/var/lib/postgresql/data
|
||||||
- ./server/db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
|
- ./server/db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
|
||||||
|
|
||||||
volumes:
|
redis:
|
||||||
postgres_data:
|
image: redis:7
|
||||||
|
container_name: crosspost_redis
|
||||||
|
restart: unless-stopped
|
||||||
|
ports:
|
||||||
|
- "6379:6379"
|
||||||
|
|
||||||
|
backend:
|
||||||
|
build: .
|
||||||
|
container_name: crosspost_flask
|
||||||
|
env_file:
|
||||||
|
- .env
|
||||||
|
ports:
|
||||||
|
- "5000:5000"
|
||||||
|
command: flask --app server.app run --host=0.0.0.0
|
||||||
|
depends_on:
|
||||||
|
- postgres
|
||||||
|
- redis
|
||||||
|
|
||||||
|
worker:
|
||||||
|
build: .
|
||||||
|
container_name: crosspost_worker
|
||||||
|
env_file:
|
||||||
|
- .env
|
||||||
|
command: >
|
||||||
|
celery -A server.queue.celery_app.celery worker
|
||||||
|
--loglevel=info
|
||||||
|
--pool=solo
|
||||||
|
depends_on:
|
||||||
|
- postgres
|
||||||
|
- redis
|
||||||
@@ -1,13 +1,17 @@
|
|||||||
beautifulsoup4==4.14.3
|
beautifulsoup4==4.14.3
|
||||||
|
celery==5.6.2
|
||||||
|
redis==7.2.1
|
||||||
Flask==3.1.3
|
Flask==3.1.3
|
||||||
|
Flask_Bcrypt==1.0.1
|
||||||
flask_cors==6.0.2
|
flask_cors==6.0.2
|
||||||
|
Flask_JWT_Extended==4.7.1
|
||||||
google_api_python_client==2.188.0
|
google_api_python_client==2.188.0
|
||||||
nltk==3.9.2
|
nltk==3.9.2
|
||||||
numpy==2.4.2
|
numpy==2.4.2
|
||||||
pandas==3.0.1
|
pandas==3.0.1
|
||||||
psycopg2==2.9.11
|
psycopg2==2.9.11
|
||||||
psycopg2_binary==2.9.11
|
psycopg2_binary==2.9.11
|
||||||
python-dotenv==1.2.1
|
python-dotenv==1.2.2
|
||||||
Requests==2.32.5
|
Requests==2.32.5
|
||||||
sentence_transformers==5.2.2
|
sentence_transformers==5.2.2
|
||||||
torch==2.10.0
|
torch==2.10.0
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from collections import Counter
|
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
@@ -14,9 +13,6 @@ class CulturalAnalysis:
|
|||||||
df = original_df.copy()
|
df = original_df.copy()
|
||||||
s = df[self.content_col].fillna("").astype(str).str.lower()
|
s = df[self.content_col].fillna("").astype(str).str.lower()
|
||||||
|
|
||||||
in_group_words = {"we", "us", "our", "ourselves"}
|
|
||||||
out_group_words = {"they", "them", "their", "themselves"}
|
|
||||||
|
|
||||||
emotion_exclusions = {"emotion_neutral", "emotion_surprise"}
|
emotion_exclusions = {"emotion_neutral", "emotion_surprise"}
|
||||||
emotion_cols = [
|
emotion_cols = [
|
||||||
c for c in df.columns
|
c for c in df.columns
|
||||||
@@ -24,11 +20,13 @@ class CulturalAnalysis:
|
|||||||
]
|
]
|
||||||
|
|
||||||
# Tokenize per row
|
# Tokenize per row
|
||||||
tokens_per_row = s.apply(lambda txt: re.findall(r"\b[a-z]{2,}\b", txt))
|
in_pattern = re.compile(r"\b(we|us|our|ourselves)\b")
|
||||||
|
out_pattern = re.compile(r"\b(they|them|their|themselves)\b")
|
||||||
|
token_pattern = re.compile(r"\b[a-z]{2,}\b")
|
||||||
|
|
||||||
total_tokens = int(tokens_per_row.map(len).sum())
|
in_hits = s.str.count(in_pattern)
|
||||||
in_hits = tokens_per_row.map(lambda toks: sum(t in in_group_words for t in toks)).astype(int)
|
out_hits = s.str.count(out_pattern)
|
||||||
out_hits = tokens_per_row.map(lambda toks: sum(t in out_group_words for t in toks)).astype(int)
|
total_tokens = s.str.count(token_pattern).sum()
|
||||||
|
|
||||||
in_count = int(in_hits.sum())
|
in_count = int(in_hits.sum())
|
||||||
out_count = int(out_hits.sum())
|
out_count = int(out_hits.sum())
|
||||||
@@ -62,33 +60,15 @@ class CulturalAnalysis:
|
|||||||
def get_stance_markers(self, df: pd.DataFrame) -> dict[str, Any]:
|
def get_stance_markers(self, df: pd.DataFrame) -> dict[str, Any]:
|
||||||
s = df[self.content_col].fillna("").astype(str)
|
s = df[self.content_col].fillna("").astype(str)
|
||||||
|
|
||||||
hedges = {
|
hedge_pattern = re.compile(r"\b(maybe|perhaps|possibly|probably|likely|seems|seem|i think|i feel|i guess|kind of|sort of|somewhat)\b")
|
||||||
"maybe", "perhaps", "possibly", "probably", "likely", "seems", "seem",
|
certainty_pattern = re.compile(r"\b(definitely|certainly|clearly|obviously|undeniably|always|never)\b")
|
||||||
"i think", "i feel", "i guess", "kind of", "sort of", "somewhat"
|
deontic_pattern = re.compile(r"\b(must|should|need|needs|have to|has to|ought|required|require)\b")
|
||||||
}
|
permission_pattern = re.compile(r"\b(can|allowed|okay|ok|permitted)\b")
|
||||||
certainty = {
|
|
||||||
"definitely", "certainly", "clearly", "obviously", "undeniably", "always", "never"
|
|
||||||
}
|
|
||||||
|
|
||||||
deontic = {
|
hedge_counts = s.str.count(hedge_pattern)
|
||||||
"must", "should", "need", "needs", "have to", "has to", "ought", "required", "require"
|
certainty_counts = s.str.count(certainty_pattern)
|
||||||
}
|
deontic_counts = s.str.count(deontic_pattern)
|
||||||
|
perm_counts = s.str.count(permission_pattern)
|
||||||
permission = {"can", "allowed", "okay", "ok", "permitted"}
|
|
||||||
|
|
||||||
def count_phrases(text: str, phrases: set[str]) -> int:
|
|
||||||
c = 0
|
|
||||||
for p in phrases:
|
|
||||||
if " " in p:
|
|
||||||
c += len(re.findall(r"\b" + re.escape(p) + r"\b", text))
|
|
||||||
else:
|
|
||||||
c += len(re.findall(r"\b" + re.escape(p) + r"\b", text))
|
|
||||||
return c
|
|
||||||
|
|
||||||
hedge_counts = s.apply(lambda t: count_phrases(t, hedges))
|
|
||||||
certainty_counts = s.apply(lambda t: count_phrases(t, certainty))
|
|
||||||
deontic_counts = s.apply(lambda t: count_phrases(t, deontic))
|
|
||||||
perm_counts = s.apply(lambda t: count_phrases(t, permission))
|
|
||||||
|
|
||||||
token_counts = s.apply(lambda t: len(re.findall(r"\b[a-z]{2,}\b", t))).replace(0, 1)
|
token_counts = s.apply(lambda t: len(re.findall(r"\b[a-z]{2,}\b", t))).replace(0, 1)
|
||||||
|
|
||||||
@@ -108,44 +88,30 @@ class CulturalAnalysis:
|
|||||||
return {"entity_emotion_avg": {}}
|
return {"entity_emotion_avg": {}}
|
||||||
|
|
||||||
emotion_cols = [c for c in df.columns if c.startswith("emotion_")]
|
emotion_cols = [c for c in df.columns if c.startswith("emotion_")]
|
||||||
entity_counter = Counter()
|
|
||||||
|
|
||||||
for row in df["entities"].dropna():
|
entity_df = df[["entities"] + emotion_cols].explode("entities")
|
||||||
if isinstance(row, list):
|
|
||||||
for ent in row:
|
|
||||||
if isinstance(ent, dict):
|
|
||||||
text = ent.get("text")
|
|
||||||
if isinstance(text, str):
|
|
||||||
text = text.strip()
|
|
||||||
if len(text) >= 3: # filter short junk
|
|
||||||
entity_counter[text] += 1
|
|
||||||
|
|
||||||
top_entities = entity_counter.most_common(top_n)
|
entity_df["entity_text"] = entity_df["entities"].apply(
|
||||||
|
lambda e: e.get("text").strip()
|
||||||
entity_emotion_avg = {}
|
if isinstance(e, dict) and isinstance(e.get("text"), str) and len(e.get("text")) >= 3
|
||||||
|
else None
|
||||||
for entity_text, _ in top_entities:
|
|
||||||
mask = df["entities"].apply(
|
|
||||||
lambda ents: isinstance(ents, list) and
|
|
||||||
any(isinstance(e, dict) and e.get("text") == entity_text for e in ents)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
post_count = int(mask.sum())
|
entity_df = entity_df.dropna(subset=["entity_text"])
|
||||||
|
entity_counts = entity_df["entity_text"].value_counts().head(top_n)
|
||||||
|
entity_emotion_avg = {}
|
||||||
|
|
||||||
if post_count >= min_posts:
|
for entity_text, count in entity_counts.items():
|
||||||
|
if count >= min_posts:
|
||||||
emo_means = (
|
emo_means = (
|
||||||
df.loc[mask, emotion_cols]
|
entity_df[entity_df["entity_text"] == entity_text][emotion_cols]
|
||||||
.apply(pd.to_numeric, errors="coerce")
|
|
||||||
.fillna(0.0)
|
|
||||||
.mean()
|
.mean()
|
||||||
.to_dict()
|
.to_dict()
|
||||||
)
|
)
|
||||||
|
|
||||||
entity_emotion_avg[entity_text] = {
|
entity_emotion_avg[entity_text] = {
|
||||||
"post_count": post_count,
|
"post_count": int(count),
|
||||||
"emotion_avg": emo_means
|
"emotion_avg": emo_means,
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {"entity_emotion_avg": entity_emotion_avg}
|
||||||
"entity_emotion_avg": entity_emotion_avg
|
|
||||||
}
|
|
||||||
@@ -3,7 +3,7 @@ import pandas as pd
|
|||||||
from server.analysis.nlp import NLP
|
from server.analysis.nlp import NLP
|
||||||
|
|
||||||
class DatasetEnrichment:
|
class DatasetEnrichment:
|
||||||
def __init__(self, df, topics):
|
def __init__(self, df: pd.DataFrame, topics: dict):
|
||||||
self.df = self._explode_comments(df)
|
self.df = self._explode_comments(df)
|
||||||
self.topics = topics
|
self.topics = topics
|
||||||
self.nlp = NLP(self.df, "title", "content", self.topics)
|
self.nlp = NLP(self.df, "title", "content", self.topics)
|
||||||
|
|||||||
@@ -16,11 +16,12 @@ from flask_jwt_extended import (
|
|||||||
|
|
||||||
from server.analysis.stat_gen import StatGen
|
from server.analysis.stat_gen import StatGen
|
||||||
from server.analysis.enrichment import DatasetEnrichment
|
from server.analysis.enrichment import DatasetEnrichment
|
||||||
from server.exceptions import NotAuthorisedException, NotExistentDatasetException
|
from server.exceptions import NotAuthorisedException, NonExistentDatasetException
|
||||||
from server.db.database import PostgresConnector
|
from server.db.database import PostgresConnector
|
||||||
from server.core.auth import AuthManager
|
from server.core.auth import AuthManager
|
||||||
from server.core.datasets import DatasetManager
|
from server.core.datasets import DatasetManager
|
||||||
from server.utils import get_request_filters
|
from server.utils import get_request_filters
|
||||||
|
from server.queue.tasks import process_dataset
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
|
|
||||||
@@ -125,53 +126,79 @@ def upload_data():
|
|||||||
), 400
|
), 400
|
||||||
|
|
||||||
try:
|
try:
|
||||||
current_user = get_jwt_identity()
|
current_user = int(get_jwt_identity())
|
||||||
|
|
||||||
posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
|
posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
|
||||||
topics = json.load(topic_file)
|
topics = json.load(topic_file)
|
||||||
|
|
||||||
processor = DatasetEnrichment(posts_df, topics)
|
|
||||||
enriched_df = processor.enrich()
|
|
||||||
dataset_id = dataset_manager.save_dataset_info(current_user, f"dataset_{current_user}", topics)
|
dataset_id = dataset_manager.save_dataset_info(current_user, f"dataset_{current_user}", topics)
|
||||||
dataset_manager.save_dataset_content(dataset_id, enriched_df)
|
|
||||||
|
process_dataset.delay(
|
||||||
|
dataset_id,
|
||||||
|
posts_df.to_dict(orient="records"),
|
||||||
|
topics
|
||||||
|
)
|
||||||
|
|
||||||
return jsonify(
|
return jsonify(
|
||||||
{
|
{
|
||||||
"message": "File uploaded successfully",
|
"message": "Dataset queued for processing",
|
||||||
"event_count": len(enriched_df),
|
|
||||||
"dataset_id": dataset_id,
|
"dataset_id": dataset_id,
|
||||||
|
"status": "processing"
|
||||||
}
|
}
|
||||||
), 200
|
), 202
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
|
return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||||
|
|
||||||
|
|
||||||
@app.route("/dataset/<int:dataset_id>", methods=["GET"])
|
@app.route("/dataset/<int:dataset_id>", methods=["GET"])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_dataset(dataset_id):
|
def get_dataset(dataset_id):
|
||||||
try:
|
try:
|
||||||
user_id = get_jwt_identity()
|
user_id = int(get_jwt_identity())
|
||||||
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
|
|
||||||
|
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
|
||||||
|
raise NotAuthorisedException("This user is not authorised to access this dataset")
|
||||||
|
|
||||||
|
dataset_content = dataset_manager.get_dataset_content(dataset_id)
|
||||||
filters = get_request_filters()
|
filters = get_request_filters()
|
||||||
filtered_dataset = stat_gen.filter_dataset(dataset_content, filters)
|
filtered_dataset = stat_gen.filter_dataset(dataset_content, filters)
|
||||||
return jsonify(filtered_dataset), 200
|
return jsonify(filtered_dataset), 200
|
||||||
except NotAuthorisedException:
|
except NotAuthorisedException:
|
||||||
return jsonify({"error": "User is not authorised to access this content"}), 403
|
return jsonify({"error": "User is not authorised to access this content"}), 403
|
||||||
except NotExistentDatasetException:
|
except NonExistentDatasetException:
|
||||||
return jsonify({"error": "Dataset does not exist"}), 404
|
return jsonify({"error": "Dataset does not exist"}), 404
|
||||||
except Exception:
|
except Exception:
|
||||||
print(traceback.format_exc())
|
print(traceback.format_exc())
|
||||||
return jsonify({"error": "An unexpected error occured"}), 500
|
return jsonify({"error": "An unexpected error occured"}), 500
|
||||||
|
|
||||||
|
@app.route("/dataset/<int:dataset_id>/status", methods=["GET"])
|
||||||
|
@jwt_required()
|
||||||
|
def get_dataset_status(dataset_id):
|
||||||
|
try:
|
||||||
|
user_id = int(get_jwt_identity())
|
||||||
|
|
||||||
|
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
|
||||||
|
raise NotAuthorisedException("This user is not authorised to access this dataset")
|
||||||
|
|
||||||
|
dataset_status = dataset_manager.get_dataset_status(dataset_id)
|
||||||
|
return jsonify(dataset_status), 200
|
||||||
|
except NotAuthorisedException:
|
||||||
|
return jsonify({"error": "User is not authorised to access this content"}), 403
|
||||||
|
except NonExistentDatasetException:
|
||||||
|
return jsonify({"error": "Dataset does not exist"}), 404
|
||||||
|
except Exception:
|
||||||
|
print(traceback.format_exc())
|
||||||
|
return jsonify({"error": "An unexpected error occured"}), 500
|
||||||
|
|
||||||
@app.route("/dataset/<int:dataset_id>/content", methods=["GET"])
|
@app.route("/dataset/<int:dataset_id>/content", methods=["GET"])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def content_endpoint(dataset_id):
|
def content_endpoint(dataset_id):
|
||||||
try:
|
try:
|
||||||
user_id = get_jwt_identity()
|
user_id = int(get_jwt_identity())
|
||||||
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
|
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
|
||||||
|
raise NotAuthorisedException("This user is not authorised to access this dataset")
|
||||||
|
|
||||||
|
dataset_content = dataset_manager.get_dataset_content(dataset_id)
|
||||||
filters = get_request_filters()
|
filters = get_request_filters()
|
||||||
return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200
|
return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200
|
||||||
except NotAuthorisedException:
|
except NotAuthorisedException:
|
||||||
@@ -187,8 +214,11 @@ def content_endpoint(dataset_id):
|
|||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_summary(dataset_id):
|
def get_summary(dataset_id):
|
||||||
try:
|
try:
|
||||||
user_id = get_jwt_identity()
|
user_id = int(get_jwt_identity())
|
||||||
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
|
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
|
||||||
|
raise NotAuthorisedException("This user is not authorised to access this dataset")
|
||||||
|
|
||||||
|
dataset_content = dataset_manager.get_dataset_content(dataset_id)
|
||||||
filters = get_request_filters()
|
filters = get_request_filters()
|
||||||
return jsonify(stat_gen.summary(dataset_content, filters)), 200
|
return jsonify(stat_gen.summary(dataset_content, filters)), 200
|
||||||
except NotAuthorisedException:
|
except NotAuthorisedException:
|
||||||
@@ -204,8 +234,11 @@ def get_summary(dataset_id):
|
|||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_time_analysis(dataset_id):
|
def get_time_analysis(dataset_id):
|
||||||
try:
|
try:
|
||||||
user_id = get_jwt_identity()
|
user_id = int(get_jwt_identity())
|
||||||
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
|
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
|
||||||
|
raise NotAuthorisedException("This user is not authorised to access this dataset")
|
||||||
|
|
||||||
|
dataset_content = dataset_manager.get_dataset_content(dataset_id)
|
||||||
filters = get_request_filters()
|
filters = get_request_filters()
|
||||||
return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200
|
return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200
|
||||||
except NotAuthorisedException:
|
except NotAuthorisedException:
|
||||||
@@ -221,8 +254,11 @@ def get_time_analysis(dataset_id):
|
|||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_user_analysis(dataset_id):
|
def get_user_analysis(dataset_id):
|
||||||
try:
|
try:
|
||||||
user_id = get_jwt_identity()
|
user_id = int(get_jwt_identity())
|
||||||
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
|
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
|
||||||
|
raise NotAuthorisedException("This user is not authorised to access this dataset")
|
||||||
|
|
||||||
|
dataset_content = dataset_manager.get_dataset_content(dataset_id)
|
||||||
filters = get_request_filters()
|
filters = get_request_filters()
|
||||||
return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200
|
return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200
|
||||||
except NotAuthorisedException:
|
except NotAuthorisedException:
|
||||||
@@ -238,8 +274,11 @@ def get_user_analysis(dataset_id):
|
|||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_cultural_analysis(dataset_id):
|
def get_cultural_analysis(dataset_id):
|
||||||
try:
|
try:
|
||||||
user_id = get_jwt_identity()
|
user_id = int(get_jwt_identity())
|
||||||
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
|
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
|
||||||
|
raise NotAuthorisedException("This user is not authorised to access this dataset")
|
||||||
|
|
||||||
|
dataset_content = dataset_manager.get_dataset_content(dataset_id)
|
||||||
filters = get_request_filters()
|
filters = get_request_filters()
|
||||||
return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200
|
return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200
|
||||||
except NotAuthorisedException:
|
except NotAuthorisedException:
|
||||||
@@ -255,8 +294,11 @@ def get_cultural_analysis(dataset_id):
|
|||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_interaction_analysis(dataset_id):
|
def get_interaction_analysis(dataset_id):
|
||||||
try:
|
try:
|
||||||
user_id = get_jwt_identity()
|
user_id = int(get_jwt_identity())
|
||||||
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
|
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
|
||||||
|
raise NotAuthorisedException("This user is not authorised to access this dataset")
|
||||||
|
|
||||||
|
dataset_content = dataset_manager.get_dataset_content(dataset_id)
|
||||||
filters = get_request_filters()
|
filters = get_request_filters()
|
||||||
return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200
|
return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200
|
||||||
except NotAuthorisedException:
|
except NotAuthorisedException:
|
||||||
|
|||||||
@@ -1,19 +1,22 @@
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
from server.db.database import PostgresConnector
|
from server.db.database import PostgresConnector
|
||||||
from psycopg2.extras import Json
|
from psycopg2.extras import Json
|
||||||
from server.exceptions import NotAuthorisedException
|
from server.exceptions import NotAuthorisedException, NonExistentDatasetException
|
||||||
|
|
||||||
class DatasetManager:
|
class DatasetManager:
|
||||||
def __init__(self, db: PostgresConnector):
|
def __init__(self, db: PostgresConnector):
|
||||||
self.db = db
|
self.db = db
|
||||||
|
|
||||||
def get_dataset_and_validate(self, dataset_id: int, user_id: int) -> pd.DataFrame:
|
def authorize_user_dataset(self, dataset_id: int, user_id: int) -> bool:
|
||||||
dataset_info = self.get_dataset_info(dataset_id)
|
dataset_info = self.get_dataset_info(dataset_id)
|
||||||
|
|
||||||
if dataset_info.get("user_id") != user_id:
|
if dataset_info.get("user_id", None) == None:
|
||||||
raise NotAuthorisedException("This user is not authorised to access this dataset")
|
return False
|
||||||
|
|
||||||
return self.get_dataset_content(dataset_id)
|
if dataset_info.get("user_id") != user_id:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
|
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
|
||||||
query = "SELECT * FROM events WHERE dataset_id = %s"
|
query = "SELECT * FROM events WHERE dataset_id = %s"
|
||||||
@@ -23,7 +26,11 @@ class DatasetManager:
|
|||||||
def get_dataset_info(self, dataset_id: int) -> dict:
|
def get_dataset_info(self, dataset_id: int) -> dict:
|
||||||
query = "SELECT * FROM datasets WHERE id = %s"
|
query = "SELECT * FROM datasets WHERE id = %s"
|
||||||
result = self.db.execute(query, (dataset_id,), fetch=True)
|
result = self.db.execute(query, (dataset_id,), fetch=True)
|
||||||
return result[0] if result else None
|
|
||||||
|
if not result:
|
||||||
|
raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist")
|
||||||
|
|
||||||
|
return result[0]
|
||||||
|
|
||||||
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
|
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
|
||||||
query = """
|
query = """
|
||||||
@@ -34,11 +41,6 @@ class DatasetManager:
|
|||||||
result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
|
result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
|
||||||
return result[0]["id"] if result else None
|
return result[0]["id"] if result else None
|
||||||
|
|
||||||
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
|
|
||||||
query = "SELECT * FROM events WHERE dataset_id = %s"
|
|
||||||
result = self.db.execute(query, (dataset_id,), fetch=True)
|
|
||||||
return pd.DataFrame(result)
|
|
||||||
|
|
||||||
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
|
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
|
||||||
if event_data.empty:
|
if event_data.empty:
|
||||||
return
|
return
|
||||||
@@ -49,6 +51,7 @@ class DatasetManager:
|
|||||||
type,
|
type,
|
||||||
parent_id,
|
parent_id,
|
||||||
author,
|
author,
|
||||||
|
title,
|
||||||
content,
|
content,
|
||||||
timestamp,
|
timestamp,
|
||||||
date,
|
date,
|
||||||
@@ -70,7 +73,8 @@ class DatasetManager:
|
|||||||
%s, %s, %s, %s, %s,
|
%s, %s, %s, %s, %s,
|
||||||
%s, %s, %s, %s, %s,
|
%s, %s, %s, %s, %s,
|
||||||
%s, %s, %s, %s, %s,
|
%s, %s, %s, %s, %s,
|
||||||
%s, %s, %s, %s, %s
|
%s, %s, %s, %s, %s,
|
||||||
|
%s
|
||||||
)
|
)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -80,6 +84,7 @@ class DatasetManager:
|
|||||||
row["type"],
|
row["type"],
|
||||||
row["parent_id"],
|
row["parent_id"],
|
||||||
row["author"],
|
row["author"],
|
||||||
|
row.get("title"),
|
||||||
row["content"],
|
row["content"],
|
||||||
row["timestamp"],
|
row["timestamp"],
|
||||||
row["date"],
|
row["date"],
|
||||||
@@ -101,3 +106,35 @@ class DatasetManager:
|
|||||||
]
|
]
|
||||||
|
|
||||||
self.db.execute_batch(query, values)
|
self.db.execute_batch(query, values)
|
||||||
|
|
||||||
|
def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None):
|
||||||
|
if status not in ["processing", "complete", "error"]:
|
||||||
|
raise ValueError("Invalid status")
|
||||||
|
|
||||||
|
query = """
|
||||||
|
UPDATE datasets
|
||||||
|
SET status = %s,
|
||||||
|
status_message = %s,
|
||||||
|
completed_at = CASE
|
||||||
|
WHEN %s = 'complete' THEN NOW()
|
||||||
|
ELSE NULL
|
||||||
|
END
|
||||||
|
WHERE id = %s
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.db.execute(query, (status, status_message, status, dataset_id))
|
||||||
|
|
||||||
|
def get_dataset_status(self, dataset_id: int):
|
||||||
|
query = """
|
||||||
|
SELECT status, status_message, completed_at
|
||||||
|
FROM datasets
|
||||||
|
WHERE id = %s
|
||||||
|
"""
|
||||||
|
|
||||||
|
result = self.db.execute(query, (dataset_id, ), fetch=True)
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
print(result)
|
||||||
|
raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist")
|
||||||
|
|
||||||
|
return result[0]
|
||||||
@@ -27,19 +27,21 @@ class PostgresConnector:
|
|||||||
self.connection.autocommit = False
|
self.connection.autocommit = False
|
||||||
|
|
||||||
def execute(self, query, params=None, fetch=False) -> list:
|
def execute(self, query, params=None, fetch=False) -> list:
|
||||||
|
try:
|
||||||
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
|
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
|
||||||
cursor.execute(query, params)
|
cursor.execute(query, params)
|
||||||
if fetch:
|
result = cursor.fetchall() if fetch else None
|
||||||
return cursor.fetchall()
|
|
||||||
self.connection.commit()
|
self.connection.commit()
|
||||||
|
return result
|
||||||
|
except Exception:
|
||||||
|
self.connection.rollback()
|
||||||
|
raise
|
||||||
|
|
||||||
def execute_batch(self, query, values):
|
def execute_batch(self, query, values):
|
||||||
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
|
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
|
||||||
execute_batch(cursor, query, values)
|
execute_batch(cursor, query, values)
|
||||||
self.connection.commit()
|
self.connection.commit()
|
||||||
|
|
||||||
|
|
||||||
## User Management Methods
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if self.connection:
|
if self.connection:
|
||||||
self.connection.close()
|
self.connection.close()
|
||||||
@@ -11,9 +11,19 @@ CREATE TABLE datasets (
|
|||||||
user_id INTEGER NOT NULL,
|
user_id INTEGER NOT NULL,
|
||||||
name VARCHAR(255) NOT NULL,
|
name VARCHAR(255) NOT NULL,
|
||||||
description TEXT,
|
description TEXT,
|
||||||
|
|
||||||
|
-- Job state machine
|
||||||
|
status TEXT NOT NULL DEFAULT 'processing',
|
||||||
|
status_message TEXT,
|
||||||
|
completed_at TIMESTAMP,
|
||||||
|
|
||||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
topics JSONB,
|
topics JSONB,
|
||||||
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
|
||||||
|
|
||||||
|
-- Enforce valid states
|
||||||
|
CONSTRAINT datasets_status_check
|
||||||
|
CHECK (status IN ('processing', 'complete', 'error'))
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE events (
|
CREATE TABLE events (
|
||||||
@@ -30,7 +40,10 @@ CREATE TABLE events (
|
|||||||
hour INTEGER NOT NULL,
|
hour INTEGER NOT NULL,
|
||||||
weekday VARCHAR(255) NOT NULL,
|
weekday VARCHAR(255) NOT NULL,
|
||||||
|
|
||||||
/* Comments and Replies */
|
/* Posts Only */
|
||||||
|
title VARCHAR(255),
|
||||||
|
|
||||||
|
/* Comments Only*/
|
||||||
parent_id VARCHAR(255),
|
parent_id VARCHAR(255),
|
||||||
reply_to VARCHAR(255),
|
reply_to VARCHAR(255),
|
||||||
source VARCHAR(255) NOT NULL,
|
source VARCHAR(255) NOT NULL,
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
class NotAuthorisedException(Exception):
|
class NotAuthorisedException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class NotExistentDatasetException(Exception):
|
class NonExistentDatasetException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class DatabaseNotConfiguredException(Exception):
|
class DatabaseNotConfiguredException(Exception):
|
||||||
|
|||||||
16
server/queue/celery_app.py
Normal file
16
server/queue/celery_app.py
Normal file
import os

from celery import Celery


def create_celery() -> Celery:
    """Build the application's Celery instance.

    Broker/backend default to the compose-internal Redis service
    (``redis://redis:6379/0``) but can be overridden with the
    ``CELERY_BROKER_URL`` / ``CELERY_RESULT_BACKEND`` environment
    variables, e.g. when running outside docker-compose.

    Returns:
        A Celery app configured for JSON-only serialization.
    """
    broker_url = os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0")
    backend_url = os.environ.get("CELERY_RESULT_BACKEND", broker_url)

    celery = Celery(
        "ethnograph",
        broker=broker_url,
        backend=backend_url,
    )
    # JSON-only: avoids pickle deserialization of untrusted queue payloads.
    celery.conf.task_serializer = "json"
    celery.conf.result_serializer = "json"
    celery.conf.accept_content = ["json"]
    return celery


celery = create_celery()

# Imported after `celery` exists so task modules can decorate against it
# (deliberate late import to break the circular dependency).
from server.queue import tasks  # noqa: E402,F401
24
server/queue/tasks.py
Normal file
24
server/queue/tasks.py
Normal file
import pandas as pd

from server.queue.celery_app import celery
from server.analysis.enrichment import DatasetEnrichment


@celery.task(bind=True, max_retries=3)
def process_dataset(self, dataset_id: int, posts: list, topics: dict) -> None:
    """Run NLP enrichment for a dataset and persist the result.

    Args:
        dataset_id: id of the dataset row whose status is updated.
        posts: raw post records (list of dicts) to enrich.
        topics: topic configuration passed through to DatasetEnrichment.

    On success the dataset status is set to 'complete'; on any failure it
    is set to 'error' with the exception message.
    """
    # Imported lazily so importing this module (done by celery_app at
    # startup) does not pull in DB connectivity.
    from server.db.database import PostgresConnector
    from server.core.datasets import DatasetManager

    # Construct BEFORE the try block: in the original code, if
    # PostgresConnector() raised, `dataset_manager` was unbound and the
    # except handler died with NameError, masking the real error.
    db = PostgresConnector()
    dataset_manager = DatasetManager(db)

    try:
        df = pd.DataFrame(posts)

        processor = DatasetEnrichment(df, topics)
        enriched_df = processor.enrich()

        dataset_manager.save_dataset_content(dataset_id, enriched_df)
        dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
    except Exception as e:
        # Record the failure on the dataset row so clients polling
        # /dataset/<id>/status can see it.
        dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")
    finally:
        # Always release the DB connection (was leaked in the original).
        db.close()
Reference in New Issue
Block a user