Compare commits

..

8 Commits

12 changed files with 232 additions and 225 deletions

View File

@@ -1,149 +0,0 @@
import os
import psycopg2
import pandas as pd
from psycopg2.extras import RealDictCursor
from psycopg2.extras import execute_batch, Json
from server.exceptions import NotExistentDatasetException
class PostgresConnector:
"""
Simple PostgreSQL connector (single connection).
"""
def __init__(self):
self.connection = psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "localhost"),
port=os.getenv("POSTGRES_PORT", 5432),
user=os.getenv("POSTGRES_USER", "postgres"),
password=os.getenv("POSTGRES_PASSWORD", "postgres"),
database=os.getenv("POSTGRES_DB", "postgres"),
)
self.connection.autocommit = False
def execute(self, query, params=None, fetch=False) -> list:
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
if fetch:
return cursor.fetchall()
self.connection.commit()
def executemany(self, query, param_list) -> list:
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.executemany(query, param_list)
self.connection.commit()
## User Management Methods
def save_user(self, username, email, password_hash):
query = """
INSERT INTO users (username, email, password_hash)
VALUES (%s, %s, %s)
"""
self.execute(query, (username, email, password_hash))
def get_user_by_username(self, username) -> dict:
query = "SELECT id, username, email, password_hash FROM users WHERE username = %s"
result = self.execute(query, (username,), fetch=True)
return result[0] if result else None
def get_user_by_email(self, email) -> dict:
query = "SELECT id, username, email, password_hash FROM users WHERE email = %s"
result = self.execute(query, (email,), fetch=True)
return result[0] if result else None
# Dataset Management Methods
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
query = """
INSERT INTO datasets (user_id, name, topics)
VALUES (%s, %s, %s)
RETURNING id
"""
result = self.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
return result[0]["id"] if result else None
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
query = """
INSERT INTO events (
dataset_id,
type,
parent_id,
author,
title,
content,
timestamp,
date,
dt,
hour,
weekday,
reply_to,
source,
topic,
topic_confidence,
ner_entities,
emotion_anger,
emotion_disgust,
emotion_fear,
emotion_joy,
emotion_sadness
)
VALUES (
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s
)
"""
values = []
for _, row in event_data.iterrows():
values.append((
dataset_id,
row["type"],
row["parent_id"],
row["author"],
row.get("title"),
row["content"],
row["timestamp"],
row["date"],
row["dt"],
row["hour"],
row["weekday"],
row.get("reply_to"),
row["source"],
row.get("topic"),
row.get("topic_confidence"),
Json(row["entities"]) if row.get("entities") else None,
row.get("emotion_anger"),
row.get("emotion_disgust"),
row.get("emotion_fear"),
row.get("emotion_joy"),
row.get("emotion_sadness"),
))
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
execute_batch(cursor, query, values)
self.connection.commit()
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s"
result = self.execute(query, (dataset_id,), fetch=True)
if result:
return pd.DataFrame(result)
raise NotExistentDatasetException("Dataset does not exist")
def get_dataset_info(self, dataset_id: int) -> dict:
query = "SELECT * FROM datasets WHERE id = %s"
result = self.execute(query, (dataset_id,), fetch=True)
if result:
return result[0]
raise NotExistentDatasetException("Dataset does not exist")
def close(self):
if self.connection:
self.connection.close()

View File

@@ -8,8 +8,8 @@ services:
ports:
- "5432:5432"
volumes:
- ./db/postgres_vol:/var/lib/postgresql/data
- ./db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
- ./server/db/postgres_vol:/var/lib/postgresql/data
- ./server/db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
volumes:
postgres_data:

View File

@@ -2,7 +2,7 @@ import pandas as pd
from server.analysis.nlp import NLP
class DatasetProcessor:
class DatasetEnrichment:
def __init__(self, df, topics):
self.df = self._explode_comments(df)
self.topics = topics

View File

@@ -1,5 +1,3 @@
import datetime
import nltk
import pandas as pd
from nltk.corpus import stopwords

View File

@@ -1,4 +1,7 @@
import os
import pandas as pd
import traceback
import json
from dotenv import load_dotenv
from flask import Flask, jsonify, request
@@ -11,19 +14,15 @@ from flask_jwt_extended import (
get_jwt_identity,
)
from server.stat_gen import StatGen
from server.dataset_processor import DatasetProcessor
from server.analysis.stat_gen import StatGen
from server.analysis.enrichment import DatasetEnrichment
from server.exceptions import NotAuthorisedException, NotExistentDatasetException
from db.database import PostgresConnector
from server.auth import AuthManager
from server.utils import get_request_filters, get_dataset_and_validate
import pandas as pd
import traceback
import json
from server.db.database import PostgresConnector
from server.core.auth import AuthManager
from server.core.datasets import DatasetManager
from server.utils import get_request_filters
app = Flask(__name__)
db = PostgresConnector()
# Env Variables
load_dotenv()
@@ -40,11 +39,12 @@ app.config["JWT_ACCESS_TOKEN_EXPIRES"] = jwt_access_token_expires
bcrypt = Bcrypt(app)
jwt = JWTManager(app)
db = PostgresConnector()
auth_manager = AuthManager(db, bcrypt)
dataset_manager = DatasetManager(db)
stat_gen = StatGen()
@app.route("/register", methods=["POST"])
def register_user():
data = request.get_json()
@@ -130,12 +130,10 @@ def upload_data():
posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
topics = json.load(topic_file)
processor = DatasetProcessor(posts_df, topics)
processor = DatasetEnrichment(posts_df, topics)
enriched_df = processor.enrich()
dataset_id = db.save_dataset_info(
current_user, f"dataset_{current_user}", topics
)
db.save_dataset_content(dataset_id, enriched_df)
dataset_id = dataset_manager.save_dataset_info(current_user, f"dataset_{current_user}", topics)
dataset_manager.save_dataset_content(dataset_id, enriched_df)
return jsonify(
{
@@ -154,7 +152,8 @@ def upload_data():
@jwt_required()
def get_dataset(dataset_id):
try:
dataset_content = get_dataset_and_validate(dataset_id, db)
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters()
filtered_dataset = stat_gen.filter_dataset(dataset_content, filters)
return jsonify(filtered_dataset), 200
@@ -171,7 +170,8 @@ def get_dataset(dataset_id):
@jwt_required()
def content_endpoint(dataset_id):
try:
dataset_content = get_dataset_and_validate(dataset_id, db)
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters()
return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200
except NotAuthorisedException:
@@ -187,7 +187,8 @@ def content_endpoint(dataset_id):
@jwt_required()
def get_summary(dataset_id):
try:
dataset_content = get_dataset_and_validate(dataset_id, db)
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters()
return jsonify(stat_gen.summary(dataset_content, filters)), 200
except NotAuthorisedException:
@@ -203,7 +204,8 @@ def get_summary(dataset_id):
@jwt_required()
def get_time_analysis(dataset_id):
try:
dataset_content = get_dataset_and_validate(dataset_id, db)
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters()
return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200
except NotAuthorisedException:
@@ -219,7 +221,8 @@ def get_time_analysis(dataset_id):
@jwt_required()
def get_user_analysis(dataset_id):
try:
dataset_content = get_dataset_and_validate(dataset_id, db)
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters()
return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200
except NotAuthorisedException:
@@ -235,7 +238,8 @@ def get_user_analysis(dataset_id):
@jwt_required()
def get_cultural_analysis(dataset_id):
try:
dataset_content = get_dataset_and_validate(dataset_id, db)
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters()
return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200
except NotAuthorisedException:
@@ -251,7 +255,8 @@ def get_cultural_analysis(dataset_id):
@jwt_required()
def get_interaction_analysis(dataset_id):
try:
dataset_content = get_dataset_and_validate(dataset_id, db)
user_id = get_jwt_identity()
dataset_content = dataset_manager.get_dataset_and_validate(dataset_id, int(user_id))
filters = get_request_filters()
return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200
except NotAuthorisedException:

View File

@@ -1,29 +0,0 @@
from db.database import PostgresConnector
from flask_bcrypt import Bcrypt
class AuthManager:
def __init__(self, db: PostgresConnector, bcrypt: Bcrypt):
self.db = db
self.bcrypt = bcrypt
def register_user(self, username, email, password):
hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8")
if self.db.get_user_by_email(email):
raise ValueError("Email already registered")
if self.db.get_user_by_username(username):
raise ValueError("Username already taken")
self.db.save_user(username, email, hashed_password)
def authenticate_user(self, username, password):
user = self.db.get_user_by_username(username)
if user and self.bcrypt.check_password_hash(user['password_hash'], password):
return user
return None
def get_user_by_id(self, user_id):
query = "SELECT id, username, email FROM users WHERE id = %s"
result = self.db.execute(query, (user_id,), fetch=True)
return result[0] if result else None

48
server/core/auth.py Normal file
View File

@@ -0,0 +1,48 @@
from server.db.database import PostgresConnector
from flask_bcrypt import Bcrypt
class AuthManager:
def __init__(self, db: PostgresConnector, bcrypt: Bcrypt):
self.db = db
self.bcrypt = bcrypt
# private
def _save_user(self, username, email, password_hash):
query = """
INSERT INTO users (username, email, password_hash)
VALUES (%s, %s, %s)
"""
self.db.execute(query, (username, email, password_hash))
# public
def register_user(self, username, email, password):
hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8")
if self.get_user_by_email(email):
raise ValueError("Email already registered")
if self.get_user_by_username(username):
raise ValueError("Username already taken")
self._save_user(username, email, hashed_password)
def authenticate_user(self, username, password):
user = self.get_user_by_username(username)
if user and self.bcrypt.check_password_hash(user['password_hash'], password):
return user
return None
def get_user_by_id(self, user_id):
query = "SELECT id, username, email FROM users WHERE id = %s"
result = self.db.execute(query, (user_id,), fetch=True)
return result[0] if result else None
def get_user_by_username(self, username) -> dict:
query = "SELECT id, username, email, password_hash FROM users WHERE username = %s"
result = self.db.execute(query, (username,), fetch=True)
return result[0] if result else None
def get_user_by_email(self, email) -> dict:
query = "SELECT id, username, email, password_hash FROM users WHERE email = %s"
result = self.db.execute(query, (email,), fetch=True)
return result[0] if result else None

103
server/core/datasets.py Normal file
View File

@@ -0,0 +1,103 @@
import pandas as pd
from server.db.database import PostgresConnector
from psycopg2.extras import Json
from server.exceptions import NotAuthorisedException
class DatasetManager:
def __init__(self, db: PostgresConnector):
self.db = db
def get_dataset_and_validate(self, dataset_id: int, user_id: int) -> pd.DataFrame:
dataset_info = self.get_dataset_info(dataset_id)
if dataset_info.get("user_id") != user_id:
raise NotAuthorisedException("This user is not authorised to access this dataset")
return self.get_dataset_content(dataset_id)
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s"
result = self.db.execute(query, (dataset_id,), fetch=True)
return pd.DataFrame(result)
def get_dataset_info(self, dataset_id: int) -> dict:
query = "SELECT * FROM datasets WHERE id = %s"
result = self.db.execute(query, (dataset_id,), fetch=True)
return result[0] if result else None
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
query = """
INSERT INTO datasets (user_id, name, topics)
VALUES (%s, %s, %s)
RETURNING id
"""
result = self.db.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
return result[0]["id"] if result else None
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
query = "SELECT * FROM events WHERE dataset_id = %s"
result = self.db.execute(query, (dataset_id,), fetch=True)
return pd.DataFrame(result)
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
if event_data.empty:
return
query = """
INSERT INTO events (
dataset_id,
type,
parent_id,
author,
content,
timestamp,
date,
dt,
hour,
weekday,
reply_to,
source,
topic,
topic_confidence,
ner_entities,
emotion_anger,
emotion_disgust,
emotion_fear,
emotion_joy,
emotion_sadness
)
VALUES (
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s
)
"""
values = [
(
dataset_id,
row["type"],
row["parent_id"],
row["author"],
row["content"],
row["timestamp"],
row["date"],
row["dt"],
row["hour"],
row["weekday"],
row.get("reply_to"),
row["source"],
row.get("topic"),
row.get("topic_confidence"),
Json(row["ner_entities"]) if row.get("ner_entities") else None,
row.get("emotion_anger"),
row.get("emotion_disgust"),
row.get("emotion_fear"),
row.get("emotion_joy"),
row.get("emotion_sadness"),
)
for _, row in event_data.iterrows()
]
self.db.execute_batch(query, values)

45
server/db/database.py Normal file
View File

@@ -0,0 +1,45 @@
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.extras import execute_batch
from server.exceptions import DatabaseNotConfiguredException
class PostgresConnector:
"""
Simple PostgreSQL connector (single connection).
"""
def __init__(self):
try:
self.connection = psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "localhost"),
port=os.getenv("POSTGRES_PORT", 5432),
user=os.getenv("POSTGRES_USER", "postgres"),
password=os.getenv("POSTGRES_PASSWORD", "postgres"),
database=os.getenv("POSTGRES_DB", "postgres"),
)
except psycopg2.OperationalError as e:
raise DatabaseNotConfiguredException(f"Ensure database is up and running: {e}")
self.connection.autocommit = False
def execute(self, query, params=None, fetch=False) -> list:
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
if fetch:
return cursor.fetchall()
self.connection.commit()
def execute_batch(self, query, values):
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
execute_batch(cursor, query, values)
self.connection.commit()
## User Management Methods
def close(self):
if self.connection:
self.connection.close()

View File

@@ -30,10 +30,7 @@ CREATE TABLE events (
hour INTEGER NOT NULL,
weekday VARCHAR(255) NOT NULL,
/* Posts Only */
title VARCHAR(255),
/* Comments Only*/
/* Comments and Replies */
parent_id VARCHAR(255),
reply_to VARCHAR(255),
source VARCHAR(255) NOT NULL,

View File

@@ -3,3 +3,6 @@ class NotAuthorisedException(Exception):
class NotExistentDatasetException(Exception):
pass
class DatabaseNotConfiguredException(Exception):
pass

View File

@@ -1,10 +1,5 @@
import datetime
import pandas as pd
from flask import request
from flask_jwt_extended import get_jwt_identity
from db.database import PostgresConnector
from server.exceptions import NotAuthorisedException
def parse_datetime_filter(value):
if not value:
@@ -53,12 +48,3 @@ def get_request_filters() -> dict:
filters["data_sources"] = data_sources
return filters
def get_dataset_and_validate(dataset_id: int, db: PostgresConnector) -> pd.DataFrame:
current_user = get_jwt_identity()
dataset = db.get_dataset_info(dataset_id)
if dataset.get("user_id") != int(current_user):
raise NotAuthorisedException("This user is not authorised to access this dataset")
return db.get_dataset_content(dataset_id)