Storage of user data and datasets in PostgreSQL #2

Merged
dylan merged 19 commits from feat/database-integration into main 2026-03-01 16:47:25 +00:00
4 changed files with 104 additions and 16 deletions
Showing only changes of commit 5fb7710dc2 - Show all commits

View File

@@ -2,6 +2,7 @@ import os
import psycopg2
import pandas as pd
from psycopg2.extras import RealDictCursor
from psycopg2.extras import execute_batch, Json
class PostgresConnector:
@@ -19,14 +20,14 @@ class PostgresConnector:
)
self.connection.autocommit = False
def execute(self, query, params=None, fetch=False):
def execute(self, query, params=None, fetch=False) -> list:
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
if fetch:
return cursor.fetchall()
self.connection.commit()
def executemany(self, query, param_list):
def executemany(self, query, param_list) -> list:
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.executemany(query, param_list)
self.connection.commit()
@@ -50,12 +51,84 @@ class PostgresConnector:
return result[0] if result else None
# Dataset Management Methods
def save_dataset(self, user_id: int, dataset_name: str, dataset_content: pd.DataFrame, topics: dict):
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
query = """
INSERT INTO datasets (user_id, name, content, topics)
VALUES (%s, %s, %s, %s)
INSERT INTO datasets (user_id, name, topics)
VALUES (%s, %s, %s)
RETURNING id
"""
self.execute(query, (user_id, dataset_name, dataset_content.to_json(orient="records"), topics))
result = self.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
return result[0]["id"] if result else None
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
query = """
INSERT INTO events (
dataset_id,
parent_id,
author,
content,
timestamp,
date,
dt,
hour,
weekday,
reply_to,
source,
topic,
topic_confidence,
ner_entities,
emotion_anger,
emotion_disgust,
emotion_fear,
emotion_joy,
emotion_sadness
)
VALUES (
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s
)
"""
values = []
for _, row in event_data.iterrows():
values.append((
dataset_id,
row["parent_id"],
row["author"],
row["content"],
row["timestamp"],
row["date"],
row["dt"],
row["hour"],
row["weekday"],
row.get("reply_to"),
row["source"],
row.get("topic"),
row.get("topic_confidence"),
Json(row["ner_entities"]) if row.get("ner_entities") else None,
row.get("emotion_anger"),
row.get("emotion_disgust"),
row.get("emotion_fear"),
row.get("emotion_joy"),
row.get("emotion_sadness"),
))
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
execute_batch(cursor, query, values)
self.connection.commit()
# Fetches every event row belonging to *dataset_id* and returns them as a
# pandas DataFrame (empty DataFrame when the dataset has no events).
def get_dataset_by_id(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s"
# self.execute with fetch=True returns a list of RealDictCursor rows
# (dicts keyed by column name), which DataFrame consumes directly.
result = self.execute(query, (dataset_id,), fetch=True)
return pd.DataFrame(result)
# Returns all dataset metadata rows owned by *user_id* as a list of dicts
# (RealDictCursor rows); empty list when the user has no datasets.
def get_datasets_for_user(self, user_id: int) -> list:
query = "SELECT * FROM datasets WHERE user_id = %s"
return self.execute(query, (user_id,), fetch=True)
def close(self):
if self.connection:

View File

@@ -17,16 +17,24 @@ CREATE TABLE datasets (
);
CREATE TABLE events (
/* Required Fields */
id SERIAL PRIMARY KEY,
dataset_id INTEGER NOT NULL,
parent_post_id INTEGER NOT NULL,
author VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
timestamp BIGINT NOT NULL,
date DATE NOT NULL,
dt TIMESTAMP NOT NULL,
hour INTEGER NOT NULL,
weekday VARCHAR(255) NOT NULL,
/* Comments and Replies */
parent_id VARCHAR(255),
reply_to VARCHAR(255),
source VARCHAR(255) NOT NULL,
/* NLP Fields */
topic VARCHAR(255),
topic_confidence FLOAT,
@@ -38,6 +46,5 @@ CREATE TABLE events (
emotion_joy FLOAT,
emotion_sadness FLOAT,
FOREIGN KEY (parent_post_id) REFERENCES events(id) ON DELETE CASCADE,
FOREIGN KEY (dataset_id) REFERENCES datasets(id) ON DELETE CASCADE
);

View File

@@ -12,6 +12,7 @@ from flask_jwt_extended import (
)
from server.stat_gen import StatGen
from server.dataset_processor import DatasetProcessor
from db.database import PostgresConnector
from server.auth import AuthManager
@@ -99,6 +100,7 @@ def profile():
@app.route('/upload', methods=['POST'])
@jwt_required()
def upload_data():
if "posts" not in request.files or "topics" not in request.files:
return jsonify({"error": "Missing required files or form data"}), 400
@@ -113,18 +115,24 @@ def upload_data():
return jsonify({"error": "Invalid file type. Only .jsonl and .json files are allowed."}), 400
try:
global stat_obj
current_user = get_jwt_identity()
posts_df = pd.read_json(post_file, lines=True)
stat_obj = StatGen(posts_df, json.load(topic_file))
return jsonify({"message": "File uploaded successfully", "event_count": len(stat_obj.df)}), 200
posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
topics = json.load(topic_file)
processor = DatasetProcessor(posts_df, topics)
enriched_df = processor.enrich()
dataset_id = db.save_dataset_info(current_user, f"dataset_{current_user}", topics)
db.save_dataset_content(dataset_id, enriched_df)
return jsonify({"message": "File uploaded successfully", "event_count": len(enriched_df)}), 200
except ValueError as e:
return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
except Exception as e:
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
@app.route('/dataset', methods=['GET'])
def get_dataset():
@app.route('/dataset/<int:dataset_id>', methods=['GET'])
def get_dataset(dataset_id):
if stat_obj is None:
return jsonify({"error": "No data uploaded"}), 400

View File

@@ -26,7 +26,7 @@ class DatasetProcessor:
return df
def enrich(self) -> pd.DataFrame:
self.df['timestamp'] = pd.to_numeric(self.df['timestamp'], errors='coerce')
self.df['timestamp'] = pd.to_numeric(self.df['timestamp'], errors='raise')
self.df['date'] = pd.to_datetime(self.df['timestamp'], unit='s').dt.date
self.df["dt"] = pd.to_datetime(self.df["timestamp"], unit="s", utc=True)
self.df["hour"] = self.df["dt"].dt.hour