Files
gander/web_server/blueprints/oauth.py
Christopher Ahern 2758be8680
Some checks are pending
CI / build (3.10) (push) Waiting to run
CI / build (3.8) (push) Waiting to run
CI / build (3.9) (push) Waiting to run
Fix/pylint cleanup (#8)
* Fix pylint warnings across all 24 Python files in web_server

- Add module, class, and function docstrings (C0114, C0115, C0116)
- Fix import ordering: stdlib before third-party before local (C0411)
- Replace wildcard imports with explicit named imports (W0401)
- Remove trailing whitespace and add missing final newlines (C0303, C0304)
- Replace dict() with dict literals (R1735)
- Remove unused imports and variables (W0611, W0612)
- Narrow broad Exception catches to specific exceptions (W0718)
- Replace f-string logging with lazy % formatting (W1203)
- Fix variable naming: UPPER_CASE for constants, snake_case for locals (C0103)
- Add pylint disable comments for necessary global statements (W0603)
- Fix no-else-return, simplifiable-if-expression, singleton-comparison
- Fix bad indentation in stripe.py (W0311)
- Add encoding="utf-8" to open() calls (W1514)
- Add check=True to subprocess.run() calls (W1510)
- Register Celery task modules via conf.include

* Update `package-lock.json` to add peer dependencies
2026-02-07 20:57:28 +00:00

163 lines
5.3 KiB
Python

"""OAuth blueprint for Google authentication."""
from os import getenv
from secrets import token_hex, token_urlsafe
from random import randint
from authlib.integrations.flask_client import OAuth, OAuthError
from flask import Blueprint, jsonify, session, redirect, request
from blueprints.user import get_session_info_email
from database.database import Database
from dotenv import load_dotenv
from utils.path_manager import PathManager
# Blueprint that carries the Google OAuth routes defined in this module.
oauth_bp = Blueprint("oauth", __name__)
# Google OAuth client; stays None until init_oauth(app) registers it.
_google = None
# Called before the getenv() lookups below so values from a .env file
# (if one is present) are visible to them.
load_dotenv()
# NOTE(review): url_api is not referenced anywhere in the visible part of this
# module -- confirm whether another module imports it before removing.
url_api = getenv("VITE_API_URL")
# Base URL used to build the OAuth redirect URI and the post-login fallback
# redirect. getenv() returns None when HOMEPAGE_URL is unset.
url = getenv("HOMEPAGE_URL")
# Used by google_auth() to call create_user() for newly created accounts.
path_manager = PathManager()
def init_oauth(app):
    """
    Register the Google OAuth client for the given Flask application.

    Creates an Authlib ``OAuth`` registry bound to ``app``, registers a
    client named ``google`` using the credentials found in ``app.config``
    (``GOOGLE_CLIENT_ID`` / ``GOOGLE_CLIENT_SECRET``), and stores the
    resulting client in the module-level ``_google`` so the route handlers
    in this module can use it.
    """
    global _google  # pylint: disable=global-statement

    registry = OAuth(app)
    google_settings = {
        'client_id': app.config['GOOGLE_CLIENT_ID'],
        'client_secret': app.config['GOOGLE_CLIENT_SECRET'],
        'authorize_url': 'https://accounts.google.com/o/oauth2/auth',
        'access_token_url': 'https://oauth2.googleapis.com/token',
        'client_kwargs': {
            'scope': 'openid profile email',
            # Always show the account chooser, even for a signed-in user.
            'prompt': 'select_account',
        },
        'api_base_url': 'https://www.googleapis.com/oauth2/v1/',
        'userinfo_endpoint': 'https://openidconnect.googleapis.com/v1/userinfo',
        'server_metadata_url': 'https://accounts.google.com/.well-known/openid-configuration',
        'redirect_uri': f"{url}/api/google_auth",
    }
    _google = registry.register('google', **google_settings)
@oauth_bp.route('/login/google')
def login_google():
    """
    Redirect the browser to Google's OAuth authorization page.

    Stores a fresh ``nonce`` and ``state`` in the session for the callback
    to verify. The optional ``next`` query parameter is remembered under the
    ``origin`` session key so the callback can send the user back there.

    Returns:
        The redirect response produced by the Google OAuth client.
    """
    # Create both nonce and state
    session["nonce"] = token_urlsafe(16)
    session["state"] = token_urlsafe(32)

    # Only record an origin when the caller actually supplied one. Writing
    # ``None`` here would leave an "origin" key in the session, so the
    # callback would try to redirect to ``None`` instead of falling back to
    # the homepage. A stale origin from an earlier attempt is dropped too.
    # NOTE(review): ``next`` is caller-controlled and is later used as a
    # redirect target -- consider restricting it to same-site URLs.
    origin = request.args.get("next")
    if origin:
        session["origin"] = origin
    else:
        session.pop("origin", None)

    # Make sure session is saved before redirect
    session.modified = True

    return _google.authorize_redirect(
        redirect_uri=f'{url}/api/google_auth',
        nonce=session['nonce'],
        state=session['state']
    )
def _generate_unique_username(db, base_name):
    """
    Return ``base_name`` plus a random numeric suffix not present in ``users``.

    Raises:
        ValueError: if no free username is found within the attempt budget.
    """
    for _ in range(1000000):
        candidate = base_name + str(randint(1, 1000000))
        taken = db.fetchone("""
            SELECT * FROM users
            WHERE username = ?
        """, (candidate,))
        if not taken:
            return candidate
    raise ValueError("Could not generate a unique username")


@oauth_bp.route('/google_auth')
def google_auth():
    """
    Handle the OAuth callback from Google and log the user in.

    Verifies the ``state`` returned by Google against the one stored in the
    session, exchanges the authorization code for a token, and parses the ID
    token using the stored ``nonce``. If no account exists for the returned
    email, a new ``users`` row is inserted with a generated username and
    stream key. On success the session is reset to hold only ``username``
    and ``user_id`` and the browser is redirected to the stored origin, or
    to the homepage when none was stored.

    Returns:
        A redirect on success; otherwise a JSON error body with status 400
        (state mismatch, missing nonce, missing email, OAuth error) or 500
        (``ValueError`` / ``TypeError`` / ``KeyError`` during processing).
    """
    try:
        # Check state parameter before authorizing
        returned_state = request.args.get('state')
        stored_state = session.get('state')
        if not stored_state or stored_state != returned_state:
            print(
                f"State mismatch: stored={stored_state}, "
                f"returned={returned_state}", flush=True
            )
            return jsonify({
                'error': "mismatching_state: CSRF Warning! "
                         "State not equal in request and response.",
                'message': 'Authentication failed'
            }), 400

        # State matched, proceed with token authorization
        token = _google.authorize_access_token()

        # Verify nonce
        nonce = session.get('nonce')
        if not nonce:
            return jsonify({'error': 'Missing nonce in session'}), 400
        user = _google.parse_id_token(token, nonce=nonce)

        # Without an email there is nothing to look up or to create an
        # account for, so fail early instead of inserting a NULL email.
        user_email = user.get("email")
        if not user_email:
            return jsonify({
                'message': 'Authentication failed',
                'error': 'No email address returned by Google'
            }), 400

        # Check if email exists to login else create a database entry
        user_data = get_session_info_email(user_email)
        if not user_data:
            # ``given_name`` may be absent from the ID token; fall back to
            # the local part of the email so concatenation cannot hit None.
            base_name = user.get("given_name") or user_email.split("@")[0]
            with Database() as db:
                username = _generate_unique_username(db, base_name)
                db.execute(
                    """INSERT INTO users
                    (username, email, stream_key)
                    VALUES (?, ?, ?)""",
                    (
                        username,
                        user_email,
                        token_hex(32),
                    )
                )
            # NOTE(review): these run after the Database context has
            # exited -- confirm this matches the intended ordering.
            user_data = get_session_info_email(user_email)
            path_manager.create_user(username)

        # Store origin, username and user_id before clearing session.
        # ``or`` (rather than a .get default) also covers an "origin" key
        # that is present but None or empty.
        # NOTE(review): origin comes from the ``next`` query parameter of
        # the login route and is not validated before redirecting.
        origin = session.get("origin") or url.replace('/api', '')
        username = user_data["username"]
        user_id = user_data["user_id"]

        # Clear session and set new data
        session.clear()
        session["username"] = username
        session["user_id"] = user_id

        # Ensure session is saved
        session.modified = True
        print(
            f"session: {session.get('username')}. "
            f"user_id: {session.get('user_id')}", flush=True
        )
        return redirect(origin)
    except OAuthError as e:
        print(f"OAuth Error: {str(e)}", flush=True)
        return jsonify({
            'message': 'Authentication failed',
            'error': str(e)
        }), 400
    except (ValueError, TypeError, KeyError) as e:
        print(f"Unexpected Error: {str(e)}", flush=True)
        return jsonify({
            'message': 'An unexpected error occurred',
            'error': str(e)
        }), 500