diff --git a/nginx/Dockerfile b/nginx/Dockerfile index 7830c79..0e2351f 100644 --- a/nginx/Dockerfile +++ b/nginx/Dockerfile @@ -3,5 +3,8 @@ FROM tiangolo/nginx-rtmp COPY nginx.conf /etc/nginx/nginx.conf EXPOSE 1935 8080 +RUN mkdir -p /stream_data/hls && \ + chmod -R 777 /stream_data + # Start the Nginx server CMD [ "nginx", "-g", "daemon off;" ] diff --git a/nginx/nginx.conf b/nginx/nginx.conf index 2e1b5ba..d621790 100644 --- a/nginx/nginx.conf +++ b/nginx/nginx.conf @@ -32,6 +32,7 @@ rtmp { hls_nested on; hls_fragment 5s; hls_playlist_length 60s; + hls_cleanup off; } } } diff --git a/web_server/blueprints/streams.py b/web_server/blueprints/streams.py index c54fa64..b65426d 100644 --- a/web_server/blueprints/streams.py +++ b/web_server/blueprints/streams.py @@ -5,7 +5,8 @@ from utils.user_utils import get_user_id from blueprints.middleware import login_required from database.database import Database from datetime import datetime -from celery_tasks import update_thumbnail +from celery_tasks import update_thumbnail, combine_ts_stream +from dateutil import parser stream_bp = Blueprint("stream", __name__) @@ -142,26 +143,40 @@ def vods(username): def publish_stream(): """ Authenticates stream from streamer and publishes it to the site + + step-by-step: + fetch user info from stream key + insert stream into database + set user as streaming + periodically update thumbnail """ stream_key = request.form.get("name") + print("Stream request received") - # Check if stream key is valid - db = Database() - user_info = db.fetchone("""SELECT user_id, username, current_stream_title, current_selected_category_id - FROM users - WHERE stream_key = ?""", (stream_key,)) + # Open database connection + with Database() as db: + # Get user info from stream key + user_info = db.fetchone("""SELECT user_id, username, current_stream_title, current_selected_category_id, is_live + FROM users + WHERE stream_key = ?""", (stream_key,)) - if not user_info: - return "Unauthorized", 403 + # If stream key is invalid, return unauthorized + if not user_info or user_info["is_live"]: + return "Unauthorized", 403 + + # Insert stream into database + db.execute("""INSERT INTO streams (user_id, title, start_time, num_viewers, category_id) + VALUES (?, ?, ?, ?, ?)""", (user_info["user_id"], + user_info["current_stream_title"], + datetime.now(), + 0, + 1)) + + # Set user as streaming + db.execute("""UPDATE users SET is_live = 1 WHERE user_id = ?""", (user_info["user_id"],)) - # Insert stream into database - db.execute("""INSERT INTO streams (user_id, title, category_id, start_time, isLive) - VALUES (?, ?, ?, ?, ?)""", (user_info["user_id"], - user_info["current_stream_title"], - 1, - datetime.now(), - 1)) + # Update thumbnail periodically update_thumbnail.delay(user_info["user_id"]) return redirect(f"/{user_info['username']}") @@ -170,17 +185,54 @@ def publish_stream(): def end_stream(): """ Ends a stream + + step-by-step: + remove stream from database + move stream to vod table + set user as not streaming + convert ts files to mp4 + clean up old ts files + end thumbnail generation """ - db = Database() - - # get stream key - user_info = db.fetchone("""SELECT user_id FROM users WHERE stream_key = ?""", (request.form.get("name"),)) - stream_info = db.fetchone("""SELECT stream_id FROM streams WHERE user_id = ?""", (user_info["user_id"],)) - - if not user_info: - return "Unauthorized", 403 - # Remove stream from database - db.execute("""DELETE FROM streams WHERE user_id = ?""", (user_info["user_id"],)) + stream_key = request.form.get("name") + + # Open database connection + with Database() as db: + # Get user info from stream key + user_info = db.fetchone("""SELECT * + FROM users + WHERE stream_key = ?""", (stream_key,)) + + stream_info = db.fetchone("""SELECT * + FROM streams + WHERE user_id = ?""", (user_info["user_id"],)) + + + # If stream key is invalid, return unauthorized + if not user_info: + return "Unauthorized", 403 + + # Remove stream from database + db.execute("""DELETE FROM streams + WHERE user_id = ?""", (user_info["user_id"],)) + + # Move stream to vod table + stream_length = int((datetime.now() - parser.parse(stream_info["start_time"])).total_seconds()) + + db.execute("""INSERT INTO vods (user_id, title, datetime, category_id, length, views) + VALUES (?, ?, ?, ?, ?, ?)""", (user_info["user_id"], + user_info["current_stream_title"], + stream_info["start_time"], + user_info["current_selected_category_id"], + stream_length, + 0)) + + # Set user as not streaming + db.execute("""UPDATE users + SET is_live = 0 + WHERE user_id = ?""", (user_info["user_id"],)) + + combine_ts_stream.delay(user_info["username"]) return "Stream ended", 200 \ No newline at end of file diff --git a/web_server/celery_tasks/__init__.py b/web_server/celery_tasks/__init__.py index 284b71b..c23180d 100644 --- a/web_server/celery_tasks/__init__.py +++ b/web_server/celery_tasks/__init__.py @@ -1,6 +1,9 @@ from celery import Celery, shared_task, Task from utils.stream_utils import generate_thumbnail, get_streamer_live_status from time import sleep +from os import listdir, remove +from datetime import datetime +import subprocess def celery_init_app(app) -> Celery: class FlaskTask(Task): @@ -19,11 +22,43 @@ def update_thumbnail(user_id, sleep_time=180) -> None: """ Updates the thumbnail of a stream periodically """ - ffmpeg_wait_time = 5 + generate_thumbnail(user_id) + sleep(sleep_time) - # check if user is streaming - while get_streamer_live_status(user_id)['isLive']: - sleep(ffmpeg_wait_time) - generate_thumbnail(user_id) - sleep(sleep_time - ffmpeg_wait_time) - return \ No newline at end of file +@shared_task +def combine_ts_stream(username): + """ + Combines all ts files into a single vod, and removes the ts files + """ + path = f"stream_data/hls/{username}/" + ts_files = [f for f in listdir(path) if f.endswith(".ts")] + ts_files.sort() + + # Create temp file listing all ts files + with open(f"{path}list.txt", "w") as f: + for ts_file in ts_files: + f.write(f"file '{ts_file}'\n") + + # Concatenate all ts files into a single vod + file_name = datetime.now().strftime("%d-%m-%Y-%H-%M-%S") + vod_path = f"stream_data/hls/{username}/{file_name}.mp4" + vod_command = [ + "ffmpeg", + "-f", + "concat", + "-safe", + "0", + "-i", + f"{path}list.txt", + "-c", + "copy", + vod_path + ] + + subprocess.run(vod_command) + + # Remove ts files + for ts_file in ts_files: + remove(f"{path}{ts_file}") + + return vod_path diff --git a/web_server/database/app.db b/web_server/database/app.db index 8970324..b848ec4 100644 Binary files a/web_server/database/app.db and b/web_server/database/app.db differ diff --git a/web_server/database/testing_data.sql b/web_server/database/testing_data.sql index 442fbd3..fdfa058 100644 --- a/web_server/database/testing_data.sql +++ b/web_server/database/testing_data.sql @@ -1,6 +1,6 @@ -- Sample Data for users INSERT INTO users (username, password, email, num_followers, stream_key, is_partnered, bio, is_live, current_stream_title, current_selected_category_id) VALUES -('GamerDude', 'password123', 'gamerdude@example.com', 500, '1234', 0, 'Streaming my gaming adventures!', 1, 'Epic Gaming Session', 1), +('GamerDude', 'password123', 'gamerdude@example.com', 500, '1234', 0, 'Streaming my gaming adventures!', 0, 'Epic Gaming Session', 1), ('MusicLover', 'music4life', 'musiclover@example.com', 1200, '2345', 0, 'I share my favorite tunes.', 1, 'Live Music Jam', 2), ('ArtFan', 'artistic123', 'artfan@example.com', 300, '3456', 0, 'Exploring the world of art.', 1, 'Sketching Live', 3), ('EduGuru', 'learn123', 'eduguru@example.com', 800, '4567', 0, 'Teaching everything I know.', 1, 'Math Made Easy', 4), @@ -135,3 +135,5 @@ SELECT * FROM stream_tags; -- To see all tables in the database SELECT name FROM sqlite_master WHERE type='table'; + +UPDATE users SET is_live = 0 WHERE user_id = 1; diff --git a/web_server/utils/stream_utils.py b/web_server/utils/stream_utils.py index b5e309e..fea17fc 100644 --- a/web_server/utils/stream_utils.py +++ b/web_server/utils/stream_utils.py @@ -2,6 +2,7 @@ from database.database import Database from typing import Optional import os, subprocess from typing import Optional, List +from time import sleep def get_streamer_live_status(user_id: int): """ @@ -82,7 +83,6 @@ def get_user_vods(user_id: int): vods = db.fetchall("""SELECT * FROM vods WHERE user_id = ?;""", (user_id,)) return vods - def generate_thumbnail(user_id: int) -> None: """ Generates the thumbnail of a stream @@ -108,7 +108,20 @@ def generate_thumbnail(user_id: int) -> None: f"stream_data/thumbnails/{username['username']}.jpg" ] - subprocess.run(thumbnail_command) + attempts = 3 + + while attempts > 0: + try: + subprocess.run(thumbnail_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) + print(f"Thumbnail {username['username']} generated successfully") + break + except subprocess.CalledProcessError as e: + attempts -= 1 + print("FFmpeg failed with an error:") + print(e.stderr.decode()) # Print detailed error message + print("Retrying in 5 seconds...") + sleep(5) + continue def get_stream_tags(user_id: int) -> Optional[List[str]]: """ @@ -148,7 +161,7 @@ def transfer_stream_to_vod(user_id: int): """, (user_id,)) if not stream: - return None + return False ## TODO: calculate length in seconds, currently using temp value