From a8af24e2569da1c600317a15ed989a965aad351c Mon Sep 17 00:00:00 2001 From: ThisBirchWood Date: Wed, 5 Feb 2025 21:04:05 +0000 Subject: [PATCH] UPDATE: Refactored backend routes to improve readability --- docker-compose.yml | 1 - web_server/Dockerfile | 2 +- web_server/blueprints/__init__.py | 1 + web_server/blueprints/streams.py | 157 ++++++++++------------- web_server/celery_tasks/__init__.py | 4 +- web_server/database/app.db | Bin 73728 -> 81920 bytes web_server/database/streaming.sql | 22 +++- web_server/database/testing_data.sql | 63 +++++---- web_server/utils/recommendation_utils.py | 40 +++--- web_server/utils/stream_utils.py | 131 ++++++++++++------- 10 files changed, 228 insertions(+), 193 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index d602e05..ebb67d7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,7 +13,6 @@ services: - app_network volumes: - stream_data:/stream_data - web_server: build: context: ./web_server diff --git a/web_server/Dockerfile b/web_server/Dockerfile index efe9e31..805b2db 100644 --- a/web_server/Dockerfile +++ b/web_server/Dockerfile @@ -21,4 +21,4 @@ COPY . . ENV FLASK_APP=blueprints.__init__ ENV FLASK_DEBUG=True -CMD ["python", "-c", "from blueprints.socket import socketio; from blueprints.__init__ import create_app; app = create_app(); socketio.run(app, host='0.0.0.0', port=5000, debug=True)"] \ No newline at end of file +CMD ["python", "-c", "from blueprints.socket import socketio; from blueprints.__init__ import create_app; app = create_app(); app.debug = True; socketio.run(app, host='0.0.0.0', port=5000, debug=True)"] \ No newline at end of file diff --git a/web_server/blueprints/__init__.py b/web_server/blueprints/__init__.py index a681075..30382b2 100644 --- a/web_server/blueprints/__init__.py +++ b/web_server/blueprints/__init__.py @@ -28,6 +28,7 @@ def create_app(): app.config["SECRET_KEY"] = getenv("FLASK_SECRET_KEY") app.config["SESSION_PERMANENT"] = False app.config["SESSION_TYPE"] = "filesystem" + app.config["PROPAGATE_EXCEPTIONS"] = True app.config.from_mapping( CELERY=dict( diff --git a/web_server/blueprints/streams.py b/web_server/blueprints/streams.py index fd26b01..f0a6c62 100644 --- a/web_server/blueprints/streams.py +++ b/web_server/blueprints/streams.py @@ -1,23 +1,8 @@ from flask import Blueprint, session, jsonify, g, request, redirect, abort, send_from_directory -from utils.stream_utils import ( - streamer_live_status, - streamer_most_recent_stream, - user_stream, - followed_live_streams, - followed_streamers, - stream_tags, - streamer_data -) +from utils.stream_utils import * from utils.user_utils import get_user_id from blueprints.utils import login_required -from utils.recommendation_utils import ( - default_recommendations, - recommendations_based_on_category, - user_recommendation_category, - followed_categories_recommendations, - category_recommendations, - user_category_recommendations -) +from utils.recommendation_utils import * from utils.utils import most_popular_category from database.database import Database from datetime import datetime @@ -29,137 +14,128 @@ stream_bp = Blueprint("stream", __name__) # Constants THUMBNAIL_GENERATION_INTERVAL = 180 -@stream_bp.route('/get_streams') -def get_sample_streams() -> list[dict]: +@stream_bp.route('/streams/popular/') +def get_popular_streams(no_streams) -> list[dict]: """ Returns a list of streams live now with the highest viewers """ - # shows default recommended streams for non-logged in users based on highest viewers - streams = default_recommendations() - for stream in streams: - stream['tags'] = stream_tags(stream["stream_id"]) + # Limit the number of streams to MAX_STREAMS + MAX_STREAMS = 100 + if no_streams < 1: + return jsonify([]) + elif no_streams > MAX_STREAMS: + no_streams = MAX_STREAMS + # Get the highest viewed streams + streams = get_highest_view_streams(no_streams) return jsonify(streams) + @login_required -@stream_bp.route('/get_recommended_streams') +@stream_bp.route('/streams/recommended') def get_recommended_streams() -> list[dict]: """ Queries DB to get a list of recommended streams using an algorithm """ - user_id = session.get("username") - category = user_recommendation_category(user_id) - streams = recommendations_based_on_category(category) - for stream in streams: - stream['tags'] = stream_tags(stream["stream_id"]) - return jsonify(streams) + user_id = session.get("user_id") -@stream_bp.route('/get_categories') -def get_categories() -> list[dict]: - """ - Returns a list of top 5 most popular categories - """ + # Get the user's most popular categories + category = get_user_preferred_category(user_id) + streams = get_streams_based_on_category(category) + return streams - category_data = category_recommendations() +@stream_bp.route('/categories/popular/') +def get_popular_categories(no_categories) -> list[dict]: + """ + Returns a list of most popular categories + """ + # Limit the number of categories to 100 + if no_categories < 1: + return jsonify([]) + elif no_categories > 100: + no_categories = 100 + + category_data = get_highest_view_categories(no_categories) return jsonify(category_data) @login_required -@stream_bp.route('/get_recommended_categories') +@stream_bp.route('/categories/recommended') def get_recommended_categories() -> list | list[dict]: """ Queries DB to get a list of recommended categories for the user """ user_id = session.get("user_id") - categories = user_category_recommendations(user_id) - return categories + categories = get_user_category_recommendations(user_id) + return jsonify(categories) -@stream_bp.route('/get_streamer_data/') -def get_streamer_data(streamer_username): +@stream_bp.route('/user/') +def get_user_data(username): """ - Returns a given streamer's data + Returns a given user's data """ - streamer_id = get_user_id(streamer_username) - if not streamer_id: + user_id = get_user_id(username) + if not user_id: abort(404) - data = streamer_data(streamer_id) - return data + data = get_streamer_data(user_id) + return jsonify(data) -@stream_bp.route('/streamer//status') -def get_streamer_status(streamer_username): +@stream_bp.route('/user//status') +def get_user_live_status(streamer_username): """ Returns a streamer's status, if they are live or not and their most recent stream (their current stream if live) """ user_id = get_user_id(streamer_username) - if not user_id: - abort(404) + is_live = True if get_streamer_live_status(user_id)['is_live'] else False + + most_recent_vod = get_latest_vod(user_id) - is_live = True if streamer_live_status(user_id)['isLive'] else False - most_recent_stream = streamer_most_recent_stream(user_id)['stream_id'] - - if not most_recent_stream: - most_recent_stream = None + if not most_recent_vod: + most_recent_vod = None + else: + most_recent_vod = most_recent_vod['vod_id'] return jsonify({ "is_live": is_live, - "most_recent_stream": most_recent_stream + "most_recent_stream": most_recent_vod }) -@stream_bp.route('/get_stream_data/') -def get_stream(streamer_username): +@stream_bp.route('/user//vods') +def get_vods(streamer_username): """ - Returns a streamer's most recent stream data + Returns a JSON of all the vods of a streamer """ user_id = get_user_id(streamer_username) - if not user_id: - abort(404) - - return jsonify(streamer_most_recent_stream(user_id)) - - -@stream_bp.route('/get_stream_data//') -def get_specific_stream(streamer_username, stream_id): - """ - Returns a streamer's stream data given stream_id - """ - user_id = get_user_id(streamer_username) - stream = user_stream(user_id, stream_id) - if stream: - return jsonify(stream) - - return jsonify({'error': 'Stream not found'}), 404 + vods = get_user_vods(user_id) + return jsonify(vods) @login_required -@stream_bp.route('/get_followed_category_streams') +@stream_bp.route('/categories/following') def get_following_categories_streams(): """ Returns popular streams in categories which the user followed """ - streams = followed_categories_recommendations(get_user_id(session.get('username'))) - - for stream in streams: - stream['tags'] = stream_tags(stream["stream_id"]) + streams = followed_categories_recommendations(session.get('user_id')) return jsonify(streams) @login_required -@stream_bp.route('/get_followed_streamers') -def get_followed_streamers(): +@stream_bp.route('/users/following') +def get_followed_streamers_(): """ Queries DB to get a list of followed streamers """ - username = session.get('username') - user_id = get_user_id(username) + user_id = session.get('user_id') - live_following_streams = followed_streamers(user_id) + live_following_streams = get_followed_streamers(user_id) return live_following_streams ## RTMP Server Routes @@ -197,12 +173,17 @@ def end_stream(): Ends a stream """ db = Database() + + # get stream key user_info = db.fetchone("""SELECT user_id FROM users WHERE stream_key = ?""", (request.form.get("name"),)) + stream_info = db.fetchone("""SELECT stream_id FROM streams WHERE user_id = ?""", (user_info["user_id"],)) if not user_info: return "Unauthorized", 403 - # Set stream to not live - db.execute("""UPDATE streams SET isLive = 0 WHERE user_id = ? AND isLive = 1""", (user_info["user_id"],)) + # Remove stream from database + db.execute("""DELETE FROM streams WHERE user_id = ?""", (user_info["user_id"],)) + + # return "Stream ended", 200 \ No newline at end of file diff --git a/web_server/celery_tasks/__init__.py b/web_server/celery_tasks/__init__.py index e00c422..284b71b 100644 --- a/web_server/celery_tasks/__init__.py +++ b/web_server/celery_tasks/__init__.py @@ -1,5 +1,5 @@ from celery import Celery, shared_task, Task -from utils.stream_utils import generate_thumbnail, streamer_live_status +from utils.stream_utils import generate_thumbnail, get_streamer_live_status from time import sleep def celery_init_app(app) -> Celery: @@ -22,7 +22,7 @@ def update_thumbnail(user_id, sleep_time=180) -> None: ffmpeg_wait_time = 5 # check if user is streaming - while streamer_live_status(user_id)['isLive']: + while get_streamer_live_status(user_id)['isLive']: sleep(ffmpeg_wait_time) generate_thumbnail(user_id) sleep(sleep_time - ffmpeg_wait_time) diff --git a/web_server/database/app.db b/web_server/database/app.db index ed438fe549a96a41f6e75252e54bf0e17b3ae8c3..718f8d2e07f9237fc29c26a1c7ff2fc8da3bcd0f 100644 GIT binary patch delta 1903 zcmeHIU1%It6rMBp&YhW^&7LuTtNGhq5@Qm&$!3%IQ)9PlyM@|px^7~ZA{bL!jkF~- zSrkEYbUS*}jST=)lZ( z?sw1q=DYX8Iny7}>1pLeT_yQglB8JG+yx; z`hwo9&MH6Bu>7Gs1E%z;bdg8Q$jLLcevzp==~BWPw#P$|poIx*s2EfxfmvA=OxE)V zY2x#&Lbe_(F0e6>txV~iYPbx(E^DtEtf_&Sr2SkiEb>7a?d?HWHVVhBK{Z%k4=)Vy zE~AU*G}HdJ7Bqi2lii(8X7`8pr1qCi-3qNjEzLjk+p;xakJLe0w)T|f>%fBT&p%^5 zpavTo!Aj;w4;(r4^!UM%{OA*t8(6g?n@SF+!o$h?2U6h;*4O3>i$@H3M6R>#2I#1F z=AJy*9?qtArLw8P9jPH_ezMI;dWzNZDOzO*8llH1Fl)COY;Fc?$nI}|dpCTe{aORG z^Xp1Ao6Juf8a>KCpbZ<#dcorTI6+d@UA&R(B01~MQoji~;PW`Z#KBftgJ;dj0O&m5 zG$OuBO^=FiU+8hri*JO+Oh3i6grDQv_$;Q2w}kIQt%EYDy+;O~NwjF~#2EnaPzi zB#xlCvUe_|_HOOz+ZylVC&r^U;tS(_8}Zp=G~8kaRwcZK-{2KIkME+5GnmH%n89rr z#d_2OzXw*upBDmCrbz)JsuGXId!wt;*5lA_&xrNE8~odY^$=*+s(RbqOH64Y%^9|zTRrK2MkG5XC<7!xq_*)f47Q- zM0wjOv#;!9A=vhRMf*n;jfz(C7H=Xuo#5dcHEmy!X8y!G)N_)~Dy2Z$KV{VTR`qc` zsxGoh_2|u4B<^93{Oi_o8!hCleVzqTXmg^_rYb~T^Ey}Iu=P+0n&60NCjVIv`wMP9 BxIF*> delta 987 zcmaKre~3&`6vyAWKi-?4d*1FEs~zlYvuv0#Z)a@QC}T0?w}dUpn6{$6*> zkN<493{sM4lWciO_75ic%Z66|P|}7_A`yQei>W_y@)s;s4U9lHl!O@o9{6%4{rLtZgrdcKfr z&+acCUIML3XLE&&Jf=^VOI#7XE5EMmP`g~pYwHj7v}L;5LaR4yNv}$8j(9?$kS;dp z&d&P7V!c!G$c(=#3RUI}5$j~VyGX=ZS#qx@VvUWSejvRO^HT;6F@CiP^yEv>m zUBWpood#fcXSP6HT&Y5KPoBD%QtC+Z+)HT?CXUd4x$#nskw_+*l8Hs~L{Id$zT4CI zOJ6(4LTW>U8h*fMID$8EP_;RUjmR0eYkAy?FNj)A(S&6(22jf;J&U3Zag&aFD06Ze ztZ3Yl&(78RI6l?z8-BvK_`)_zBREx8rGRror32&4`BP?p9ICWb?=cO(<7a%2uW`&S z-*g}4*oRp*D-rX$4&QOHLu`|VSN+Yt2G3>B4)+~bQ14|IsGH?MVz>W?*5tqLT>T$L zw1|BUqfF#Pr(!V&Omv72MPCOPQL*PwEX!H{8quzR90AX4(PmTy0&pZIzw%qMq(Ap6 zqIUn6sMZOhvJ*t@a)|N@pU4 Optional[int]: +def get_user_preferred_category(user_id: int) -> Optional[int]: """ Queries user_preferences database to find users favourite streaming category and returns the category """ @@ -23,9 +23,9 @@ def followed_categories_recommendations(user_id: int) -> Optional[List[dict]]: """ with Database() as db: streams = db.fetchall(""" - SELECT stream_id, title, username, num_viewers, category_name + SELECT u.user_id, title, u.username, num_viewers, category_name FROM streams - JOIN users ON users.user_id = streams.user_id + JOIN users u ON streams.user_id = u.user_id JOIN categories ON streams.category_id = categories.category_id WHERE categories.category_id IN (SELECT category_id FROM followed_categories WHERE user_id = ?) ORDER BY num_viewers DESC @@ -34,15 +34,15 @@ def followed_categories_recommendations(user_id: int) -> Optional[List[dict]]: return streams -def recommendations_based_on_category(category_id: int) -> Optional[List[dict]]: +def get_streams_based_on_category(category_id: int) -> Optional[List[dict]]: """ Queries stream database to get top 25 most viewed streams based on given category """ with Database() as db: streams = db.fetchall(""" - SELECT streams.stream_id, title, username, num_viewers, category_name + SELECT u.user_id, title, username, num_viewers, category_name FROM streams - JOIN users ON users.user_id = streams.user_id + JOIN users u ON streams.user_id = u.user_id JOIN categories ON streams.category_id = categories.category_id WHERE categories.category_id = ? ORDER BY num_viewers DESC @@ -51,41 +51,37 @@ def recommendations_based_on_category(category_id: int) -> Optional[List[dict]]: return streams -def default_recommendations() -> Optional[List[dict]]: +def get_highest_view_streams(no_streams: int) -> Optional[List[dict]]: """ - Return a list of 25 recommended live streams by number of viewers + Return a list of live streams by number of viewers """ with Database() as db: data = db.fetchall(""" - SELECT streams.stream_id, title, username, num_viewers, category_name + SELECT u.user_id, username, title, num_viewers, category_name FROM streams - JOIN users ON users.user_id = streams.user_id + JOIN users u ON streams.user_id = u.user_id JOIN categories ON streams.category_id = categories.category_id - WHERE isLive = 1 ORDER BY num_viewers DESC - LIMIT 25; - """) + LIMIT ?; + """, (no_streams,)) return data - -def category_recommendations() -> Optional[List[dict]]: +def get_highest_view_categories(no_categories: int) -> Optional[List[dict]]: """ - Returns a list of the top 5 most popular live categories + Returns a list of top 5 most popular categories """ with Database() as db: categories = db.fetchall(""" - SELECT categories.category_id, categories.category_name + SELECT categories.category_id, categories.category_name, SUM(streams.num_viewers) AS total_viewers FROM streams JOIN categories ON streams.category_id = categories.category_id - WHERE streams.isLive = 1 GROUP BY categories.category_name ORDER BY SUM(streams.num_viewers) DESC - LIMIT 5; - """) + LIMIT ?; + """, (no_categories,)) return categories - -def user_category_recommendations(user_id: int) -> Optional[List[dict]]: +def get_user_category_recommendations(user_id: int) -> Optional[List[dict]]: """ Queries user_preferences database to find users top 5 favourite streaming category and returns the category """ diff --git a/web_server/utils/stream_utils.py b/web_server/utils/stream_utils.py index bf70e9f..e512fb1 100644 --- a/web_server/utils/stream_utils.py +++ b/web_server/utils/stream_utils.py @@ -3,37 +3,38 @@ from typing import Optional import sqlite3, os, subprocess from time import sleep from typing import Optional, List +from datetime import datetime -def streamer_live_status(user_id: int) -> dict: +def get_streamer_live_status(user_id: int): """ Returns boolean on whether the given streamer is live """ with Database() as db: is_live = db.fetchone(""" - SELECT isLive - FROM streams - WHERE user_id = ? - ORDER BY stream_id DESC - LIMIT 1; + SELECT is_live + FROM users + WHERE user_id = ?; """, (user_id,)) return is_live -def followed_live_streams(user_id: int) -> Optional[List[dict]]: +def get_followed_live_streams(user_id: int) -> Optional[List[dict]]: """ Searches for streamers who the user followed which are currently live + Returns a list of live streams with the streamer's user id, stream title, and number of viewers """ with Database() as db: live_streams = db.fetchall(""" - SELECT user_id, stream_id, title, num_viewers - FROM streams - WHERE user_id IN (SELECT followed_id FROM follows WHERE user_id = ?) - AND stream_id = (SELECT MAX(stream_id) FROM streams WHERE user_id = streams.user_id) - AND isLive = 1; - """, (user_id,)) + SELECT users.user_id, streams.title, streams.num_viewers, users.username + FROM streams JOIN users + ON streams.user_id = users.user_id + WHERE users.user_id IN + (SELECT followed_id FROM follows WHERE user_id = ?) + AND users.is_live = 1; + """, (user_id,)) return live_streams -def followed_streamers(user_id: int) -> Optional[List[dict]]: +def get_followed_streamers(user_id: int) -> Optional[List[dict]]: """ Returns a list of streamers who the user follows """ @@ -45,62 +46,54 @@ def followed_streamers(user_id: int) -> Optional[List[dict]]: """, (user_id,)) return followed_streamers -def user_stream(user_id: int, stream_id: int) -> dict: +def get_vod(vod_id: int) -> dict: """ - Returns data of a streamers selected stream + Returns data of a streamers vod """ with Database() as db: - stream = db.fetchone(""" - SELECT u.username, s.user_id, s.title, s.start_time, s.num_viewers, c.category_name - FROM streams AS s - JOIN categories AS c ON s.category_id = c.category_id - JOIN users AS u ON s.user_id = u.user_id - WHERE u.user_id = ? - AND s.stream_id = ? - """, (user_id, stream_id)) - return stream + vod = db.fetchone("""SELECT * FROM vods WHERE vod_id = ?;""", (vod_id,)) + return vod -def streamer_most_recent_stream(user_id: int) -> Optional[dict]: +def get_latest_vod(user_id: int): """ Returns data of the most recent stream by a streamer """ with Database() as db: - most_recent_stream = db.fetchone(""" - SELECT s.stream_id, u.username, s.user_id, s.title, s.start_time, s.num_viewers, c.category_name - FROM streams AS s - JOIN categories AS c ON s.category_id = c.category_id - JOIN users AS u ON s.user_id = u.user_id - WHERE u.user_id = ? - AND s.stream_id = (SELECT MAX(stream_id) FROM streams WHERE user_id = ?) - """, (user_id, user_id)) - return most_recent_stream + latest_vod = db.fetchone("""SELECT * FROM vods WHERE user_id = ? ORDER BY vod_id DESC LIMIT 1;""", (user_id,)) + return latest_vod -def streamer_data(streamer_id: int) -> Optional[dict]: +def get_user_vods(user_id: int): + """ + Returns data of all vods by a streamer + """ + with Database() as db: + vods = db.fetchall("""SELECT * FROM vods WHERE user_id = ?;""", (user_id,)) + return vods + + +def get_streamer_data(user_id: int) -> Optional[dict]: """ Returns information about the streamer """ with Database() as db: data = db.fetchone(""" SELECT username, bio, num_followers, is_partnered FROM users - WHERE user_id = ? - """, (streamer_id,)) + WHERE user_id = ?; + """, (user_id,)) return data def generate_thumbnail(user_id: int) -> None: """ - Returns the thumbnail of a stream + Generates the thumbnail of a stream """ - db = Database() - username = db.fetchone("""SELECT * FROM users WHERE user_id = ?""", (user_id,)) - db.close_connection() + with Database() as db: + username = db.fetchone("""SELECT * FROM users WHERE user_id = ?""", (user_id,)) if not username: return None if not os.path.exists(f"stream_data/thumbnails/"): os.makedirs(f"stream_data/thumbnails/") - - subprocess.Popen(["ls", "-lR"]) thumbnail_command = [ "ffmpeg", @@ -115,16 +108,56 @@ def generate_thumbnail(user_id: int) -> None: ] subprocess.run(thumbnail_command) -def stream_tags(stream_id: int) -> Optional[List[str]]: + +def get_stream_tags(user_id: int) -> Optional[List[str]]: """ - Given a stream return tags associated with the stream + Given a stream return tags associated with the user's stream """ with Database() as db: tags = db.fetchall(""" SELECT tag_name FROM tags JOIN stream_tags ON tags.tag_id = stream_tags.tag_id - WHERE stream_id = ? - """, (stream_id,)) - tags = [tag['tag_name'] for tag in tags] if tags else None + WHERE user_id = ?; + """, (user_id,)) return tags + +def get_vod_tags(vod_id: int): + """ + Given a vod return tags associated with the vod + """ + with Database() as db: + tags = db.fetchall(""" + SELECT tag_name + FROM tags + JOIN vod_tags ON tags.tag_id = vod_tags.tag_id + WHERE vod_id = ?; + """, (vod_id,)) + return tags + +def transfer_stream_to_vod(user_id: int): + """ + Deletes stream from stream table and moves it to VoD table + TODO: Add functionaliy to save stream permanently + """ + + with Database() as db: + stream = db.fetchone(""" + SELECT * FROM streams WHERE user_id = ?; + """, (user_id,)) + + if not stream: + return None + + ## TODO: calculate length in seconds, currently using temp value + + db.execute(""" + INSERT INTO vods (user_id, title, datetime, category_id, length, views) + VALUES (?, ?, ?, ?, ?, ?); + """, (stream["user_id"], stream["title"], stream["datetime"], stream["category_id"], 10, stream["num_viewers"])) + + db.execute(""" + DELETE FROM streams WHERE user_id = ?; + """, (user_id,)) + + return True \ No newline at end of file