diff --git a/db/database.py b/db/database.py index c9e60ed..cde9f7b 100644 --- a/db/database.py +++ b/db/database.py @@ -2,6 +2,7 @@ import os import psycopg2 import pandas as pd from psycopg2.extras import RealDictCursor +from psycopg2.extras import execute_batch, Json class PostgresConnector: @@ -19,14 +20,14 @@ class PostgresConnector: ) self.connection.autocommit = False - def execute(self, query, params=None, fetch=False): + def execute(self, query, params=None, fetch=False) -> list: with self.connection.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute(query, params) if fetch: return cursor.fetchall() self.connection.commit() - def executemany(self, query, param_list): + def executemany(self, query, param_list) -> list: with self.connection.cursor(cursor_factory=RealDictCursor) as cursor: cursor.executemany(query, param_list) self.connection.commit() @@ -50,12 +51,84 @@ class PostgresConnector: return result[0] if result else None # Dataset Management Methods - def save_dataset(self, user_id: int, dataset_name: str, dataset_content: pd.DataFrame, topics: dict): + def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int: query = """ - INSERT INTO datasets (user_id, name, content, topics) - VALUES (%s, %s, %s, %s) + INSERT INTO datasets (user_id, name, topics) + VALUES (%s, %s, %s) + RETURNING id """ - self.execute(query, (user_id, dataset_name, dataset_content.to_json(orient="records"), topics)) + result = self.execute(query, (user_id, dataset_name, Json(topics)), fetch=True) + return result[0]["id"] if result else None + + def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame): + query = """ + INSERT INTO events ( + dataset_id, + parent_id, + author, + content, + timestamp, + date, + dt, + hour, + weekday, + reply_to, + source, + topic, + topic_confidence, + ner_entities, + emotion_anger, + emotion_disgust, + emotion_fear, + emotion_joy, + emotion_sadness + ) + VALUES ( + %s, %s, %s, %s, %s, + %s, %s, 
%s, %s, %s, + %s, %s, %s, %s, %s, + %s, %s, %s, %s + ) + """ + + values = [] + + for _, row in event_data.iterrows(): + values.append(( + dataset_id, + row["parent_id"], + row["author"], + row["content"], + row["timestamp"], + row["date"], + row["dt"], + row["hour"], + row["weekday"], + row.get("reply_to"), + row["source"], + row.get("topic"), + row.get("topic_confidence"), + Json(row["ner_entities"]) if row.get("ner_entities") else None, + row.get("emotion_anger"), + row.get("emotion_disgust"), + row.get("emotion_fear"), + row.get("emotion_joy"), + row.get("emotion_sadness"), + )) + + + with self.connection.cursor(cursor_factory=RealDictCursor) as cursor: + execute_batch(cursor, query, values) + self.connection.commit() + + def get_dataset_by_id(self, dataset_id: int) -> pd.DataFrame: + query = "SELECT * FROM events WHERE dataset_id = %s" + result = self.execute(query, (dataset_id,), fetch=True) + return pd.DataFrame(result) + + def get_datasets_for_user(self, user_id: int) -> list: + query = "SELECT * FROM datasets WHERE user_id = %s" + return self.execute(query, (user_id,), fetch=True) def close(self): if self.connection: diff --git a/db/schema.sql b/db/schema.sql index 1a0c075..4ad2ac6 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -17,16 +17,24 @@ CREATE TABLE datasets ( ); CREATE TABLE events ( + /* Required Fields */ id SERIAL PRIMARY KEY, dataset_id INTEGER NOT NULL, - parent_post_id INTEGER NOT NULL, author VARCHAR(255) NOT NULL, content TEXT NOT NULL, - created_at TIMESTAMP NOT NULL, + timestamp BIGINT NOT NULL, + date DATE NOT NULL, + dt TIMESTAMP NOT NULL, + hour INTEGER NOT NULL, + weekday VARCHAR(255) NOT NULL, + + /* Comments and Replies */ + parent_id VARCHAR(255), reply_to VARCHAR(255), source VARCHAR(255) NOT NULL, + /* NLP Fields */ topic VARCHAR(255), topic_confidence FLOAT, @@ -38,6 +46,5 @@ CREATE TABLE events ( emotion_joy FLOAT, emotion_sadness FLOAT, - FOREIGN KEY (parent_post_id) REFERENCES events(id) ON DELETE CASCADE, FOREIGN KEY 
(dataset_id) REFERENCES datasets(id) ON DELETE CASCADE ); \ No newline at end of file diff --git a/server/app.py b/server/app.py index d032484..9ec0db4 100644 --- a/server/app.py +++ b/server/app.py @@ -12,6 +12,7 @@ from flask_jwt_extended import ( ) from server.stat_gen import StatGen +from server.dataset_processor import DatasetProcessor from db.database import PostgresConnector from server.auth import AuthManager @@ -99,6 +100,7 @@ def profile(): @app.route('/upload', methods=['POST']) +@jwt_required() def upload_data(): if "posts" not in request.files or "topics" not in request.files: return jsonify({"error": "Missing required files or form data"}), 400 @@ -113,18 +115,24 @@ def upload_data(): return jsonify({"error": "Invalid file type. Only .jsonl and .json files are allowed."}), 400 try: - global stat_obj + current_user = get_jwt_identity() - posts_df = pd.read_json(post_file, lines=True) - stat_obj = StatGen(posts_df, json.load(topic_file)) - return jsonify({"message": "File uploaded successfully", "event_count": len(stat_obj.df)}), 200 + posts_df = pd.read_json(post_file, lines=True, convert_dates=False) + topics = json.load(topic_file) + + processor = DatasetProcessor(posts_df, topics) + enriched_df = processor.enrich() + dataset_id = db.save_dataset_info(current_user, f"dataset_{current_user}", topics) + db.save_dataset_content(dataset_id, enriched_df) + + return jsonify({"message": "File uploaded successfully", "event_count": len(enriched_df)}), 200 except ValueError as e: return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400 except Exception as e: return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 -@app.route('/dataset', methods=['GET']) -def get_dataset(): +@app.route('/dataset/<int:dataset_id>', methods=['GET']) +def get_dataset(dataset_id): if stat_obj is None: return jsonify({"error": "No data uploaded"}), 400 diff --git a/server/dataset_processor.py b/server/dataset_processor.py index 880331e..37e94da 100644 --- 
a/server/dataset_processor.py +++ b/server/dataset_processor.py @@ -26,7 +26,7 @@ class DatasetProcessor: return df def enrich(self) -> pd.DataFrame: - self.df['timestamp'] = pd.to_numeric(self.df['timestamp'], errors='coerce') + self.df['timestamp'] = pd.to_numeric(self.df['timestamp'], errors='raise') self.df['date'] = pd.to_datetime(self.df['timestamp'], unit='s').dt.date self.df["dt"] = pd.to_datetime(self.df["timestamp"], unit="s", utc=True) self.df["hour"] = self.df["dt"].dt.hour