Files
gander/web_server/blueprints/authentication.py
Christopher Ahern 2758be8680
Some checks are pending
CI / build (3.10) (push) Waiting to run
CI / build (3.8) (push) Waiting to run
CI / build (3.9) (push) Waiting to run
Fix/pylint cleanup (#8)
* Fix pylint warnings across all 24 Python files in web_server

- Add module, class, and function docstrings (C0114, C0115, C0116)
- Fix import ordering: stdlib before third-party before local (C0411)
- Replace wildcard imports with explicit named imports (W0401)
- Remove trailing whitespace and add missing final newlines (C0303, C0304)
- Replace dict() with dict literals (R1735)
- Remove unused imports and variables (W0611, W0612)
- Narrow broad Exception catches to specific exceptions (W0718)
- Replace f-string logging with lazy % formatting (W1203)
- Fix variable naming: UPPER_CASE for constants, snake_case for locals (C0103)
- Add pylint disable comments for necessary global statements (W0603)
- Fix no-else-return, simplifiable-if-expression, singleton-comparison
- Fix bad indentation in stripe.py (W0311)
- Add encoding="utf-8" to open() calls (W1514)
- Add check=True to subprocess.run() calls (W1510)
- Register Celery task modules via conf.include

* Update `package-lock.json` add peer dependencies
2026-02-07 20:57:28 +00:00

256 lines
7.6 KiB
Python

"""Authentication blueprint for user signup, login, and logout."""
import logging
from secrets import token_hex
from flask import Blueprint, session, request, jsonify
from werkzeug.security import generate_password_hash, check_password_hash
from flask_cors import cross_origin
from database.database import Database
from blueprints.middleware import login_required
from utils.user_utils import get_user_id
from utils.utils import sanitize
from utils.path_manager import PathManager
auth_bp = Blueprint("auth", __name__)
path_manager = PathManager()
logger = logging.getLogger(__name__)
@auth_bp.route("/signup", methods=["POST"])
@cross_origin(supports_credentials=True)
def signup():
"""
Route that allows a user to sign up by providing a
`username`, `email` and `password`.
"""
# ensure a JSON request is made to contact this route
if not request.is_json:
return jsonify({"message": "Expected JSON data"}), 400
# Extract data from request via JSON
data = request.get_json()
username = data.get('username')
email = data.get('email')
password = data.get('password')
# Validation - ensure all fields exist, users cannot have an empty field
if not all([username, email, password]):
error_fields = get_error_fields(
[username, email, password]
)
return jsonify({
"account_created": False,
"error_fields": error_fields,
"message": "Missing required fields"
}), 400
# Sanitize the inputs - helps to prevent SQL injection
try:
username = sanitize(username, "username")
email = sanitize(email, "email")
password = sanitize(password, "password")
except ValueError:
error_fields = get_error_fields([username, email, password])
return jsonify({
"account_created": False,
"error_fields": error_fields,
"message": "Invalid input received"
}), 400
# Create a connection to the database
db = Database()
try:
# Check for duplicate email/username, no two users can have the same
dup_email = db.fetchone(
"SELECT * FROM users WHERE email = ?",
(email,)
)
dup_username = db.fetchone(
"SELECT * FROM users WHERE username = ?",
(username,)
)
if dup_email is not None:
return jsonify({
"account_created": False,
"error_fields": ["email"],
"message": f"Email already taken: {email}"
}), 400
if dup_username is not None:
return jsonify({
"account_created": False,
"error_fields": ["username"],
"message": "Username already taken"
}), 400
# Create new user once input is validated
db.execute(
"""INSERT INTO users
(username, password, email, stream_key)
VALUES (?, ?, ?, ?)""",
(
username,
generate_password_hash(password),
email,
token_hex(32)
)
)
# Create user directories for stream data
path_manager.create_user(username)
return jsonify({
"account_created": True,
"message": "Account created successfully"
}), 201
except (ValueError, TypeError, KeyError) as exc:
logger.error("Error during signup: %s", exc)
return jsonify({
"account_created": False,
"message": "Server error occurred: " + str(exc)
}), 500
finally:
db.close_connection()
@auth_bp.route("/login", methods=["POST"])
@cross_origin(supports_credentials=True)
def login():
"""
Login to the web app with existing credentials.
"""
# ensure a JSON request is made to contact this route
if not request.is_json:
return jsonify({"message": "Expected JSON data"}), 400
# Extract data from request via JSON
data = request.get_json()
username = data.get('username')
password = data.get('password')
# Validation - ensure all fields exist, users cannot have an empty field
if not all([username, password]):
return jsonify({
"logged_in": False,
"message": "Missing required fields"
}), 400
# Sanitize the inputs - helps to prevent SQL injection
try:
username = sanitize(username, "username")
password = sanitize(password, "password")
except ValueError:
return jsonify({
"account_created": False,
"error_fields": ["username", "password"],
"message": "Invalid input received"
}), 400
# Create a connection to the database
db = Database()
try:
# Check if user exists, only existing users can be logged in
user = db.fetchone(
"SELECT * FROM users WHERE username = ?",
(username,)
)
if not user:
return jsonify({
"logged_in": False,
"error_fields": ["username", "password"],
"message": "Invalid username or password"
}), 401
# Verify password matches the password associated with that user
if not check_password_hash(user["password"], password):
return jsonify({
"logged_in": False,
"error_fields": ["username", "password"],
"message": "Invalid username or password"
}), 401
# Add user directories for stream data in case they don't exist
path_manager.create_user(username)
# Set up session
session.clear()
session["username"] = username
session["user_id"] = get_user_id(username)
logger.info(
"Logged in as %s. session: %s. user_id: %s",
username, session.get('username'), session.get('user_id')
)
# User has been logged in, let frontend know that
return jsonify({
"logged_in": True,
"message": "Login successful",
"username": username
}), 200
except (ValueError, TypeError, KeyError) as exc:
logger.error("Error during login: %s", exc)
return jsonify({
"logged_in": False,
"message": "Server error occurred"
}), 500
finally:
db.close_connection()
@auth_bp.route("/logout")
@login_required
def logout() -> dict:
"""
Log out and clear the users session.
If the user is currently streaming, end their stream first.
Can only be accessed by a logged in user.
"""
from utils.stream_utils import end_user_stream
# Check if user is currently streaming
user_id = session.get("user_id")
username = session.get("username")
with Database() as db:
is_streaming = db.fetchone(
"""SELECT is_live FROM users WHERE user_id = ?""",
(user_id,)
)
if is_streaming and is_streaming.get("is_live") == 1:
# Get the user's stream key
stream_key_info = db.fetchone(
"""SELECT stream_key FROM users WHERE user_id = ?""",
(user_id,)
)
stream_key = (
stream_key_info.get("stream_key") if stream_key_info
else None
)
if stream_key:
# End the stream
end_user_stream(stream_key, user_id, username)
session.clear()
return {"logged_in": False}
def get_error_fields(values: list):
"""Return field names for empty values."""
fields = ["username", "email", "password"]
return [fields[i] for i, v in enumerate(values) if not v]