Compare commits

...

7 Commits

5 changed files with 172 additions and 25 deletions

2
.gitignore vendored
View File

@@ -9,3 +9,5 @@ __pycache__/
# React App Vite
node_modules/
dist/
*.sh

View File

@@ -1,6 +1,8 @@
import os
import psycopg2
import pandas as pd
from psycopg2.extras import RealDictCursor
from psycopg2.extras import execute_batch, Json
class PostgresConnector:
@@ -18,18 +20,19 @@ class PostgresConnector:
)
self.connection.autocommit = False
def execute(self, query, params=None, fetch=False) -> list:
    """Run a single SQL statement and commit it.

    Args:
        query: SQL text, using ``%s`` placeholders for parameters.
        params: Sequence of values bound to the placeholders, or None.
        fetch: When True, return all rows produced by the statement.

    Returns:
        The fetched rows (one mapping per row, via RealDictCursor) when
        ``fetch`` is True; otherwise None.

    Raises:
        Re-raises whatever the driver raises, after rolling back.
    """
    try:
        with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)
            # Read the rows before committing so the commit below also covers
            # statements such as INSERT ... RETURNING called with fetch=True.
            rows = cursor.fetchall() if fetch else None
        self.connection.commit()
    except Exception:
        # autocommit is off: without a rollback the connection would stay in
        # a failed transaction and reject every later statement.
        self.connection.rollback()
        raise
    return rows
def executemany(self, query, param_list) -> None:
    """Run one SQL statement once per parameter set, then commit.

    Args:
        query: SQL text, using ``%s`` placeholders for parameters.
        param_list: Iterable of parameter sequences, one per execution.

    Raises:
        Re-raises whatever the driver raises, after rolling back.
    """
    try:
        with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.executemany(query, param_list)
        self.connection.commit()
    except Exception:
        # Roll back so a failed batch does not leave the shared connection
        # stuck in an aborted transaction.
        self.connection.rollback()
        raise
## User Management Methods
def save_user(self, username, email, password_hash):
query = """
INSERT INTO users (username, email, password_hash)
@@ -47,6 +50,86 @@ class PostgresConnector:
result = self.execute(query, (email,), fetch=True)
return result[0] if result else None
# Dataset Management Methods
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
    """Insert a row into ``datasets`` and return its generated id.

    The topics mapping is wrapped in ``Json`` so it is sent as a JSON value.
    Returns None when the insert yields no row.
    """
    insert_sql = """
    INSERT INTO datasets (user_id, name, topics)
    VALUES (%s, %s, %s)
    RETURNING id
    """
    bound_values = (user_id, dataset_name, Json(topics))
    rows = self.execute(insert_sql, bound_values, fetch=True)
    if not rows:
        return None
    return rows[0]["id"]
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
    """Bulk-insert every row of ``event_data`` into ``events`` for one dataset.

    Args:
        dataset_id: Id of the owning ``datasets`` row; stored on every event.
        event_data: One row per event. The columns ``parent_id``, ``author``,
            ``content``, ``timestamp``, ``date``, ``dt``, ``hour``,
            ``weekday`` and ``source`` must exist; ``reply_to``, ``topic``,
            ``topic_confidence``, ``ner_entities`` and the ``emotion_*``
            columns are optional and default to NULL.

    Raises:
        KeyError: If a required column is absent.
        Re-raises driver errors after rolling back the transaction.
    """
    query = """
    INSERT INTO events (
        dataset_id,
        parent_id,
        author,
        content,
        timestamp,
        date,
        dt,
        hour,
        weekday,
        reply_to,
        source,
        topic,
        topic_confidence,
        ner_entities,
        emotion_anger,
        emotion_disgust,
        emotion_fear,
        emotion_joy,
        emotion_sadness
    )
    VALUES (
        %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s,
        %s, %s, %s, %s
    )
    """

    def null_if_missing(value):
        # pandas marks absent cells with NaN/NaT/NA; send those as SQL NULL
        # instead of handing the placeholder value to the driver.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return value

    def as_json_or_null(value):
        # Empty or missing entity collections are stored as NULL. Sized
        # containers are checked with len() so array-like values do not
        # raise on truth-testing.
        value = null_if_missing(value)
        if value is None:
            return None
        has_content = len(value) > 0 if hasattr(value, "__len__") else bool(value)
        return Json(value) if has_content else None

    values = []
    for _, row in event_data.iterrows():
        values.append((
            dataset_id,
            null_if_missing(row["parent_id"]),
            null_if_missing(row["author"]),
            null_if_missing(row["content"]),
            null_if_missing(row["timestamp"]),
            null_if_missing(row["date"]),
            null_if_missing(row["dt"]),
            null_if_missing(row["hour"]),
            null_if_missing(row["weekday"]),
            null_if_missing(row.get("reply_to")),
            null_if_missing(row["source"]),
            null_if_missing(row.get("topic")),
            null_if_missing(row.get("topic_confidence")),
            as_json_or_null(row.get("ner_entities")),
            null_if_missing(row.get("emotion_anger")),
            null_if_missing(row.get("emotion_disgust")),
            null_if_missing(row.get("emotion_fear")),
            null_if_missing(row.get("emotion_joy")),
            null_if_missing(row.get("emotion_sadness")),
        ))

    try:
        with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
            execute_batch(cursor, query, values)
        self.connection.commit()
    except Exception:
        # autocommit is off: roll back so a failed batch does not leave the
        # shared connection in an aborted transaction.
        self.connection.rollback()
        raise
def get_dataset_by_id(self, dataset_id: int) -> pd.DataFrame:
    """Load every ``events`` row of the given dataset into a DataFrame."""
    rows = self.execute(
        "SELECT * FROM events WHERE dataset_id = %s",
        (dataset_id,),
        fetch=True,
    )
    return pd.DataFrame(rows)
def get_datasets_for_user(self, user_id: int) -> list:
    """Return all ``datasets`` rows whose ``user_id`` matches."""
    owner_filter = (user_id,)
    return self.execute(
        "SELECT * FROM datasets WHERE user_id = %s",
        owner_filter,
        fetch=True,
    )
def close(self):
    """Close the underlying connection if one is set."""
    if not self.connection:
        return
    self.connection.close()

View File

@@ -6,30 +6,45 @@ CREATE TABLE users (
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE has_access (
CREATE TABLE datasets (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
post_id INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE
name VARCHAR(255) NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
topics JSONB,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE TABLE posts (
CREATE TABLE events (
/* Required Fields */
id SERIAL PRIMARY KEY,
author VARCHAR(255) NOT NULL,
title VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
source VARCHAR(255) NOT NULL
);
dataset_id INTEGER NOT NULL,
CREATE TABLE comments (
id SERIAL PRIMARY KEY,
post_id INTEGER NOT NULL,
author VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
timestamp BIGINT NOT NULL,
date DATE NOT NULL,
dt TIMESTAMP NOT NULL,
hour INTEGER NOT NULL,
weekday VARCHAR(255) NOT NULL,
/* Comments and Replies */
parent_id VARCHAR(255),
reply_to VARCHAR(255),
source VARCHAR(255) NOT NULL,
FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE
/* NLP Fields */
topic VARCHAR(255),
topic_confidence FLOAT,
ner_entities JSONB,
emotion_anger FLOAT,
emotion_disgust FLOAT,
emotion_fear FLOAT,
emotion_joy FLOAT,
emotion_sadness FLOAT,
FOREIGN KEY (dataset_id) REFERENCES datasets(id) ON DELETE CASCADE
);

View File

@@ -12,6 +12,7 @@ from flask_jwt_extended import (
)
from server.stat_gen import StatGen
from server.dataset_processor import DatasetProcessor
from db.database import PostgresConnector
from server.auth import AuthManager
@@ -99,6 +100,7 @@ def profile():
@app.route('/upload', methods=['POST'])
@jwt_required()
def upload_data():
if "posts" not in request.files or "topics" not in request.files:
return jsonify({"error": "Missing required files or form data"}), 400
@@ -113,18 +115,24 @@ def upload_data():
return jsonify({"error": "Invalid file type. Only .jsonl and .json files are allowed."}), 400
try:
global stat_obj
current_user = get_jwt_identity()
posts_df = pd.read_json(post_file, lines=True)
stat_obj = StatGen(posts_df, json.load(topic_file))
return jsonify({"message": "File uploaded successfully", "event_count": len(stat_obj.df)}), 200
posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
topics = json.load(topic_file)
processor = DatasetProcessor(posts_df, topics)
enriched_df = processor.enrich()
dataset_id = db.save_dataset_info(current_user, f"dataset_{current_user}", topics)
db.save_dataset_content(dataset_id, enriched_df)
return jsonify({"message": "File uploaded successfully", "event_count": len(enriched_df)}), 200
except ValueError as e:
return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
except Exception as e:
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
@app.route('/dataset', methods=['GET'])
def get_dataset():
@app.route('/dataset/<int:dataset_id>', methods=['GET'])
def get_dataset(dataset_id):
if stat_obj is None:
return jsonify({"error": "No data uploaded"}), 400

View File

@@ -0,0 +1,39 @@
import pandas as pd
from server.analysis.nlp import NLP
class DatasetProcessor:
    """Flatten posts with nested comments into one event table and enrich it.

    The constructor flattens the input frame and hands it to ``NLP``;
    ``enrich`` then adds time-derived columns and runs the NLP steps.
    """

    def __init__(self, df, topics):
        """Flatten ``df`` and prepare the NLP helper.

        Args:
            df: DataFrame of posts; a ``comments`` column, when present, holds
                a list of comment dicts per post.
            topics: Topic definitions forwarded to ``NLP``.
        """
        self.df = self._explode_comments(df)
        self.topics = topics
        self.nlp = NLP(self.df, "title", "content", self.topics)

    def _explode_comments(self, df) -> pd.DataFrame:
        """Return posts and their comments stacked as rows of one frame.

        Posts get ``type == "post"`` and ``parent_id`` None. Comments get
        ``type == "comment"`` and ``parent_id`` copied from their ``post_id``
        field; the ``post_id`` column itself is dropped. The result carries a
        fresh unique index.
        """
        posts_df = df.drop(columns=["comments"], errors="ignore")
        posts_df["type"] = "post"
        posts_df["parent_id"] = None

        # A dataset without any comments column is just a table of posts.
        if "comments" not in df.columns:
            posts_df.drop(columns=["post_id"], inplace=True, errors="ignore")
            return posts_df.reset_index(drop=True)

        # One entry per comment; anything that is not a dict (e.g. the NaN
        # produced by an empty list) is discarded.
        nested = df["comments"].explode()
        nested = nested[nested.apply(lambda item: isinstance(item, dict))]

        comments_df = pd.json_normalize(nested.tolist())
        comments_df["type"] = "comment"
        comments_df["parent_id"] = comments_df.get("post_id")

        # ignore_index avoids duplicate labels: posts and comments would
        # otherwise both carry their own 0..n index.
        combined = pd.concat([posts_df, comments_df], ignore_index=True)
        combined.drop(columns=["post_id"], inplace=True, errors="ignore")
        return combined

    def enrich(self) -> pd.DataFrame:
        """Add date/time columns and NLP columns to the frame and return it.

        ``timestamp`` is converted to numeric (raising on bad values) and
        interpreted as seconds since the epoch in UTC.
        """
        self.df['timestamp'] = pd.to_numeric(self.df['timestamp'], errors='raise')

        # Convert once and derive every time-based column from the same value.
        moments = pd.to_datetime(self.df["timestamp"], unit="s", utc=True)
        self.df['date'] = moments.dt.date
        self.df["dt"] = moments
        self.df["hour"] = moments.dt.hour
        self.df["weekday"] = moments.dt.day_name()

        self.nlp.add_emotion_cols()
        self.nlp.add_topic_col()
        self.nlp.add_ner_cols()
        return self.df