149 lines
5.0 KiB
Python
149 lines
5.0 KiB
Python
import os
|
|
import psycopg2
|
|
import pandas as pd
|
|
from psycopg2.extras import RealDictCursor
|
|
from psycopg2.extras import execute_batch, Json
|
|
from server.exceptions import NotExistentDatasetException
|
|
|
|
|
|
class PostgresConnector:
|
|
"""
|
|
Simple PostgreSQL connector (single connection).
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.connection = psycopg2.connect(
|
|
host=os.getenv("POSTGRES_HOST", "localhost"),
|
|
port=os.getenv("POSTGRES_PORT", 5432),
|
|
user=os.getenv("POSTGRES_USER", "postgres"),
|
|
password=os.getenv("POSTGRES_PASSWORD", "postgres"),
|
|
database=os.getenv("POSTGRES_DB", "postgres"),
|
|
)
|
|
self.connection.autocommit = False
|
|
|
|
def execute(self, query, params=None, fetch=False) -> list:
|
|
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
|
|
cursor.execute(query, params)
|
|
if fetch:
|
|
return cursor.fetchall()
|
|
self.connection.commit()
|
|
|
|
def executemany(self, query, param_list) -> list:
|
|
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
|
|
cursor.executemany(query, param_list)
|
|
self.connection.commit()
|
|
|
|
## User Management Methods
|
|
def save_user(self, username, email, password_hash):
|
|
query = """
|
|
INSERT INTO users (username, email, password_hash)
|
|
VALUES (%s, %s, %s)
|
|
"""
|
|
self.execute(query, (username, email, password_hash))
|
|
|
|
def get_user_by_username(self, username) -> dict:
|
|
query = "SELECT id, username, email, password_hash FROM users WHERE username = %s"
|
|
result = self.execute(query, (username,), fetch=True)
|
|
return result[0] if result else None
|
|
|
|
def get_user_by_email(self, email) -> dict:
|
|
query = "SELECT id, username, email, password_hash FROM users WHERE email = %s"
|
|
result = self.execute(query, (email,), fetch=True)
|
|
return result[0] if result else None
|
|
|
|
# Dataset Management Methods
|
|
def save_dataset_info(self, user_id: int, dataset_name: str, topics: dict) -> int:
|
|
query = """
|
|
INSERT INTO datasets (user_id, name, topics)
|
|
VALUES (%s, %s, %s)
|
|
RETURNING id
|
|
"""
|
|
result = self.execute(query, (user_id, dataset_name, Json(topics)), fetch=True)
|
|
return result[0]["id"] if result else None
|
|
|
|
def save_dataset_content(self, dataset_id: int, event_data: pd.DataFrame):
|
|
query = """
|
|
INSERT INTO events (
|
|
dataset_id,
|
|
type,
|
|
parent_id,
|
|
author,
|
|
title,
|
|
content,
|
|
timestamp,
|
|
date,
|
|
dt,
|
|
hour,
|
|
weekday,
|
|
reply_to,
|
|
source,
|
|
topic,
|
|
topic_confidence,
|
|
ner_entities,
|
|
emotion_anger,
|
|
emotion_disgust,
|
|
emotion_fear,
|
|
emotion_joy,
|
|
emotion_sadness
|
|
)
|
|
VALUES (
|
|
%s, %s, %s, %s, %s,
|
|
%s, %s, %s, %s, %s,
|
|
%s, %s, %s, %s, %s,
|
|
%s, %s, %s, %s, %s,
|
|
%s
|
|
)
|
|
"""
|
|
|
|
values = []
|
|
|
|
for _, row in event_data.iterrows():
|
|
values.append((
|
|
dataset_id,
|
|
row["type"],
|
|
row["parent_id"],
|
|
row["author"],
|
|
row.get("title"),
|
|
row["content"],
|
|
row["timestamp"],
|
|
row["date"],
|
|
row["dt"],
|
|
row["hour"],
|
|
row["weekday"],
|
|
row.get("reply_to"),
|
|
row["source"],
|
|
row.get("topic"),
|
|
row.get("topic_confidence"),
|
|
Json(row["entities"]) if row.get("entities") else None,
|
|
row.get("emotion_anger"),
|
|
row.get("emotion_disgust"),
|
|
row.get("emotion_fear"),
|
|
row.get("emotion_joy"),
|
|
row.get("emotion_sadness"),
|
|
))
|
|
|
|
|
|
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
|
|
execute_batch(cursor, query, values)
|
|
self.connection.commit()
|
|
|
|
def get_dataset_content(self, dataset_id: int) -> pd.DataFrame:
|
|
query = "SELECT * FROM events WHERE dataset_id = %s"
|
|
result = self.execute(query, (dataset_id,), fetch=True)
|
|
|
|
if result:
|
|
return pd.DataFrame(result)
|
|
|
|
raise NotExistentDatasetException("Dataset does not exist")
|
|
|
|
def get_dataset_info(self, dataset_id: int) -> dict:
|
|
query = "SELECT * FROM datasets WHERE id = %s"
|
|
result = self.execute(query, (dataset_id,), fetch=True)
|
|
if result:
|
|
return result[0]
|
|
|
|
raise NotExistentDatasetException("Dataset does not exist")
|
|
|
|
def close(self):
|
|
if self.connection:
|
|
self.connection.close() |