144 lines
4.5 KiB
Python
144 lines
4.5 KiB
Python
import pandas as pd
|
|
from server.db.database import PostgresConnector
|
|
from psycopg2.extras import Json
|
|
from server.exceptions import NotAuthorisedException, NonExistentDatasetException
|
|
|
|
class DatasetManager:
    """Data-access helper for the ``datasets`` and ``events`` tables.

    Every query is sent through the injected connector's ``execute`` /
    ``execute_batch`` methods using ``%s`` placeholders; values are passed
    as parameters and never formatted into the SQL text here.
    """

    # Statuses accepted by set_dataset_status().
    _VALID_STATUSES = ("processing", "complete", "error")

    def __init__(self, db: PostgresConnector):
        """Store the database connector used by all other methods."""
        self.db = db

    def authorize_user_dataset(self, dataset_id: int, user_id: int) -> bool:
        """Return True only if the dataset's ``user_id`` equals *user_id*.

        A dataset whose ``user_id`` is missing or ``None`` is never
        authorised.

        Raises:
            NonExistentDatasetException: propagated from
                ``get_dataset_info`` when the dataset does not exist.
        """
        owner_id = self.get_dataset_info(dataset_id).get("user_id")
        return owner_id is not None and owner_id == user_id

    def get_user_datasets(self, user_id: int) -> list[dict]:
        """Return every ``datasets`` row whose ``user_id`` is *user_id*."""
        query = "SELECT * FROM datasets WHERE user_id = %s"
        return self.db.execute(query, (user_id,), fetch=True)

    def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
        """Return all ``events`` rows of the dataset as a DataFrame."""
        query = "SELECT * FROM events WHERE dataset_id = %s"
        result = self.db.execute(query, (dataset_id,), fetch=True)
        return pd.DataFrame(result)

    def get_dataset_info(self, dataset_id: int) -> dict:
        """Return the ``datasets`` row with the given id.

        Raises:
            NonExistentDatasetException: if the query returns no rows.
        """
        query = "SELECT * FROM datasets WHERE id = %s"
        result = self.db.execute(query, (dataset_id,), fetch=True)

        if not result:
            raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist")

        return result[0]

    def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int | None:
        """Insert a new ``datasets`` row and return its id.

        *topics* is wrapped in ``Json`` before being passed as a parameter.

        Returns:
            The ``id`` from the ``RETURNING`` clause, or ``None`` when the
            connector returns no rows.
        """
        query = """
            INSERT INTO datasets (user_id, name, topics)
            VALUES (%s, %s, %s)
            RETURNING id
        """
        result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
        return result[0]["id"] if result else None

    @staticmethod
    def _event_row(dataset_id: int, row) -> tuple:
        """Build the parameter tuple for one ``events`` INSERT.

        The element order must match the column list of the INSERT in
        ``save_dataset_content``. Columns read with ``row[...]`` are
        required (a missing one raises ``KeyError``); columns read with
        ``row.get(...)`` are optional and yield ``None`` when absent.
        """
        return (
            dataset_id,
            row["type"],
            row["parent_id"],
            row["author"],
            row.get("title"),
            row["content"],
            row["timestamp"],
            row["date"],
            row["dt"],
            row["hour"],
            row["weekday"],
            row.get("reply_to"),
            row["source"],
            row.get("topic"),
            row.get("topic_confidence"),
            # A missing or falsy ner_entities value is stored as NULL.
            Json(row["ner_entities"]) if row.get("ner_entities") else None,
            row.get("emotion_anger"),
            row.get("emotion_disgust"),
            row.get("emotion_fear"),
            row.get("emotion_joy"),
            row.get("emotion_sadness"),
        )

    def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
        """Insert every row of *event_data* into ``events`` for the dataset.

        Does nothing when *event_data* is empty. Otherwise one parameter
        tuple is built per DataFrame row and the whole list is handed to
        ``db.execute_batch`` in a single call.
        """
        if event_data.empty:
            return

        query = """
            INSERT INTO events (
                dataset_id,
                type,
                parent_id,
                author,
                title,
                content,
                timestamp,
                date,
                dt,
                hour,
                weekday,
                reply_to,
                source,
                topic,
                topic_confidence,
                ner_entities,
                emotion_anger,
                emotion_disgust,
                emotion_fear,
                emotion_joy,
                emotion_sadness
            )
            VALUES (
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s
            )
        """

        values = [
            self._event_row(dataset_id, row)
            for _, row in event_data.iterrows()
        ]

        self.db.execute_batch(query, values)

    def set_dataset_status(self, dataset_id: int, status: str, status_message: str | None = None):
        """Update the dataset's ``status`` and ``status_message``.

        ``completed_at`` is set to ``NOW()`` when *status* is
        ``"complete"`` and reset to ``NULL`` for any other status.

        Raises:
            ValueError: if *status* is not one of ``"processing"``,
                ``"complete"`` or ``"error"``.
        """
        if status not in self._VALID_STATUSES:
            raise ValueError("Invalid status")

        query = """
            UPDATE datasets
            SET status = %s,
                status_message = %s,
                completed_at = CASE
                    WHEN %s = 'complete' THEN NOW()
                    ELSE NULL
                END
            WHERE id = %s
        """

        # status is passed twice: once for SET, once for the CASE test.
        self.db.execute(query, (status, status_message, status, dataset_id))

    def get_dataset_status(self, dataset_id: int) -> dict:
        """Return ``status``, ``status_message`` and ``completed_at``.

        Raises:
            NonExistentDatasetException: if the query returns no rows.
        """
        query = """
            SELECT status, status_message, completed_at
            FROM datasets
            WHERE id = %s
        """

        result = self.db.execute(query, (dataset_id,), fetch=True)

        if not result:
            raise NonExistentDatasetException(f"Dataset {dataset_id} does not exist")

        return result[0]