From 899764eaf8ba4b5cca1a3385c16bdd5573180016 Mon Sep 17 00:00:00 2001 From: ThisBirchWood Date: Tue, 11 Feb 2025 20:57:27 +0000 Subject: [PATCH] FEAT: Streams now save to mp4 after a stream is stopped, instead of discarded --- nginx/Dockerfile | 3 + nginx/nginx.conf | 1 + web_server/blueprints/streams.py | 102 ++++++++++++++++++++------- web_server/celery_tasks/__init__.py | 49 +++++++++++-- web_server/database/app.db | Bin 159744 -> 159744 bytes web_server/database/testing_data.sql | 4 +- web_server/utils/stream_utils.py | 19 ++++- 7 files changed, 142 insertions(+), 36 deletions(-) diff --git a/nginx/Dockerfile b/nginx/Dockerfile index 7830c79..0e2351f 100644 --- a/nginx/Dockerfile +++ b/nginx/Dockerfile @@ -3,5 +3,8 @@ FROM tiangolo/nginx-rtmp COPY nginx.conf /etc/nginx/nginx.conf EXPOSE 1935 8080 +RUN mkdir -p /stream_data/hls && \ + chmod -R 777 /stream_data + # Start the Nginx server CMD [ "nginx", "-g", "daemon off;" ] diff --git a/nginx/nginx.conf b/nginx/nginx.conf index 2e1b5ba..d621790 100644 --- a/nginx/nginx.conf +++ b/nginx/nginx.conf @@ -32,6 +32,7 @@ rtmp { hls_nested on; hls_fragment 5s; hls_playlist_length 60s; + hls_cleanup off; } } } diff --git a/web_server/blueprints/streams.py b/web_server/blueprints/streams.py index c54fa64..b65426d 100644 --- a/web_server/blueprints/streams.py +++ b/web_server/blueprints/streams.py @@ -5,7 +5,8 @@ from utils.user_utils import get_user_id from blueprints.middleware import login_required from database.database import Database from datetime import datetime -from celery_tasks import update_thumbnail +from celery_tasks import update_thumbnail, combine_ts_stream +from dateutil import parser stream_bp = Blueprint("stream", __name__) @@ -142,26 +143,40 @@ def vods(username): def publish_stream(): """ Authenticates stream from streamer and publishes it to the site + + step-by-step: + fetch user info from stream key + insert stream into database + set user as streaming + periodically update thumbnail """ stream_key = request.form.get("name") + print("Stream request received") - # Check if stream key is valid - db = Database() - user_info = db.fetchone("""SELECT user_id, username, current_stream_title, current_selected_category_id - FROM users - WHERE stream_key = ?""", (stream_key,)) + # Open database connection + with Database() as db: + # Get user info from stream key + user_info = db.fetchone("""SELECT user_id, username, current_stream_title, current_selected_category_id, is_live + FROM users + WHERE stream_key = ?""", (stream_key,)) - if not user_info: - return "Unauthorized", 403 + # If stream key is invalid, return unauthorized + if not user_info or user_info["is_live"]: + return "Unauthorized", 403 + + # Insert stream into database + db.execute("""INSERT INTO streams (user_id, title, start_time, num_viewers, category_id) + VALUES (?, ?, ?, ?, ?)""", (user_info["user_id"], + user_info["current_stream_title"], + datetime.now(), + 0, + 1)) + + # Set user as streaming + db.execute("""UPDATE users SET is_live = 1 WHERE user_id = ?""", (user_info["user_id"],)) - # Insert stream into database - db.execute("""INSERT INTO streams (user_id, title, category_id, start_time, isLive) - VALUES (?, ?, ?, ?, ?)""", (user_info["user_id"], - user_info["current_stream_title"], - 1, - datetime.now(), - 1)) + # Update thumbnail periodically update_thumbnail.delay(user_info["user_id"]) return redirect(f"/{user_info['username']}") @@ -170,17 +185,54 @@ def publish_stream(): def end_stream(): """ Ends a stream + + step-by-step: + remove stream from database + move stream to vod table + set user as not streaming + convert ts files to mp4 + clean up old ts files + end thumbnail generation """ - db = Database() - - # get stream key - user_info = db.fetchone("""SELECT user_id FROM users WHERE stream_key = ?""", (request.form.get("name"),)) - stream_info = db.fetchone("""SELECT stream_id FROM streams WHERE user_id = ?""", (user_info["user_id"],)) - - if not user_info: - return "Unauthorized", 403 - # Remove stream from database - db.execute("""DELETE FROM streams WHERE user_id = ?""", (user_info["user_id"],)) + stream_key = request.form.get("name") + + # Open database connection + with Database() as db: + # Get user info from stream key + user_info = db.fetchone("""SELECT * + FROM users + WHERE stream_key = ?""", (stream_key,)) + + stream_info = db.fetchone("""SELECT * + FROM streams + WHERE user_id = ?""", (user_info["user_id"],)) + + + # If stream key is invalid, return unauthorized + if not user_info: + return "Unauthorized", 403 + + # Remove stream from database + db.execute("""DELETE FROM streams + WHERE user_id = ?""", (user_info["user_id"],)) + + # Move stream to vod table + stream_length = int((datetime.now() - parser.parse(stream_info["start_time"])).total_seconds()) + + db.execute("""INSERT INTO vods (user_id, title, datetime, category_id, length, views) + VALUES (?, ?, ?, ?, ?, ?)""", (user_info["user_id"], + user_info["current_stream_title"], + stream_info["start_time"], + user_info["current_selected_category_id"], + stream_length, + 0)) + + # Set user as not streaming + db.execute("""UPDATE users + SET is_live = 0 + WHERE user_id = ?""", (user_info["user_id"],)) + + combine_ts_stream.delay(user_info["username"]) return "Stream ended", 200 \ No newline at end of file diff --git a/web_server/celery_tasks/__init__.py b/web_server/celery_tasks/__init__.py index 284b71b..c23180d 100644 --- a/web_server/celery_tasks/__init__.py +++ b/web_server/celery_tasks/__init__.py @@ -1,6 +1,9 @@ from celery import Celery, shared_task, Task from utils.stream_utils import generate_thumbnail, get_streamer_live_status from time import sleep +from os import listdir, remove +from datetime import datetime +import subprocess def celery_init_app(app) -> Celery: class FlaskTask(Task): @@ -19,11 +22,43 @@ def update_thumbnail(user_id, sleep_time=180) -> None: """ Updates the thumbnail of a stream periodically """ - ffmpeg_wait_time = 5 + generate_thumbnail(user_id) + sleep(sleep_time) - # check if user is streaming - while get_streamer_live_status(user_id)['isLive']: - sleep(ffmpeg_wait_time) - generate_thumbnail(user_id) - sleep(sleep_time - ffmpeg_wait_time) - return \ No newline at end of file +@shared_task +def combine_ts_stream(username): + """ + Combines all ts files into a single vod, and removes the ts files + """ + path = f"stream_data/hls/{username}/" + ts_files = [f for f in listdir(path) if f.endswith(".ts")] + ts_files.sort() + + # Create temp file listing all ts files + with open(f"{path}list.txt", "w") as f: + for ts_file in ts_files: + f.write(f"file '{ts_file}'\n") + + # Concatenate all ts files into a single vod + file_name = datetime.now().strftime("%d-%m-%Y-%H-%M-%S") + vod_path = f"stream_data/hls/{username}/{file_name}.mp4" + vod_command = [ + "ffmpeg", + "-f", + "concat", + "-safe", + "0", + "-i", + f"{path}list.txt", + "-c", + "copy", + vod_path + ] + + subprocess.run(vod_command) + + # Remove ts files + for ts_file in ts_files: + remove(f"{path}{ts_file}") + + return vod_path diff --git a/web_server/database/app.db b/web_server/database/app.db index 89703241f94b779345011c7aca83fbfbd282f802..b848ec4ea1107114b4a103eefd17af2658fb73e3 100644 GIT binary patch delta 555 zcmZvYF-ROy5Qb;w?d|O@`hGWS@w#zpILy=Ar8$*Vfg;~qcn}$AYXj)r?5=C5O(bgy%VSnN2v*h{wi}^&qvN)cL zBkCQ_DI%m}7nGsbml5t&MDU-{>if=LN(u>Y=OkuQ5C?iNI3S$j7zgce=^L!;aF>eY zMI<{t;xn+9o&4sY40Unr24T-`z}PKabKOqf&^0d$-uow5(}+6~jjFcXk2KRsma21F zU0t+VThO;+J*n5Li#6>PTrX&uyRtuMjHf^CI$^@4H!0!Ipor zuYEI%9wQq=^A-E;SL_*eC-4H5G4o3?@IT{!&o2d3#KS*b>OSLDL4Gb~1{I(r10y3N z2s3V*z{n!7o#g}LCw@lG?ce`1aw$MGGc)i%1!`snYW~VMomrphxflxzHwz1P6L?to zr!nw9=0C~5fqxoMOB?_6Y5q*Uy3#;baT+smx)x+6E4U}-X6B_U1g91kXXfWIOaocO o!heT>|2zLXpk;S93kICwpMEEvNt&IVk(oh+k$L;S{ftZk0DM(J$N&HU diff --git a/web_server/database/testing_data.sql b/web_server/database/testing_data.sql index 442fbd3..fdfa058 100644 --- a/web_server/database/testing_data.sql +++ b/web_server/database/testing_data.sql @@ -1,6 +1,6 @@ -- Sample Data for users INSERT INTO users (username, password, email, num_followers, stream_key, is_partnered, bio, is_live, current_stream_title, current_selected_category_id) VALUES -('GamerDude', 'password123', 'gamerdude@example.com', 500, '1234', 0, 'Streaming my gaming adventures!', 1, 'Epic Gaming Session', 1), +('GamerDude', 'password123', 'gamerdude@example.com', 500, '1234', 0, 'Streaming my gaming adventures!', 0, 'Epic Gaming Session', 1), ('MusicLover', 'music4life', 'musiclover@example.com', 1200, '2345', 0, 'I share my favorite tunes.', 1, 'Live Music Jam', 2), ('ArtFan', 'artistic123', 'artfan@example.com', 300, '3456', 0, 'Exploring the world of art.', 1, 'Sketching Live', 3), ('EduGuru', 'learn123', 'eduguru@example.com', 800, '4567', 0, 'Teaching everything I know.', 1, 'Math Made Easy', 4), @@ -135,3 +135,5 @@ SELECT * FROM stream_tags; -- To see all tables in the database SELECT name FROM sqlite_master WHERE type='table'; + +UPDATE users SET is_live = 0 WHERE user_id = 1; diff --git a/web_server/utils/stream_utils.py b/web_server/utils/stream_utils.py index b5e309e..fea17fc 100644 --- a/web_server/utils/stream_utils.py +++ b/web_server/utils/stream_utils.py @@ -2,6 +2,7 @@ from database.database import Database from typing import Optional import os, subprocess from typing import Optional, List +from time import sleep def get_streamer_live_status(user_id: int): """ @@ -82,7 +83,6 @@ def get_user_vods(user_id: int): vods = db.fetchall("""SELECT * FROM vods WHERE user_id = ?;""", (user_id,)) return vods - def generate_thumbnail(user_id: int) -> None: """ Generates the thumbnail of a stream @@ -108,7 +108,20 @@ def generate_thumbnail(user_id: int) -> None: f"stream_data/thumbnails/{username['username']}.jpg" ] - subprocess.run(thumbnail_command) + attempts = 3 + + while attempts > 0: + try: + subprocess.run(thumbnail_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) + print(f"Thumbnail {username['username']} generated successfully") + break + except subprocess.CalledProcessError as e: + attempts -= 1 + print("FFmpeg failed with an error:") + print(e.stderr.decode()) # Print detailed error message + print("Retrying in 5 seconds...") + sleep(5) + continue def get_stream_tags(user_id: int) -> Optional[List[str]]: """ @@ -148,7 +161,7 @@ def transfer_stream_to_vod(user_id: int): """, (user_id,)) if not stream: - return None + return False ## TODO: calculate length in seconds, currently using temp value