Fix: Imports on backend

Fix: Connected Backend to Frontend over localhost
This commit is contained in:
Chris-1010
2025-01-21 23:54:11 +00:00
parent 2cda918e72
commit 1ede987914
101 changed files with 354 additions and 18641 deletions

View File

@@ -0,0 +1,26 @@
from flask import Flask
from flask_session import Session
from backend.blueprints.utils import logged_in_user
from flask_cors import CORS
def create_app():
app = Flask(__name__, template_folder="../ui/templates/", static_folder="../ui/static/")
app.config["SECRET_KEY"] = ""
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
#! ↓↓↓ For development purposes only
CORS(app) # Allow cross-origin requests for the frontend
Session(app)
app.before_request(logged_in_user)
with app.app_context():
from backend.blueprints.authentication import auth_bp
from backend.blueprints.main import main_bp
from backend.blueprints.stripe import stripe_bp
app.register_blueprint(auth_bp)
app.register_blueprint(main_bp)
app.register_blueprint(stripe_bp)
return app

View File

@@ -0,0 +1,92 @@
from flask import Blueprint, render_template, session, request, url_for, redirect, g
from werkzeug.security import generate_password_hash, check_password_hash
from backend.forms import SignupForm, LoginForm
from backend.database.database import Database
from backend.blueprints.utils import login_required
auth_bp = Blueprint("auth", __name__)
@auth_bp.route("/signup", methods=["GET", "POST"])
def signup():
form = SignupForm()
if form.validate_on_submit():
# Retrieve data from the sign up form
username = form.username.data
email = form.email.data
password = form.password.data
password2 = form.password2.data
# Store in database and hash to avoid exposing sensitive information
db = Database()
cursor = db.create_connection()
# Check if user already exists to avoid duplicates
dup_email = cursor.execute("""SELECT * FROM users
WHERE email = ?;""", (email,)).fetchone()
dup_username = cursor.execute("""SELECT * FROM users
WHERE username = ?;""", (username,)).fetchone()
if dup_email is not None:
form.email.errors.append("Email already taken.")
elif dup_username is not None:
form.username.errors.append("Username already taken.")
elif password != password2:
form.password.errors.append("Passwords must match.")
else:
cursor.execute("""INSERT INTO users (username, password, email, num_followers, isPartenered, bio)
VALUES (?, ?, ?, ?, ?, ?);""", (username, generate_password_hash(password), email, 0, 0, "This user does not have a Bio."))
db.commit_data()
return redirect(url_for("auth.login"))
# Close connection to prevent data leaks
db.close_connection()
return render_template("signup.html", form=form)
@auth_bp.route("/login", methods=["GET", "POST"])
def login():
form = LoginForm()
if form.validate_on_submit():
# Retrieve data from the login form
username = form.username.data
password = form.username.data
# Compare with database
db = Database()
cursor = db.create_connection()
# Check if user exists so only users who have signed up can login
user_exists = cursor.execute("""SELECT * FROM users
WHERE username = ?;""", (username,)).fetchone()
if not user_exists:
form.username.errors.append("Incorrect username or password.")
db.close_connection()
# Check is hashed passwords match to verify the user logging in
elif not check_password_hash(user_exists["password"], password):
form.username.errors.append("Incorrect username or password.")
db.close_connection()
else:
# Create a new session to prevent users from exploiting horizontal access control
session.clear()
session["username"] = username
# Return to previous page if applicable
next_page = request.args.get("next")
# Otherwise return home
if not next_page:
next_page = url_for("app.index")
db.close_connection()
return redirect(next_page)
return render_template("login.html", form=form)
@auth_bp.route("/logout")
@login_required
def logout():
session.clear()
return redirect(url_for("index"))

View File

@@ -0,0 +1,42 @@
from flask import render_template, Blueprint
main_bp = Blueprint("app", __name__)
@main_bp.route('/get_streams')
def get_sample_streams():
"""
Returns a list of (sample) streamers live right now
"""
streamers = [
{
"id": 1,
"title": "Gaming Stream",
"streamer": "Gamer123",
"viewers": 1500,
"thumbnail": "assets/images/monkey.png",
},
{
"id": 2,
"title": "Art Stream",
"streamer": "Artist456",
"viewers": 800,
"thumbnail": "assets/images/surface.jpeg",
},
{
"id": 3,
"title": "Music Stream",
"streamer": "Musician789",
"viewers": 2000,
"thumbnail": "assets/images/dance_game.png",
},
{
"id": 4,
"title": "Just Chatting",
"streamer": "Chatty101",
"viewers": 1200,
"thumbnail": "assets/images/elf.webp",
},
]
return streamers

View File

@@ -0,0 +1,38 @@
from flask import render_template, Blueprint, request, jsonify
import stripe
stripe_bp = Blueprint("stripe", __name__)
stripe.api_key = 'sk_test_51QikGlGk6yuk3uA8muEMPjMhUvbZWZiMCYQArZRXcFVn26hbt1kTz5yUVWkk3RQlltArbAXmVmkfEHU2z1Ch5Obv00Y03oT127'
@stripe_bp.route('/create-checkout-session', methods=['POST'])
def create_checkout_session():
try:
session = stripe.checkout.Session.create(
ui_mode = 'embedded',
payment_method_types=['card'],
line_items=[
{
'price': 'price_1QikNCGk6yuk3uA86mZf3dmM', #Subscription ID
'quantity': 1,
},
],
mode='subscription',
redirect_on_completion = 'never'
)
except Exception as e:
print(e)
return str(e)
return jsonify(clientSecret=session.client_secret)
@stripe_bp.route('/session-status', methods=['GET']) # check for payment status
def session_status():
session = stripe.checkout.Session.retrieve(request.args.get('session_id'))
return jsonify(status=session.status, customer_email=session.customer_details.email)
@stripe_bp.route('/checkout', methods=['GET'])
def checkout():
return render_template("checkout.html")

View File

@@ -0,0 +1,24 @@
from flask import redirect, url_for, request, g, session
from functools import wraps
def logged_in_user():
g.user = session.get("username", None)
g.admin = session.get("username", None)
def login_required(view):
"""add at start of routes where users need to be logged in to access"""
@wraps(view)
def wrapped_view(*args, **kwargs):
if g.user is None:
return redirect(url_for("login", next=request.url))
return view(*args, **kwargs)
return wrapped_view
def admin_required(view):
"""add at start of routes where admins need to be logged in to access"""
@wraps(view)
def wrapped_view(*args, **kwargs):
if g.admin != "admin":
return redirect(url_for("login", next=request.url))
return view(*args, **kwargs)
return wrapped_view

View File

@@ -0,0 +1,22 @@
import sqlite3
import os
class Database:
def __init__(self) -> None:
self._db = os.path.join(os.path.abspath(os.path.dirname(__file__)), "app.db")
def create_connection(self) -> sqlite3.Cursor:
conn = sqlite3.connect(self._db)
conn.row_factory = sqlite3.Row
self._conn = conn
cursor = conn.cursor()
return cursor
def commit_data(self):
try:
self._conn.commit()
except Exception as e:
print(e)
def close_connection(self) -> None:
self._conn.close()

View File

@@ -0,0 +1,36 @@
DROP TABLE IF EXISTS users;
CREATE TABLE users
(
username VARCHAR(50) PRIMARY KEY NOT NULL,
password VARCHAR(256) NOT NULL,
email VARCHAR(64) NOT NULL,
num_followers INTEGER NOT NULL,
isPartenered BOOLEAN NOT NULL DEFAULT 0,
bio TEXT
);
SELECT * FROM users;
DROP TABLE IF EXISTS streams;
CREATE TABLE streams
(
stream_id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
start_time DATETIME NOT NULL,
num_viewers INT NOT NULL DEFAULT 0,
isLive BOOLEAN NOT NULL DEFAULT 0,
vod_id INT,
streamer_id VARCHAR NOT NULL,
FOREIGN KEY (streamer_id) REFERENCES users(username) ON DELETE CASCADE
);
DROP TABLE IF EXISTS follows;
CREATE TABLE follows
(
user_id INT NOT NULL,
following_id INT NOT NULL,
PRIMARY KEY (user_id, following_id),
FOREIGN KEY (user_id) REFERENCES users(username) ON DELETE CASCADE,
FOREIGN KEY (following_id) REFERENCES users(username) ON DELETE CASCADE
);

View File

@@ -0,0 +1,15 @@
from flask_wtf import FlaskForm
from wtforms import SubmitField, StringField, EmailField, PasswordField
from wtforms.validators import InputRequired, EqualTo
class SignupForm(FlaskForm):
username = StringField("Username:", validators=[InputRequired()])
email = EmailField("Email:", validators=[InputRequired()])
password = PasswordField("Password:", validators=[InputRequired()])
password2 = PasswordField("Confirm Password:", validators=[InputRequired(), EqualTo("password")])
submit = SubmitField("Submit")
class LoginForm(FlaskForm):
username = StringField("Username:", validators=[InputRequired()])
password = PasswordField("Password:", validators=[InputRequired()])
submit = SubmitField("Submit")