Implement job queue for asynchronous NLP #6

Merged
dylan merged 14 commits from feat/implement-job-queue into main on 2026-03-03 14:26:38 +00:00
2 changed files with 14 additions and 15 deletions
Showing only changes of commit f93e45b827 - Show all commits

View File

@@ -3,7 +3,7 @@ import pandas as pd
from server.analysis.nlp import NLP from server.analysis.nlp import NLP
class DatasetEnrichment: class DatasetEnrichment:
def __init__(self, df, topics): def __init__(self, df: pd.DataFrame, topics: dict):
self.df = self._explode_comments(df) self.df = self._explode_comments(df)
self.topics = topics self.topics = topics
self.nlp = NLP(self.df, "title", "content", self.topics) self.nlp = NLP(self.df, "title", "content", self.topics)

View File

@@ -1,7 +1,7 @@
import pandas as pd import pandas as pd
from server.db.database import PostgresConnector from server.db.database import PostgresConnector
from psycopg2.extras import Json from psycopg2.extras import Json
from server.exceptions import NotAuthorisedException from server.exceptions import NotAuthorisedException, NonExistentDatasetException
class DatasetManager: class DatasetManager:
def __init__(self, db: PostgresConnector): def __init__(self, db: PostgresConnector):
@@ -23,21 +23,20 @@ class DatasetManager:
def get_dataset_info(self, dataset_id: int) -> dict: def get_dataset_info(self, dataset_id: int) -> dict:
query = "SELECT * FROM datasets WHERE id = %s" query = "SELECT * FROM datasets WHERE id = %s"
result = self.db.execute(query, (dataset_id,), fetch=True) result = self.db.execute(query, (dataset_id,), fetch=True)
return result[0] if result else None
if not result:
raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist")
return result[0]
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int: def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
query = """ query = """
INSERT INTO datasets (user_id, name, topics) INSERT INTO datasets (user_id, name, topics)
VALUES (%s, %s, %s) VALUES (%s, %s, %s)
RETURNING id RETURNING id
""" """
result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True) result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
return result[0]["id"] if result else None return result[0]["id"] if result else None
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s"
result = self.db.execute(query, (dataset_id,), fetch=True)
return pd.DataFrame(result)
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame): def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
if event_data.empty: if event_data.empty: