Files
crosspost/server/core/auth.py

62 lines
2.1 KiB
Python

import re
from server.db.database import PostgresConnector
from flask_bcrypt import Bcrypt
EMAIL_REGEX = re.compile(r"[^@]+@[^@]+\.[^@]+")
class AuthManager:
def __init__(self, db: PostgresConnector, bcrypt: Bcrypt):
self.db = db
self.bcrypt = bcrypt
# private
def _save_user(self, username, email, password_hash):
query = """
INSERT INTO users (username, email, password_hash)
VALUES (%s, %s, %s)
"""
self.db.execute(query, (username, email, password_hash))
# public
def register_user(self, username, email, password):
hashed_password = self.bcrypt.generate_password_hash(password).decode("utf-8")
if len(username) < 3:
raise ValueError("Username must be longer than 3 characters")
if not EMAIL_REGEX.match(email):
raise ValueError("Please enter a valid email address")
if self.get_user_by_email(email):
raise ValueError("Email already registered")
if self.get_user_by_username(username):
raise ValueError("Username already taken")
self._save_user(username, email, hashed_password)
def authenticate_user(self, username, password):
user = self.get_user_by_username(username)
if user and self.bcrypt.check_password_hash(user["password_hash"], password):
return user
return None
def get_user_by_id(self, user_id):
query = "SELECT id, username, email FROM users WHERE id = %s"
result = self.db.execute(query, (user_id,), fetch=True)
return result[0] if result else None
def get_user_by_username(self, username) -> dict:
query = (
"SELECT id, username, email, password_hash FROM users WHERE username = %s"
)
result = self.db.execute(query, (username,), fetch=True)
return result[0] if result else None
def get_user_by_email(self, email) -> dict:
query = "SELECT id, username, email, password_hash FROM users WHERE email = %s"
result = self.db.execute(query, (email,), fetch=True)
return result[0] if result else None