From ed438e6c2f9e629956f44073969c6957ff1683e1 Mon Sep 17 00:00:00 2001 From: JustIceO7 Date: Mon, 27 Jan 2025 19:32:46 +0000 Subject: [PATCH] FEAT: Got started on the user forgot password feature as well as added error handling --- web_server/blueprints/user.py | 18 +++++- web_server/utils/user_utils.py | 103 +++++++++++++++++++++++++++------ 2 files changed, 100 insertions(+), 21 deletions(-) diff --git a/web_server/blueprints/user.py b/web_server/blueprints/user.py index a313f3c..aa7943f 100644 --- a/web_server/blueprints/user.py +++ b/web_server/blueprints/user.py @@ -1,5 +1,5 @@ from flask import Blueprint, jsonify, session -from utils.user_utils import is_subscribed, is_following, subscription_expiration +from utils.user_utils import is_subscribed, is_following, subscription_expiration, verify_token, reset_password user_bp = Blueprint("user", __name__) @@ -50,9 +50,23 @@ def authenticate_user() -> dict: @user_bp.route('/forgot_password', methods=['POST']) -def forgot_password(): +def user_forgot_password(): """ Will send link to email to reset password by looking at the user_id within session to see whos password should be reset Creates a super random number to be used a the link to reset password I guess a random number generator seeded with a secret """ return + +@user_bp.route('/reset_password//') +def user_reset_password(token, new_password): + """ + Given token and new password resets the users password + """ + email = verify_token(token) + if email: + response = reset_password(new_password, email) + if response: + return "Success" + else: + return "Failure" + return "Failure" diff --git a/web_server/utils/user_utils.py b/web_server/utils/user_utils.py index f118f91..42848cd 100644 --- a/web_server/utils/user_utils.py +++ b/web_server/utils/user_utils.py @@ -1,6 +1,13 @@ from database.database import Database from typing import Optional from datetime import datetime +from itsdangerous import URLSafeTimedSerializer +from os import getenv +from werkzeug.security import generate_password_hash, check_password_hash +from dotenv import load_dotenv +load_dotenv() + +serializer = URLSafeTimedSerializer(getenv("AUTH_SECRET_KEY")) def get_user_id(username: str) -> Optional[int]: """ @@ -8,8 +15,16 @@ def get_user_id(username: str) -> Optional[int]: """ db = Database() cursor = db.create_connection() - data = cursor.execute("SELECT user_id FROM user WHERE username = ?", (username,)).fetchone() - return data[0] if data else None + + try: + data = cursor.execute( + "SELECT user_id FROM user WHERE username = ?", + (username,) + ).fetchone() + return data[0] if data else None + except Exception as e: + print(f"Error: {e}") + return None def get_username(user_id: str) -> Optional[str]: """ @@ -17,27 +32,47 @@ def get_username(user_id: str) -> Optional[str]: """ db = Database() cursor = db.create_connection() - data = cursor.execute("SELECT username FROM user WHERE username = ?", (user_id,)).fetchone() - return data[0] if data else None + + try: + data = cursor.execute( + "SELECT username FROM user WHERE user_id = ?", + (user_id,) + ).fetchone() + return data[0] if data else None + except Exception as e: + print(f"Error: {e}") + return None def is_subscribed(user_id: int, streamer_id: int) -> bool: """ - Returns True if user is subscribed to a streamer else False + Returns True if user is subscribed to a streamer, else False """ db = Database() cursor = db.create_connection() - return bool(cursor.execute( - "SELECT 1 FROM subscribes WHERE user_id = ? AND streamer_id = ? AND expires > since", - (user_id, streamer_id) - ).fetchone()) -def is_following(user_id: int, streamer_id:int) -> bool: + try: + result = cursor.execute( + "SELECT 1 FROM subscribes WHERE user_id = ? AND streamer_id = ? AND expires > ?", + (user_id, streamer_id, datetime.now()) + ).fetchone() + return bool(result) + except Exception as e: + print(f"Error: {e}") + return False + +def is_following(user_id: int, streamer_id: int) -> bool: db = Database() cursor = db.create_connection() - return bool(cursor.execute( - "SELECT 1 FROM follows WHERE user_id = ? AND streamer_id = ?", - (user_id, streamer_id) - ).fetchone()) + + try: + result = cursor.execute( + "SELECT 1 FROM follows WHERE user_id = ? AND streamer_id = ?", + (user_id, streamer_id) + ).fetchone() + return bool(result) + except Exception as e: + print(f"Error: {e}") + return False def subscription_expiration(user_id: int, streamer_id: int) -> int: """ @@ -45,11 +80,41 @@ def subscription_expiration(user_id: int, streamer_id: int) -> int: """ db = Database() cursor = db.create_connection() - data = cursor.execute( - "SELECT expires from subscriptions WHERE user_id = ? AND streamer_id = ? AND expires > since", (user_id,streamer_id)).fetchone() remaining_time = 0 - if data: - expiration_date = data[0] + try: + data = cursor.execute( + "SELECT expires from subscriptions WHERE user_id = ? AND streamer_id = ? AND expires > since", (user_id,streamer_id)).fetchone() + if data: + expiration_date = data[0] + + remaining_time = (expiration_date - datetime.now()).seconds + except Exception as e: + print(f"Error: {e}") - remaining_time = (expiration_date - datetime.now()).seconds return remaining_time + +def verify_token(token: str): + """ + Given a token verifies token and decodes the token into an email + """ + try: + email = serializer.loads(token, salt='1', max_age=3600) + return email + except Exception as e: + print(f"Error: {e}") + return False + +def reset_password(new_password: str, email: str): + """ + Given email and new password reset the password for a given user + """ + db = Database() + cursor = db.create_connection() + + try: + cursor.execute("UPDATE users SET password = ? WHERE email = ?", (generate_password_hash(new_password), email)) + db.commit() + return True + except Exception as e: + print(f"Error: {e}") + return False \ No newline at end of file