diff --git a/db/database.py b/db/database.py deleted file mode 100644 index a9c6d41..0000000 --- a/db/database.py +++ /dev/null @@ -1,149 +0,0 @@ -import os -import psycopg2 -import pandas as pd -from psycopg2.extras import RealDictCursor -from psycopg2.extras import execute_batch, Json -from server.exceptions import NotExistentDatasetException - - -class PostgresConnector: - """ - Simple PostgreSQL connector (single connection). - """ - - def __init__(self): - self.connection = psycopg2.connect( - host=os.getenv("POSTGRES_HOST", "localhost"), - port=os.getenv("POSTGRES_PORT", 5432), - user=os.getenv("POSTGRES_USER", "postgres"), - password=os.getenv("POSTGRES_PASSWORD", "postgres"), - database=os.getenv("POSTGRES_DB", "postgres"), - ) - self.connection.autocommit = False - - def execute(self, query, params=None, fetch=False) -> list: - with self.connection.cursor(cursor_factory=RealDictCursor) as cursor: - cursor.execute(query, params) - if fetch: - return cursor.fetchall() - self.connection.commit() - - def executemany(self, query, param_list) -> list: - with self.connection.cursor(cursor_factory=RealDictCursor) as cursor: - cursor.executemany(query, param_list) - self.connection.commit() - - ## User Management Methods - def save_user(self, username, email, password_hash): - query = """ - INSERT INTO users (username, email, password_hash) - VALUES (%s, %s, %s) - """ - self.execute(query, (username, email, password_hash)) - - def get_user_by_username(self, username) -> dict: - query = "SELECT id, username, email, password_hash FROM users WHERE username = %s" - result = self.execute(query, (username,), fetch=True) - return result[0] if result else None - - def get_user_by_email(self, email) -> dict: - query = "SELECT id, username, email, password_hash FROM users WHERE email = %s" - result = self.execute(query, (email,), fetch=True) - return result[0] if result else None - - # Dataset Management Methods - def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int: - query = """ - INSERT INTO datasets (user_id, name, topics) - VALUES (%s, %s, %s) - RETURNING id - """ - result = self.execute(query, (user_id, dataset_name, Json(topics)), fetch=True) - return result[0]["id"] if result else None - - def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame): - query = """ - INSERT INTO events ( - dataset_id, - type, - parent_id, - author, - title, - content, - timestamp, - date, - dt, - hour, - weekday, - reply_to, - source, - topic, - topic_confidence, - ner_entities, - emotion_anger, - emotion_disgust, - emotion_fear, - emotion_joy, - emotion_sadness - ) - VALUES ( - %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, - %s - ) - """ - - values = [] - - for _, row in event_data.iterrows(): - values.append(( - dataset_id, - row["type"], - row["parent_id"], - row["author"], - row.get("title"), - row["content"], - row["timestamp"], - row["date"], - row["dt"], - row["hour"], - row["weekday"], - row.get("reply_to"), - row["source"], - row.get("topic"), - row.get("topic_confidence"), - Json(row["entities"]) if row.get("entities") else None, - row.get("emotion_anger"), - row.get("emotion_disgust"), - row.get("emotion_fear"), - row.get("emotion_joy"), - row.get("emotion_sadness"), - )) - - - with self.connection.cursor(cursor_factory=RealDictCursor) as cursor: - execute_batch(cursor, query, values) - self.connection.commit() - - def get_dataset_content(self, dataset_id: int) -> pd.DataFrame: - query = "SELECT * FROM events WHERE dataset_id = %s" - result = self.execute(query, (dataset_id,), fetch=True) - - if result: - return pd.DataFrame(result) - - raise NotExistentDatasetException("Dataset does not exist") - - def get_dataset_info(self, dataset_id: int) -> dict: - query = "SELECT * FROM datasets WHERE id = %s" - result = self.execute(query, (dataset_id,), fetch=True) - if result: - return result[0] - - raise NotExistentDatasetException("Dataset does not exist") - - def close(self): - if self.connection: - self.connection.close() \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index dec2007..3522b91 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,8 +8,8 @@ services: ports: - "5432:5432" volumes: - - ./db/postgres_vol:/var/lib/postgresql/data - - ./db/schema.sql:/docker-entrypoint-initdb.d/schema.sql + - ./server/db/postgres_vol:/var/lib/postgresql/data + - ./server/db/schema.sql:/docker-entrypoint-initdb.d/schema.sql volumes: postgres_data: \ No newline at end of file diff --git a/server/dataset_processor.py b/server/analysis/enrichment.py similarity index 98% rename from server/dataset_processor.py rename to server/analysis/enrichment.py index 37e94da..065caea 100644 --- a/server/dataset_processor.py +++ b/server/analysis/enrichment.py @@ -2,7 +2,7 @@ import pandas as pd from server.analysis.nlp import NLP -class DatasetProcessor: +class DatasetEnrichment: def __init__(self, df, topics): self.df = self._explode_comments(df) self.topics = topics diff --git a/server/stat_gen.py b/server/analysis/stat_gen.py similarity index 99% rename from server/stat_gen.py rename to server/analysis/stat_gen.py index 2ea5ac1..f9d8344 100644 --- a/server/stat_gen.py +++ b/server/analysis/stat_gen.py @@ -1,5 +1,3 @@ -import datetime - import nltk import pandas as pd from nltk.corpus import stopwords diff --git a/server/app.py b/server/app.py index 5e63acd..be48607 100644 --- a/server/app.py +++ b/server/app.py @@ -1,4 +1,7 @@ import os +import pandas as pd +import traceback +import json from dotenv import load_dotenv from flask import Flask, jsonify, request @@ -11,19 +14,15 @@ from flask_jwt_extended import ( get_jwt_identity, ) -from server.stat_gen import StatGen -from server.dataset_processor import DatasetProcessor +from server.analysis.stat_gen import StatGen +from server.analysis.enrichment import DatasetEnrichment from server.exceptions import NotAuthorisedException, NotExistentDatasetException -from db.database import PostgresConnector -from server.auth import AuthManager -from server.utils import get_request_filters, get_dataset_and_validate - -import pandas as pd -import traceback -import json +from server.db.database import PostgresConnector +from server.core.auth import AuthManager +from server.core.datasets import DatasetManager +from server.utils import get_request_filters app = Flask(__name__) -db = PostgresConnector() # Env Variables load_dotenv() @@ -40,11 +39,12 @@ app.config["JWT_ACCESS_TOKEN_EXPIRES"] = jwt_access_token_expires bcrypt = Bcrypt(app) jwt = JWTManager(app) + +db = PostgresConnector() auth_manager = AuthManager(db, bcrypt) - +dataset_manager = DatasetManager(db) stat_gen = StatGen() - @app.route("/register", methods=["POST"]) def register_user(): data = request.get_json() @@ -130,12 +130,10 @@ def upload_data(): posts_df = pd.read_json(post_file, lines=True, convert_dates=False) topics = json.load(topic_file) - processor = DatasetProcessor(posts_df, topics) + processor = DatasetEnrichment(posts_df, topics) enriched_df = processor.enrich() - dataset_id = db.save_dataset_info( - current_user, f"dataset_{current_user}", topics - ) - db.save_dataset_content(dataset_id, enriched_df) + dataset_id = dataset_manager.save_dataset_info(current_user, f"dataset_{current_user}", topics) + dataset_manager.save_dataset_content(dataset_id, enriched_df) return jsonify( { @@ -154,7 +152,8 @@ def upload_data(): @jwt_required() def get_dataset(dataset_id): try: - dataset_content = get_dataset_and_validate(dataset_id, db) + user_id = get_jwt_identity() + dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) filters = get_request_filters() filtered_dataset = stat_gen.filter_dataset(dataset_content, filters) return jsonify(filtered_dataset), 200 @@ -171,7 +170,8 @@ def get_dataset(dataset_id): @jwt_required() def content_endpoint(dataset_id): try: - dataset_content = get_dataset_and_validate(dataset_id, db) + user_id = get_jwt_identity() + dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) filters = get_request_filters() return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200 except NotAuthorisedException: @@ -187,7 +187,8 @@ def content_endpoint(dataset_id): @jwt_required() def get_summary(dataset_id): try: - dataset_content = get_dataset_and_validate(dataset_id, db) + user_id = get_jwt_identity() + dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) filters = get_request_filters() return jsonify(stat_gen.summary(dataset_content, filters)), 200 except NotAuthorisedException: @@ -203,7 +204,8 @@ def get_summary(dataset_id): @jwt_required() def get_time_analysis(dataset_id): try: - dataset_content = get_dataset_and_validate(dataset_id, db) + user_id = get_jwt_identity() + dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) filters = get_request_filters() return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200 except NotAuthorisedException: @@ -219,7 +221,8 @@ def get_time_analysis(dataset_id): @jwt_required() def get_user_analysis(dataset_id): try: - dataset_content = get_dataset_and_validate(dataset_id, db) + user_id = get_jwt_identity() + dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) filters = get_request_filters() return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200 except NotAuthorisedException: @@ -235,7 +238,8 @@ def get_user_analysis(dataset_id): @jwt_required() def get_cultural_analysis(dataset_id): try: - dataset_content = get_dataset_and_validate(dataset_id, db) + user_id = get_jwt_identity() + dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) filters = get_request_filters() return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200 except NotAuthorisedException: @@ -251,7 +255,8 @@ def get_cultural_analysis(dataset_id): @jwt_required() def get_interaction_analysis(dataset_id): try: - dataset_content = get_dataset_and_validate(dataset_id, db) + user_id = get_jwt_identity() + dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id)) filters = get_request_filters() return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200 except NotAuthorisedException: diff --git a/server/auth.py b/server/auth.py deleted file mode 100644 index 9d62512..0000000 --- a/server/auth.py +++ /dev/null @@ -1,29 +0,0 @@ -from db.database import PostgresConnector -from flask_bcrypt import Bcrypt - -class AuthManager: - def __init__(self, db: PostgresConnector, bcrypt: Bcrypt): - self.db = db - self.bcrypt = bcrypt - - def register_user(self, username, email, password): - hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8") - - if self.db.get_user_by_email(email): - raise ValueError("Email already registered") - - if self.db.get_user_by_username(username): - raise ValueError("Username already taken") - - self.db.save_user(username, email, hashed_password) - - def authenticate_user(self, username, password): - user = self.db.get_user_by_username(username) - if user and self.bcrypt.check_password_hash(user['password_hash'], password): - return user - return None - - def get_user_by_id(self, user_id): - query = "SELECT id, username, email FROM users WHERE id = %s" - result = self.db.execute(query, (user_id,), fetch=True) - return result[0] if result else None \ No newline at end of file diff --git a/server/core/auth.py b/server/core/auth.py new file mode 100644 index 0000000..625c3c2 --- /dev/null +++ b/server/core/auth.py @@ -0,0 +1,48 @@ +from server.db.database import PostgresConnector +from flask_bcrypt import Bcrypt + +class AuthManager: + def __init__(self, db: PostgresConnector, bcrypt: Bcrypt): + self.db = db + self.bcrypt = bcrypt + + # private + def _save_user(self, username, email, password_hash): + query = """ + INSERT INTO users (username, email, password_hash) + VALUES (%s, %s, %s) + """ + self.db.execute(query, (username, email, password_hash)) + + # public + def register_user(self, username, email, password): + hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8") + + if self.get_user_by_email(email): + raise ValueError("Email already registered") + + if self.get_user_by_username(username): + raise ValueError("Username already taken") + + self._save_user(username, email, hashed_password) + + def authenticate_user(self, username, password): + user = self.get_user_by_username(username) + if user and self.bcrypt.check_password_hash(user['password_hash'], password): + return user + return None + + def get_user_by_id(self, user_id): + query = "SELECT id, username, email FROM users WHERE id = %s" + result = self.db.execute(query, (user_id,), fetch=True) + return result[0] if result else None + + def get_user_by_username(self, username) -> dict: + query = "SELECT id, username, email, password_hash FROM users WHERE username = %s" + result = self.db.execute(query, (username,), fetch=True) + return result[0] if result else None + + def get_user_by_email(self, email) -> dict: + query = "SELECT id, username, email, password_hash FROM users WHERE email = %s" + result = self.db.execute(query, (email,), fetch=True) + return result[0] if result else None diff --git a/server/core/datasets.py b/server/core/datasets.py new file mode 100644 index 0000000..0d5dba5 --- /dev/null +++ b/server/core/datasets.py @@ -0,0 +1,103 @@ +import pandas as pd +from server.db.database import PostgresConnector +from psycopg2.extras import Json +from server.exceptions import NotAuthorisedException + +class DatasetManager: + def __init__(self, db: PostgresConnector): + self.db = db + + def get_dataset_and_validate(self, dataset_id: int, user_id: int) -> pd.DataFrame: + dataset_info = self.get_dataset_info(dataset_id) + + if dataset_info.get("user_id") != user_id: + raise NotAuthorisedException("This user is not authorised to access this dataset") + + return self.get_dataset_content(dataset_id) + + def get_dataset_content(self, dataset_id: int) -> pd.DataFrame: + query = "SELECT * FROM events WHERE dataset_id = %s" + result = self.db.execute(query, (dataset_id,), fetch=True) + return pd.DataFrame(result) + + def get_dataset_info(self, dataset_id: int) -> dict: + query = "SELECT * FROM datasets WHERE id = %s" + result = self.db.execute(query, (dataset_id,), fetch=True) + return result[0] if result else None + + def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int: + query = """ + INSERT INTO datasets (user_id, name, topics) + VALUES (%s, %s, %s) + RETURNING id + """ + result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True) + return result[0]["id"] if result else None + + def get_dataset_content(self, dataset_id: int) -> pd.DataFrame: + query = "SELECT * FROM events WHERE dataset_id = %s" + result = self.db.execute(query, (dataset_id,), fetch=True) + return pd.DataFrame(result) + + def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame): + if event_data.empty: + return + + query = """ + INSERT INTO events ( + dataset_id, + type, + parent_id, + author, + content, + timestamp, + date, + dt, + hour, + weekday, + reply_to, + source, + topic, + topic_confidence, + ner_entities, + emotion_anger, + emotion_disgust, + emotion_fear, + emotion_joy, + emotion_sadness + ) + VALUES ( + %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s + ) + """ + + values = [ + ( + dataset_id, + row["type"], + row["parent_id"], + row["author"], + row["content"], + row["timestamp"], + row["date"], + row["dt"], + row["hour"], + row["weekday"], + row.get("reply_to"), + row["source"], + row.get("topic"), + row.get("topic_confidence"), + Json(row["ner_entities"]) if row.get("ner_entities") else None, + row.get("emotion_anger"), + row.get("emotion_disgust"), + row.get("emotion_fear"), + row.get("emotion_joy"), + row.get("emotion_sadness"), + ) + for _, row in event_data.iterrows() + ] + + self.db.execute_batch(query, values) \ No newline at end of file diff --git a/server/db/database.py b/server/db/database.py new file mode 100644 index 0000000..efbbfcd --- /dev/null +++ b/server/db/database.py @@ -0,0 +1,45 @@ +import os +import psycopg2 +from psycopg2.extras import RealDictCursor +from psycopg2.extras import execute_batch + +from server.exceptions import DatabaseNotConfiguredException + + +class PostgresConnector: + """ + Simple PostgreSQL connector (single connection). + """ + + def __init__(self): + + try: + self.connection = psycopg2.connect( + host=os.getenv("POSTGRES_HOST", "localhost"), + port=os.getenv("POSTGRES_PORT", 5432), + user=os.getenv("POSTGRES_USER", "postgres"), + password=os.getenv("POSTGRES_PASSWORD", "postgres"), + database=os.getenv("POSTGRES_DB", "postgres"), + ) + except psycopg2.OperationalError as e: + raise DatabaseNotConfiguredException(f"Ensure database is up and running: {e}") + + self.connection.autocommit = False + + def execute(self, query, params=None, fetch=False) -> list: + with self.connection.cursor(cursor_factory=RealDictCursor) as cursor: + cursor.execute(query, params) + if fetch: + return cursor.fetchall() + self.connection.commit() + + def execute_batch(self, query, values): + with self.connection.cursor(cursor_factory=RealDictCursor) as cursor: + execute_batch(cursor, query, values) + self.connection.commit() + + + ## User Management Methods + def close(self): + if self.connection: + self.connection.close() \ No newline at end of file diff --git a/db/schema.sql b/server/db/schema.sql similarity index 94% rename from db/schema.sql rename to server/db/schema.sql index 5a9eaee..693f821 100644 --- a/db/schema.sql +++ b/server/db/schema.sql @@ -30,10 +30,7 @@ CREATE TABLE events ( hour INTEGER NOT NULL, weekday VARCHAR(255) NOT NULL, - /* Posts Only */ - title VARCHAR(255), - - /* Comments Only*/ + /* Comments and Replies */ parent_id VARCHAR(255), reply_to VARCHAR(255), source VARCHAR(255) NOT NULL, diff --git a/server/exceptions.py b/server/exceptions.py index f3ebaa9..e63be49 100644 --- a/server/exceptions.py +++ b/server/exceptions.py @@ -2,4 +2,7 @@ class NotAuthorisedException(Exception): pass class NotExistentDatasetException(Exception): + pass + +class DatabaseNotConfiguredException(Exception): pass \ No newline at end of file diff --git a/server/utils.py b/server/utils.py index 71e593b..815739f 100644 --- a/server/utils.py +++ b/server/utils.py @@ -1,10 +1,5 @@ import datetime -import pandas as pd - from flask import request -from flask_jwt_extended import get_jwt_identity -from db.database import PostgresConnector -from server.exceptions import NotAuthorisedException def parse_datetime_filter(value): if not value: @@ -53,12 +48,3 @@ def get_request_filters() -> dict: filters["data_sources"] = data_sources return filters - -def get_dataset_and_validate(dataset_id: int, db: PostgresConnector) -> pd.DataFrame: - current_user = get_jwt_identity() - dataset = db.get_dataset_info(dataset_id) - - if dataset.get("user_id") != int(current_user): - raise NotAuthorisedException("This user is not authorised to access this dataset") - - return db.get_dataset_content(dataset_id)