130 lines
4.5 KiB
Python
130 lines
4.5 KiB
Python
from flask import Flask, render_template, session, request, url_for, redirect, g
|
|
from flask_session import Session
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from functools import wraps
|
|
|
|
from core.forms import SignupForm, LoginForm
|
|
from database.database import Database
|
|
|
|
app = Flask(__name__, template_folder="../ui/templates/")
|
|
app.config["SECRET_KEY"] = ""
|
|
app.config["SESSION_PERMANENT"] = False
|
|
app.config["SESSION_TYPE"] = "filesystem"
|
|
|
|
Session(app)
|
|
|
|
@app.before_request
|
|
def logged_in_user():
|
|
g.user = session.get("username", None)
|
|
g.admin = session.get("username", None)
|
|
|
|
def login_required(view):
|
|
"""add at start of routes where users need to be logged in to access"""
|
|
@wraps(view)
|
|
def wrapped_view(*args, **kwargs):
|
|
if g.user is None:
|
|
return redirect(url_for("login", next=request.url))
|
|
return view(*args, **kwargs)
|
|
return wrapped_view
|
|
|
|
def admin_required(view):
|
|
"""add at start of routes where admins need to be logged in to access"""
|
|
@wraps(view)
|
|
def wrapped_view(*args, **kwargs):
|
|
if g.admin != "admin":
|
|
return redirect(url_for("login", next=request.url))
|
|
return view(*args, **kwargs)
|
|
return wrapped_view
|
|
|
|
@app.route('/')
|
|
def index():
|
|
"""
|
|
Home page of the platform
|
|
|
|
Contains a list of some of the streams that are currently live and the most popular categories.
|
|
"""
|
|
|
|
return render_template('index.html')
|
|
|
|
@app.route("/signup", methods=["GET", "POST"])
|
|
def signup():
|
|
form = SignupForm()
|
|
if form.validate_on_submit():
|
|
# Retrieve data from the sign up form
|
|
username = form.username.data
|
|
email = form.email.data
|
|
password = form.password.data
|
|
password2 = form.password2.data
|
|
|
|
# Store in database and hash to avoid exposing sensitive information
|
|
db = Database()
|
|
cursor = db.create_connection()
|
|
|
|
# Check if user already exists to avoid duplicates
|
|
dup_email = cursor.execute("""SELECT * FROM users
|
|
WHERE email = ?;""", (email,)).fetchone()
|
|
dup_username = cursor.execute("""SELECT * FROM users
|
|
WHERE username = ?;""", (username,)).fetchone()
|
|
|
|
if dup_email is not None:
|
|
form.email.errors.append("Email already taken.")
|
|
elif dup_username is not None:
|
|
form.username.errors.append("Username already taken.")
|
|
elif password != password2:
|
|
form.password.errors.append("Passwords must match.")
|
|
else:
|
|
cursor.execute("""INSERT INTO users (username, password, email, num_followers, isPartenered, bio)
|
|
VALUES (?, ?, ?, ?, ?, ?);""", (username, generate_password_hash(password), email, 0, 0, "This user does not have a Bio."))
|
|
db.commit_data()
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
# Close connection to prevent data leaks
|
|
db.close_connection()
|
|
|
|
return render_template("signup.html", form=form)
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
form = LoginForm()
|
|
if form.validate_on_submit():
|
|
# Retrieve data from the login form
|
|
username = form.username.data
|
|
password = form.username.data
|
|
|
|
# Compare with database
|
|
db = Database()
|
|
cursor = db.create_connection()
|
|
|
|
# Check if user exists so only users who have signed up can login
|
|
user_exists = cursor.execute("""SELECT * FROM users
|
|
WHERE username = ?;""", (username,)).fetchone()
|
|
db.close_connection()
|
|
|
|
if not user_exists:
|
|
form.username.errors.append("Incorrect username or password.")
|
|
|
|
# Check is hashed passwords match to verify the user logging in
|
|
elif not check_password_hash(user_exists["password"], password):
|
|
form.username.errors.append("Incorrect username or password.")
|
|
|
|
else:
|
|
# Create a new session to prevent users from exploiting horizontal access control
|
|
session.clear()
|
|
session["username"] = username
|
|
|
|
# Return to previous page if applicable
|
|
next_page = request.args.get("next")
|
|
|
|
# Otherwise return home
|
|
if not next_page:
|
|
next_page = url_for("index")
|
|
return redirect(next_page)
|
|
|
|
return render_template("login.html", form=form)
|
|
|
|
@app.route("/logout")
|
|
@login_required
|
|
def logout():
|
|
session.clear()
|
|
return redirect(url_for("index")) |