Compare commits
7 Commits
d73f4f1c45
...
d3c4d883be
| Author | SHA1 | Date | |
|---|---|---|---|
| d3c4d883be | |||
| a975f3bdba | |||
| 5fb7710dc2 | |||
| 0be9ff4896 | |||
| 2493c6d465 | |||
| d0a01c8d2f | |||
| 41f10c18cf |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -9,3 +9,5 @@ __pycache__/
|
|||||||
# React App Vite
|
# React App Vite
|
||||||
node_modules/
|
node_modules/
|
||||||
dist/
|
dist/
|
||||||
|
|
||||||
|
*.sh
|
||||||
@@ -1,6 +1,8 @@
|
|||||||
import os
|
import os
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
import pandas as pd
|
||||||
from psycopg2.extras import RealDictCursor
|
from psycopg2.extras import RealDictCursor
|
||||||
|
from psycopg2.extras import execute_batch, Json
|
||||||
|
|
||||||
|
|
||||||
class PostgresConnector:
|
class PostgresConnector:
|
||||||
@@ -18,18 +20,19 @@ class PostgresConnector:
|
|||||||
)
|
)
|
||||||
self.connection.autocommit = False
|
self.connection.autocommit = False
|
||||||
|
|
||||||
def execute(self, query, params=None, fetch=False) -> list:
    """Run a single SQL statement, commit, and optionally return its rows.

    Args:
        query: SQL text with ``%s`` placeholders.
        params: optional parameter tuple for the placeholders.
        fetch: when True, return all result rows (RealDict rows).

    Returns:
        A list of rows when ``fetch=True``, otherwise ``None``.

    Fix: the previous version returned ``cursor.fetchall()`` *before* ever
    reaching ``self.connection.commit()``, so any data-modifying statement
    used with ``fetch=True`` (e.g. ``INSERT ... RETURNING``) was never
    committed, and — since ``autocommit`` is disabled on this connection —
    the session was left inside an open transaction.
    """
    with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(query, params)
        result = cursor.fetchall() if fetch else None
    # Commit unconditionally: with autocommit off, even a SELECT opens a
    # transaction that would otherwise linger as "idle in transaction".
    self.connection.commit()
    return result
|
||||||
|
|
||||||
def executemany(self, query, param_list) -> None:
    """Run *query* once per parameter tuple in *param_list*, then commit.

    Args:
        query: SQL text with ``%s`` placeholders.
        param_list: iterable of parameter tuples.

    Fix: the new-side annotation claimed ``-> list``, but the method never
    fetches or returns anything; it always returns ``None``.
    """
    with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.executemany(query, param_list)
    self.connection.commit()
|
||||||
|
|
||||||
|
## User Management Methods
|
||||||
def save_user(self, username, email, password_hash):
|
def save_user(self, username, email, password_hash):
|
||||||
query = """
|
query = """
|
||||||
INSERT INTO users (username, email, password_hash)
|
INSERT INTO users (username, email, password_hash)
|
||||||
@@ -47,6 +50,86 @@ class PostgresConnector:
|
|||||||
result = self.execute(query, (email,), fetch=True)
|
result = self.execute(query, (email,), fetch=True)
|
||||||
return result[0] if result else None
|
return result[0] if result else None
|
||||||
|
|
||||||
|
# Dataset Management Methods
|
||||||
|
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
    """Insert a row into ``datasets`` and return its generated id.

    Args:
        user_id: owner of the dataset (FK to ``users.id``).
        dataset_name: value stored in ``datasets.name``.
        topics: topic description, stored as JSONB via ``psycopg2.extras.Json``.

    Returns:
        The new dataset id, or ``None`` if the INSERT produced no row.
    """
    query = """
        INSERT INTO datasets (user_id, name, topics)
        VALUES (%s, %s, %s)
        RETURNING id
    """
    result = self.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
    # execute() returns fetched rows without committing when fetch=True, so
    # commit here explicitly — otherwise this INSERT is rolled back when the
    # connection closes. (Redundant commits are harmless no-ops.)
    self.connection.commit()
    return result[0]["id"] if result else None
|
||||||
|
|
||||||
|
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
    """Bulk-insert every row of *event_data* into ``events`` under *dataset_id*.

    Column order in the parameter tuples matches the column list of the
    INSERT statement below; optional fields use ``row.get`` so missing
    columns become NULL.
    """
    query = """
        INSERT INTO events (
            dataset_id,
            parent_id,
            author,
            content,
            timestamp,
            date,
            dt,
            hour,
            weekday,
            reply_to,
            source,
            topic,
            topic_confidence,
            ner_entities,
            emotion_anger,
            emotion_disgust,
            emotion_fear,
            emotion_joy,
            emotion_sadness
        )
        VALUES (
            %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s,
            %s, %s, %s, %s
        )
    """

    # One parameter tuple per event row, in the declared column order.
    rows = [
        (
            dataset_id,
            event["parent_id"],
            event["author"],
            event["content"],
            event["timestamp"],
            event["date"],
            event["dt"],
            event["hour"],
            event["weekday"],
            event.get("reply_to"),
            event["source"],
            event.get("topic"),
            event.get("topic_confidence"),
            # Wrap the entity payload for the JSONB column; NULL when absent.
            Json(event["ner_entities"]) if event.get("ner_entities") else None,
            event.get("emotion_anger"),
            event.get("emotion_disgust"),
            event.get("emotion_fear"),
            event.get("emotion_joy"),
            event.get("emotion_sadness"),
        )
        for _, event in event_data.iterrows()
    ]

    with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
        execute_batch(cursor, query, rows)
    self.connection.commit()
|
||||||
|
|
||||||
|
def get_dataset_by_id(self, dataset_id: int) -> pd.DataFrame:
    """Fetch all event rows belonging to one dataset as a DataFrame."""
    rows = self.execute(
        "SELECT * FROM events WHERE dataset_id = %s", (dataset_id,), fetch=True
    )
    return pd.DataFrame(rows)
|
||||||
|
|
||||||
|
def get_datasets_for_user(self, user_id: int) -> list:
    """Return every dataset row owned by *user_id* (list of RealDict rows)."""
    sql = "SELECT * FROM datasets WHERE user_id = %s"
    return self.execute(sql, (user_id,), fetch=True)
|
||||||
|
|
||||||
def close(self):
    """Close the underlying connection, if one exists."""
    if not self.connection:
        return
    self.connection.close()
|
||||||
@@ -6,30 +6,45 @@ CREATE TABLE users (
|
|||||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE has_access (
|
CREATE TABLE datasets (
|
||||||
id SERIAL PRIMARY KEY,
|
id SERIAL PRIMARY KEY,
|
||||||
user_id INTEGER NOT NULL,
|
user_id INTEGER NOT NULL,
|
||||||
post_id INTEGER NOT NULL,
|
name VARCHAR(255) NOT NULL,
|
||||||
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
|
description TEXT,
|
||||||
FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
topics JSONB,
|
||||||
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE posts (
|
CREATE TABLE events (
|
||||||
|
/* Required Fields */
|
||||||
id SERIAL PRIMARY KEY,
|
id SERIAL PRIMARY KEY,
|
||||||
author VARCHAR(255) NOT NULL,
|
dataset_id INTEGER NOT NULL,
|
||||||
title VARCHAR(255) NOT NULL,
|
|
||||||
content TEXT NOT NULL,
|
|
||||||
created_at TIMESTAMP NOT NULL,
|
|
||||||
source VARCHAR(255) NOT NULL
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE comments (
|
|
||||||
id SERIAL PRIMARY KEY,
|
|
||||||
post_id INTEGER NOT NULL,
|
|
||||||
author VARCHAR(255) NOT NULL,
|
author VARCHAR(255) NOT NULL,
|
||||||
content TEXT NOT NULL,
|
content TEXT NOT NULL,
|
||||||
created_at TIMESTAMP NOT NULL,
|
timestamp BIGINT NOT NULL,
|
||||||
|
date DATE NOT NULL,
|
||||||
|
dt TIMESTAMP NOT NULL,
|
||||||
|
hour INTEGER NOT NULL,
|
||||||
|
weekday VARCHAR(255) NOT NULL,
|
||||||
|
|
||||||
|
/* Comments and Replies */
|
||||||
|
parent_id VARCHAR(255),
|
||||||
reply_to VARCHAR(255),
|
reply_to VARCHAR(255),
|
||||||
source VARCHAR(255) NOT NULL,
|
source VARCHAR(255) NOT NULL,
|
||||||
FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE
|
|
||||||
|
/* NLP Fields */
|
||||||
|
topic VARCHAR(255),
|
||||||
|
topic_confidence FLOAT,
|
||||||
|
|
||||||
|
ner_entities JSONB,
|
||||||
|
|
||||||
|
emotion_anger FLOAT,
|
||||||
|
emotion_disgust FLOAT,
|
||||||
|
emotion_fear FLOAT,
|
||||||
|
emotion_joy FLOAT,
|
||||||
|
emotion_sadness FLOAT,
|
||||||
|
|
||||||
|
FOREIGN KEY (dataset_id) REFERENCES datasets(id) ON DELETE CASCADE
|
||||||
);
|
);
|
||||||
@@ -12,6 +12,7 @@ from flask_jwt_extended import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from server.stat_gen import StatGen
|
from server.stat_gen import StatGen
|
||||||
|
from server.dataset_processor import DatasetProcessor
|
||||||
from db.database import PostgresConnector
|
from db.database import PostgresConnector
|
||||||
from server.auth import AuthManager
|
from server.auth import AuthManager
|
||||||
|
|
||||||
@@ -99,6 +100,7 @@ def profile():
|
|||||||
|
|
||||||
|
|
||||||
@app.route('/upload', methods=['POST'])
|
@app.route('/upload', methods=['POST'])
|
||||||
|
@jwt_required()
|
||||||
def upload_data():
|
def upload_data():
|
||||||
if "posts" not in request.files or "topics" not in request.files:
|
if "posts" not in request.files or "topics" not in request.files:
|
||||||
return jsonify({"error": "Missing required files or form data"}), 400
|
return jsonify({"error": "Missing required files or form data"}), 400
|
||||||
@@ -113,18 +115,24 @@ def upload_data():
|
|||||||
return jsonify({"error": "Invalid file type. Only .jsonl and .json files are allowed."}), 400
|
return jsonify({"error": "Invalid file type. Only .jsonl and .json files are allowed."}), 400
|
||||||
|
|
||||||
try:
|
try:
|
||||||
global stat_obj
|
current_user = get_jwt_identity()
|
||||||
|
|
||||||
posts_df = pd.read_json(post_file, lines=True)
|
posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
|
||||||
stat_obj = StatGen(posts_df, json.load(topic_file))
|
topics = json.load(topic_file)
|
||||||
return jsonify({"message": "File uploaded successfully", "event_count": len(stat_obj.df)}), 200
|
|
||||||
|
processor = DatasetProcessor(posts_df, topics)
|
||||||
|
enriched_df = processor.enrich()
|
||||||
|
dataset_id = db.save_dataset_info(current_user, f"dataset_{current_user}", topics)
|
||||||
|
db.save_dataset_content(dataset_id, enriched_df)
|
||||||
|
|
||||||
|
return jsonify({"message": "File uploaded successfully", "event_count": len(enriched_df)}), 200
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
|
return jsonify({"error": f"Failed to read JSONL file: {str(e)}"}), 400
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
|
||||||
|
|
||||||
@app.route('/dataset', methods=['GET'])
|
@app.route('/dataset/<int:dataset_id>', methods=['GET'])
|
||||||
def get_dataset():
|
def get_dataset(dataset_id):
|
||||||
if stat_obj is None:
|
if stat_obj is None:
|
||||||
return jsonify({"error": "No data uploaded"}), 400
|
return jsonify({"error": "No data uploaded"}), 400
|
||||||
|
|
||||||
|
|||||||
39
server/dataset_processor.py
Normal file
39
server/dataset_processor.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from server.analysis.nlp import NLP
|
||||||
|
|
||||||
|
class DatasetProcessor:
    """Flatten a raw posts DataFrame (posts with nested comment lists) into a
    single event table, and enrich it with time-derived and NLP columns."""

    def __init__(self, df, topics):
        # Flatten first: every later step (NLP included) works on the
        # row-per-event view, not on the nested structure.
        self.df = self._explode_comments(df)
        self.topics = topics
        self.nlp = NLP(self.df, "title", "content", self.topics)

    def _explode_comments(self, df) -> pd.DataFrame:
        """Return one row per post and per comment.

        Posts keep their original columns (minus ``comments``); each comment
        dict is normalized into columns. Both sides gain a ``type`` marker
        and a ``parent_id`` (NULL for posts).
        """
        exploded = df[["id", "comments"]].explode("comments")
        # Drop explode artifacts (NaN for empty lists, malformed entries).
        exploded = exploded[exploded["comments"].apply(lambda c: isinstance(c, dict))]
        flat_comments = pd.json_normalize(exploded["comments"])

        base_posts = df.drop(columns=["comments"])
        base_posts["type"] = "post"
        base_posts["parent_id"] = None

        flat_comments["type"] = "comment"
        # NOTE(review): assumes each comment dict carries a "post_id" key
        # pointing at its parent post — verify against the upload format.
        flat_comments["parent_id"] = flat_comments.get("post_id")

        combined = pd.concat([base_posts, flat_comments])
        combined.drop(columns=["post_id"], inplace=True, errors="ignore")
        return combined

    def enrich(self) -> pd.DataFrame:
        """Add derived time columns and NLP columns in place; return the frame."""
        self.df['timestamp'] = pd.to_numeric(self.df['timestamp'], errors='raise')
        self.df['date'] = pd.to_datetime(self.df['timestamp'], unit='s').dt.date
        self.df["dt"] = pd.to_datetime(self.df["timestamp"], unit="s", utc=True)
        self.df["hour"] = self.df["dt"].dt.hour
        self.df["weekday"] = self.df["dt"].dt.day_name()

        self.nlp.add_emotion_cols()
        self.nlp.add_topic_col()
        self.nlp.add_ner_cols()

        return self.df
|
||||||
Reference in New Issue
Block a user