Files
crosspost/server/db/database.py

63 lines
1.9 KiB
Python

import os
import psycopg2
import os
from dotenv import load_dotenv
from psycopg2.extras import RealDictCursor
from psycopg2.extras import execute_batch
# load_dotenv() runs first so that any variables it adds to the process
# environment are visible to the lookups below.
load_dotenv()

# PostgreSQL connection settings. Each one is read from the environment and
# falls back to the literal default when the variable is unset.
postgres_host = os.environ.get("POSTGRES_HOST", "localhost")
# NOTE: this is a str when POSTGRES_PORT is set and the int 5432 otherwise.
postgres_port = os.environ.get("POSTGRES_PORT", 5432)
postgres_user = os.environ.get("POSTGRES_USER", "postgres")
postgres_password = os.environ.get("POSTGRES_PASSWORD", "postgres")
postgres_db = os.environ.get("POSTGRES_DB", "postgres")
from server.exceptions import DatabaseNotConfiguredException
class PostgresConnector:
    """
    Simple PostgreSQL connector (single connection).

    Wraps one psycopg2 connection with autocommit disabled. ``execute`` and
    ``execute_batch`` each commit on success and roll back on failure.

    Instances can be used as context managers; leaving the ``with`` block
    closes the connection.
    """

    def __init__(self):
        """Open the connection using the module-level ``postgres_*`` settings.

        Raises:
            DatabaseNotConfiguredException: if psycopg2 raises
                ``OperationalError`` while connecting.
        """
        try:
            self.connection = psycopg2.connect(
                host=postgres_host,
                port=postgres_port,
                user=postgres_user,
                password=postgres_password,
                database=postgres_db,
            )
        except psycopg2.OperationalError as e:
            # Chain explicitly so the driver error is kept as __cause__.
            raise DatabaseNotConfiguredException(
                f"Ensure database is up and running: {e}"
            ) from e
        self.connection.autocommit = False

    def __enter__(self):
        """Return this connector for use as a ``with`` target."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; never suppresses an in-flight exception."""
        self.close()
        return False

    def execute(self, query, params=None, fetch=False) -> "list | None":
        """Run a single statement and commit it.

        Args:
            query: SQL text passed to ``cursor.execute``.
            params: Optional parameters passed alongside ``query``.
            fetch: When true, return ``cursor.fetchall()``.

        Returns:
            The fetched rows (the cursor is created with ``RealDictCursor``)
            when ``fetch`` is true, otherwise ``None``.

        Raises:
            Exception: whatever the statement or commit raised, re-raised
                after a rollback attempt.
        """
        try:
            with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query, params)
                result = cursor.fetchall() if fetch else None
            self.connection.commit()
            return result
        except Exception:
            self._rollback_quietly()
            raise

    def execute_batch(self, query, values):
        """Run ``query`` once per entry in ``values`` and commit.

        Args:
            query: SQL text handed to psycopg2's ``execute_batch``.
            values: Sequence of parameter sets, one per execution.

        Raises:
            Exception: whatever the batch or commit raised, re-raised after a
                rollback attempt.
        """
        try:
            with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
                # The bare name resolves to the module-level import from
                # psycopg2.extras, not to this method: method names are not
                # in scope inside a method body.
                execute_batch(cursor, query, values)
            self.connection.commit()
        except Exception:
            self._rollback_quietly()
            raise

    def _rollback_quietly(self):
        """Roll back the current transaction without masking the real error.

        Called only from ``except`` blocks that re-raise immediately after.
        If the connection is already unusable, ``rollback()`` raises too;
        letting that propagate would replace the original exception the
        caller needs to see.
        """
        try:
            self.connection.rollback()
        except psycopg2.Error:
            # Deliberately ignored: the caller re-raises the original error.
            pass

    def close(self):
        """Close the underlying connection if one is set."""
        if self.connection:
            self.connection.close()