Compare commits

..

8 Commits

12 changed files with 232 additions and 225 deletions

View File

@@ -1,149 +0,0 @@
import os
import psycopg2
import pandas as pd
from psycopg2.extras import RealDictCursor
from psycopg2.extras import execute_batch, Json
from server.exceptions import NotExistentDatasetException
class PostgresConnector:
"""
Simple PostgreSQL connector (single connection).
"""
def __init__(self):
self.connection = psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "localhost"),
port=os.getenv("POSTGRES_PORT", 5432),
user=os.getenv("POSTGRES_USER", "postgres"),
password=os.getenv("POSTGRES_PASSWORD", "postgres"),
database=os.getenv("POSTGRES_DB", "postgres"),
)
self.connection.autocommit = False
def execute(self, query, params=None, fetch=False) -> list:
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
if fetch:
return cursor.fetchall()
self.connection.commit()
def executemany(self, query, param_list) -> list:
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.executemany(query, param_list)
self.connection.commit()
## User Management Methods
def save_user(self, username, email, password_hash):
query = """
INSERT INTO users (username, email, password_hash)
VALUES (%s, %s, %s)
"""
self.execute(query, (username, email, password_hash))
def get_user_by_username(self, username) -> dict:
query = "SELECT id, username, email, password_hash FROM users WHERE username = %s"
result = self.execute(query, (username,), fetch=True)
return result[0] if result else None
def get_user_by_email(self, email) -> dict:
query = "SELECT id, username, email, password_hash FROM users WHERE email = %s"
result = self.execute(query, (email,), fetch=True)
return result[0] if result else None
# Dataset Management Methods
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
query = """
INSERT INTO datasets (user_id, name, topics)
VALUES (%s, %s, %s)
RETURNING id
"""
result = self.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
return result[0]["id"] if result else None
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
query = """
INSERT INTO events (
dataset_id,
type,
parent_id,
author,
title,
content,
timestamp,
date,
dt,
hour,
weekday,
reply_to,
source,
topic,
topic_confidence,
ner_entities,
emotion_anger,
emotion_disgust,
emotion_fear,
emotion_joy,
emotion_sadness
)
VALUES (
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s
)
"""
values = []
for _, row in event_data.iterrows():
values.append((
dataset_id,
row["type"],
row["parent_id"],
row["author"],
row.get("title"),
row["content"],
row["timestamp"],
row["date"],
row["dt"],
row["hour"],
row["weekday"],
row.get("reply_to"),
row["source"],
row.get("topic"),
row.get("topic_confidence"),
Json(row["entities"]) if row.get("entities") else None,
row.get("emotion_anger"),
row.get("emotion_disgust"),
row.get("emotion_fear"),
row.get("emotion_joy"),
row.get("emotion_sadness"),
))
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
execute_batch(cursor, query, values)
self.connection.commit()
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s"
result = self.execute(query, (dataset_id,), fetch=True)
if result:
return pd.DataFrame(result)
raise NotExistentDatasetException("Dataset does not exist")
def get_dataset_info(self, dataset_id: int) -> dict:
query = "SELECT * FROM datasets WHERE id = %s"
result = self.execute(query, (dataset_id,), fetch=True)
if result:
return result[0]
raise NotExistentDatasetException("Dataset does not exist")
def close(self):
if self.connection:
self.connection.close()

View File

@@ -8,8 +8,8 @@ services:
ports: ports:
- "5432:5432" - "5432:5432"
volumes: volumes:
- ./db/postgres_vol:/var/lib/postgresql/data - ./server/db/postgres_vol:/var/lib/postgresql/data
- ./db/schema.sql:/docker-entrypoint-initdb.d/schema.sql - ./server/db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
volumes: volumes:
postgres_data: postgres_data:

View File

@@ -2,7 +2,7 @@ import pandas as pd
from server.analysis.nlp import NLP from server.analysis.nlp import NLP
class DatasetProcessor: class DatasetEnrichment:
def __init__(self, df, topics): def __init__(self, df, topics):
self.df = self._explode_comments(df) self.df = self._explode_comments(df)
self.topics = topics self.topics = topics

View File

@@ -1,5 +1,3 @@
import datetime
import nltk import nltk
import pandas as pd import pandas as pd
from nltk.corpus import stopwords from nltk.corpus import stopwords

View File

@@ -1,4 +1,7 @@
import os import os
import pandas as pd
import traceback
import json
from dotenv import load_dotenv from dotenv import load_dotenv
from flask import Flask, jsonify, request from flask import Flask, jsonify, request
@@ -11,19 +14,15 @@ from flask_jwt_extended import (
get_jwt_identity, get_jwt_identity,
) )
from server.stat_gen import StatGen from server.analysis.stat_gen import StatGen
from server.dataset_processor import DatasetProcessor from server.analysis.enrichment import DatasetEnrichment
from server.exceptions import NotAuthorisedException, NotExistentDatasetException from server.exceptions import NotAuthorisedException, NotExistentDatasetException
from db.database import PostgresConnector from server.db.database import PostgresConnector
from server.auth import AuthManager from server.core.auth import AuthManager
from server.utils import get_request_filters, get_dataset_and_validate from server.core.datasets import DatasetManager
from server.utils import get_request_filters
import pandas as pd
import traceback
import json
app = Flask(__name__) app = Flask(__name__)
db = PostgresConnector()
# Env Variables # Env Variables
load_dotenv() load_dotenv()
@@ -40,11 +39,12 @@ app.config["JWT_ACCESS_TOKEN_EXPIRES"] = jwt_access_token_expires
bcrypt = Bcrypt(app) bcrypt = Bcrypt(app)
jwt = JWTManager(app) jwt = JWTManager(app)
db = PostgresConnector()
auth_manager = AuthManager(db, bcrypt) auth_manager = AuthManager(db, bcrypt)
dataset_manager = DatasetManager(db)
stat_gen = StatGen() stat_gen = StatGen()
@app.route("/register", methods=["POST"]) @app.route("/register", methods=["POST"])
def register_user(): def register_user():
data = request.get_json() data = request.get_json()
@@ -130,12 +130,10 @@ def upload_data():
posts_df = pd.read_json(post_file, lines=True, convert_dates=False) posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
topics = json.load(topic_file) topics = json.load(topic_file)
processor = DatasetProcessor(posts_df, topics) processor = DatasetEnrichment(posts_df, topics)
enriched_df = processor.enrich() enriched_df = processor.enrich()
dataset_id = db.save_dataset_info( dataset_id = dataset_manager.save_dataset_info(current_user, f"dataset_{current_user}", topics)
current_user, f"dataset_{current_user}", topics dataset_manager.save_dataset_content(dataset_id, enriched_df)
)
db.save_dataset_content(dataset_id, enriched_df)
return jsonify( return jsonify(
{ {
@@ -154,7 +152,8 @@ def upload_data():
@jwt_required() @jwt_required()
def get_dataset(dataset_id): def get_dataset(dataset_id):
try: try:
dataset_content = get_dataset_and_validate(dataset_id, db) user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters() filters = get_request_filters()
filtered_dataset = stat_gen.filter_dataset(dataset_content, filters) filtered_dataset = stat_gen.filter_dataset(dataset_content, filters)
return jsonify(filtered_dataset), 200 return jsonify(filtered_dataset), 200
@@ -171,7 +170,8 @@ def get_dataset(dataset_id):
@jwt_required() @jwt_required()
def content_endpoint(dataset_id): def content_endpoint(dataset_id):
try: try:
dataset_content = get_dataset_and_validate(dataset_id, db) user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200 return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:
@@ -187,7 +187,8 @@ def content_endpoint(dataset_id):
@jwt_required() @jwt_required()
def get_summary(dataset_id): def get_summary(dataset_id):
try: try:
dataset_content = get_dataset_and_validate(dataset_id, db) user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.summary(dataset_content, filters)), 200 return jsonify(stat_gen.summary(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:
@@ -203,7 +204,8 @@ def get_summary(dataset_id):
@jwt_required() @jwt_required()
def get_time_analysis(dataset_id): def get_time_analysis(dataset_id):
try: try:
dataset_content = get_dataset_and_validate(dataset_id, db) user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200 return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:
@@ -219,7 +221,8 @@ def get_time_analysis(dataset_id):
@jwt_required() @jwt_required()
def get_user_analysis(dataset_id): def get_user_analysis(dataset_id):
try: try:
dataset_content = get_dataset_and_validate(dataset_id, db) user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200 return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:
@@ -235,7 +238,8 @@ def get_user_analysis(dataset_id):
@jwt_required() @jwt_required()
def get_cultural_analysis(dataset_id): def get_cultural_analysis(dataset_id):
try: try:
dataset_content = get_dataset_and_validate(dataset_id, db) user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200 return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:
@@ -251,7 +255,8 @@ def get_cultural_analysis(dataset_id):
@jwt_required() @jwt_required()
def get_interaction_analysis(dataset_id): def get_interaction_analysis(dataset_id):
try: try:
dataset_content = get_dataset_and_validate(dataset_id, db) user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters() filters = get_request_filters()
return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200 return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200
except NotAuthorisedException: except NotAuthorisedException:

View File

@@ -1,29 +0,0 @@
from db.database import PostgresConnector
from flask_bcrypt import Bcrypt
class AuthManager:
def __init__(self, db: PostgresConnector, bcrypt: Bcrypt):
self.db = db
self.bcrypt = bcrypt
def register_user(self, username, email, password):
hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8")
if self.db.get_user_by_email(email):
raise ValueError("Email already registered")
if self.db.get_user_by_username(username):
raise ValueError("Username already taken")
self.db.save_user(username, email, hashed_password)
def authenticate_user(self, username, password):
user = self.db.get_user_by_username(username)
if user and self.bcrypt.check_password_hash(user['password_hash'], password):
return user
return None
def get_user_by_id(self, user_id):
query = "SELECT id, username, email FROM users WHERE id = %s"
result = self.db.execute(query, (user_id,), fetch=True)
return result[0] if result else None

48
server/core/auth.py Normal file
View File

@@ -0,0 +1,48 @@
from server.db.database import PostgresConnector
from flask_bcrypt import Bcrypt
class AuthManager:
def __init__(self, db: PostgresConnector, bcrypt: Bcrypt):
self.db = db
self.bcrypt = bcrypt
# private
def _save_user(self, username, email, password_hash):
query = """
INSERT INTO users (username, email, password_hash)
VALUES (%s, %s, %s)
"""
self.db.execute(query, (username, email, password_hash))
# public
def register_user(self, username, email, password):
hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8")
if self.get_user_by_email(email):
raise ValueError("Email already registered")
if self.get_user_by_username(username):
raise ValueError("Username already taken")
self._save_user(username, email, hashed_password)
def authenticate_user(self, username, password):
user = self.get_user_by_username(username)
if user and self.bcrypt.check_password_hash(user['password_hash'], password):
return user
return None
def get_user_by_id(self, user_id):
query = "SELECT id, username, email FROM users WHERE id = %s"
result = self.db.execute(query, (user_id,), fetch=True)
return result[0] if result else None
def get_user_by_username(self, username) -> dict:
query = "SELECT id, username, email, password_hash FROM users WHERE username = %s"
result = self.db.execute(query, (username,), fetch=True)
return result[0] if result else None
def get_user_by_email(self, email) -> dict:
query = "SELECT id, username, email, password_hash FROM users WHERE email = %s"
result = self.db.execute(query, (email,), fetch=True)
return result[0] if result else None

103
server/core/datasets.py Normal file
View File

@@ -0,0 +1,103 @@
import pandas as pd
from server.db.database import PostgresConnector
from psycopg2.extras import Json
from server.exceptions import NotAuthorisedException
class DatasetManager:
def __init__(self, db: PostgresConnector):
self.db = db
def get_dataset_and_validate(self, dataset_id: int, user_id: int) -> pd.DataFrame:
dataset_info = self.get_dataset_info(dataset_id)
if dataset_info.get("user_id") != user_id:
raise NotAuthorisedException("This user is not authorised to access this dataset")
return self.get_dataset_content(dataset_id)
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s"
result = self.db.execute(query, (dataset_id,), fetch=True)
return pd.DataFrame(result)
def get_dataset_info(self, dataset_id: int) -> dict:
query = "SELECT * FROM datasets WHERE id = %s"
result = self.db.execute(query, (dataset_id,), fetch=True)
return result[0] if result else None
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
query = """
INSERT INTO datasets (user_id, name, topics)
VALUES (%s, %s, %s)
RETURNING id
"""
result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
return result[0]["id"] if result else None
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s"
result = self.db.execute(query, (dataset_id,), fetch=True)
return pd.DataFrame(result)
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
if event_data.empty:
return
query = """
INSERT INTO events (
dataset_id,
type,
parent_id,
author,
content,
timestamp,
date,
dt,
hour,
weekday,
reply_to,
source,
topic,
topic_confidence,
ner_entities,
emotion_anger,
emotion_disgust,
emotion_fear,
emotion_joy,
emotion_sadness
)
VALUES (
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s
)
"""
values = [
(
dataset_id,
row["type"],
row["parent_id"],
row["author"],
row["content"],
row["timestamp"],
row["date"],
row["dt"],
row["hour"],
row["weekday"],
row.get("reply_to"),
row["source"],
row.get("topic"),
row.get("topic_confidence"),
Json(row["ner_entities"]) if row.get("ner_entities") else None,
row.get("emotion_anger"),
row.get("emotion_disgust"),
row.get("emotion_fear"),
row.get("emotion_joy"),
row.get("emotion_sadness"),
)
for _, row in event_data.iterrows()
]
self.db.execute_batch(query, values)

45
server/db/database.py Normal file
View File

@@ -0,0 +1,45 @@
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.extras import execute_batch
from server.exceptions import DatabaseNotConfiguredException
class PostgresConnector:
"""
Simple PostgreSQL connector (single connection).
"""
def __init__(self):
try:
self.connection = psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "localhost"),
port=os.getenv("POSTGRES_PORT", 5432),
user=os.getenv("POSTGRES_USER", "postgres"),
password=os.getenv("POSTGRES_PASSWORD", "postgres"),
database=os.getenv("POSTGRES_DB", "postgres"),
)
except psycopg2.OperationalError as e:
raise DatabaseNotConfiguredException(f"Ensure database is up and running: {e}")
self.connection.autocommit = False
def execute(self, query, params=None, fetch=False) -> list:
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
if fetch:
return cursor.fetchall()
self.connection.commit()
def execute_batch(self, query, values):
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
execute_batch(cursor, query, values)
self.connection.commit()
## User Management Methods
def close(self):
if self.connection:
self.connection.close()

View File

@@ -30,10 +30,7 @@ CREATE TABLE events (
hour INTEGER NOT NULL, hour INTEGER NOT NULL,
weekday VARCHAR(255) NOT NULL, weekday VARCHAR(255) NOT NULL,
/* Posts Only */ /* Comments and Replies */
title VARCHAR(255),
/* Comments Only*/
parent_id VARCHAR(255), parent_id VARCHAR(255),
reply_to VARCHAR(255), reply_to VARCHAR(255),
source VARCHAR(255) NOT NULL, source VARCHAR(255) NOT NULL,

View File

@@ -3,3 +3,6 @@ class NotAuthorisedException(Exception):
class NotExistentDatasetException(Exception): class NotExistentDatasetException(Exception):
pass pass
class DatabaseNotConfiguredException(Exception):
pass

View File

@@ -1,10 +1,5 @@
import datetime import datetime
import pandas as pd
from flask import request from flask import request
from flask_jwt_extended import get_jwt_identity
from db.database import PostgresConnector
from server.exceptions import NotAuthorisedException
def parse_datetime_filter(value): def parse_datetime_filter(value):
if not value: if not value:
@@ -53,12 +48,3 @@ def get_request_filters() -> dict:
filters["data_sources"] = data_sources filters["data_sources"] = data_sources
return filters return filters
def get_dataset_and_validate(dataset_id: int, db: PostgresConnector) -> pd.DataFrame:
current_user = get_jwt_identity()
dataset = db.get_dataset_info(dataset_id)
if dataset.get("user_id") != int(current_user):
raise NotAuthorisedException("This user is not authorised to access this dataset")
return db.get_dataset_content(dataset_id)