FEAT: Streams now save to mp4 after a stream is stopped, instead of discarded

This commit is contained in:
2025-02-11 20:57:27 +00:00
parent 07652eed0d
commit 899764eaf8
7 changed files with 142 additions and 36 deletions

View File

@@ -3,5 +3,8 @@ FROM tiangolo/nginx-rtmp
COPY nginx.conf /etc/nginx/nginx.conf
EXPOSE 1935 8080
RUN mkdir -p /stream_data/hls && \
chmod -R 777 /stream_data
# Start the Nginx server
CMD [ "nginx", "-g", "daemon off;" ]

View File

@@ -32,6 +32,7 @@ rtmp {
hls_nested on;
hls_fragment 5s;
hls_playlist_length 60s;
hls_cleanup off;
}
}
}

View File

@@ -5,7 +5,8 @@ from utils.user_utils import get_user_id
from blueprints.middleware import login_required
from database.database import Database
from datetime import datetime
from celery_tasks import update_thumbnail
from celery_tasks import update_thumbnail, combine_ts_stream
from dateutil import parser
stream_bp = Blueprint("stream", __name__)
@@ -142,26 +143,40 @@ def vods(username):
def publish_stream():
"""
Authenticates stream from streamer and publishes it to the site
step-by-step:
fetch user info from stream key
insert stream into database
set user as streaming
periodically update thumbnail
"""
stream_key = request.form.get("name")
print("Stream request received")
# Check if stream key is valid
db = Database()
user_info = db.fetchone("""SELECT user_id, username, current_stream_title, current_selected_category_id
# Open database connection
with Database() as db:
# Get user info from stream key
user_info = db.fetchone("""SELECT user_id, username, current_stream_title, current_selected_category_id, is_live
FROM users
WHERE stream_key = ?""", (stream_key,))
if not user_info:
# If stream key is invalid, return unauthorized
if not user_info or user_info["is_live"]:
return "Unauthorized", 403
# Insert stream into database
db.execute("""INSERT INTO streams (user_id, title, category_id, start_time, isLive)
db.execute("""INSERT INTO streams (user_id, title, start_time, num_viewers, category_id)
VALUES (?, ?, ?, ?, ?)""", (user_info["user_id"],
user_info["current_stream_title"],
1,
datetime.now(),
0,
1))
# Set user as streaming
db.execute("""UPDATE users SET is_live = 1 WHERE user_id = ?""", (user_info["user_id"],))
# Update thumbnail periodically
update_thumbnail.delay(user_info["user_id"])
return redirect(f"/{user_info['username']}")
@@ -170,17 +185,54 @@ def publish_stream():
def end_stream():
"""
Ends a stream
step-by-step:
remove stream from database
move stream to vod table
set user as not streaming
convert ts files to mp4
clean up old ts files
end thumbnail generation
"""
db = Database()
# get stream key
user_info = db.fetchone("""SELECT user_id FROM users WHERE stream_key = ?""", (request.form.get("name"),))
stream_info = db.fetchone("""SELECT stream_id FROM streams WHERE user_id = ?""", (user_info["user_id"],))
stream_key = request.form.get("name")
# Open database connection
with Database() as db:
# Get user info from stream key
user_info = db.fetchone("""SELECT *
FROM users
WHERE stream_key = ?""", (stream_key,))
stream_info = db.fetchone("""SELECT *
FROM streams
WHERE user_id = ?""", (user_info["user_id"],))
# If stream key is invalid, return unauthorized
if not user_info:
return "Unauthorized", 403
# Remove stream from database
db.execute("""DELETE FROM streams WHERE user_id = ?""", (user_info["user_id"],))
db.execute("""DELETE FROM streams
WHERE user_id = ?""", (user_info["user_id"],))
# Move stream to vod table
stream_length = int((datetime.now() - parser.parse(stream_info["start_time"])).total_seconds())
db.execute("""INSERT INTO vods (user_id, title, datetime, category_id, length, views)
VALUES (?, ?, ?, ?, ?, ?)""", (user_info["user_id"],
user_info["current_stream_title"],
stream_info["start_time"],
user_info["current_selected_category_id"],
stream_length,
0))
# Set user as not streaming
db.execute("""UPDATE users
SET is_live = 0
WHERE user_id = ?""", (user_info["user_id"],))
combine_ts_stream.delay(user_info["username"])
return "Stream ended", 200

View File

@@ -1,6 +1,9 @@
from celery import Celery, shared_task, Task
from utils.stream_utils import generate_thumbnail, get_streamer_live_status
from time import sleep
from os import listdir, remove
from datetime import datetime
import subprocess
def celery_init_app(app) -> Celery:
class FlaskTask(Task):
@@ -19,11 +22,43 @@ def update_thumbnail(user_id, sleep_time=180) -> None:
"""
Updates the thumbnail of a stream periodically
"""
ffmpeg_wait_time = 5
# check if user is streaming
while get_streamer_live_status(user_id)['isLive']:
sleep(ffmpeg_wait_time)
generate_thumbnail(user_id)
sleep(sleep_time - ffmpeg_wait_time)
return
sleep(sleep_time)
@shared_task
def combine_ts_stream(username):
"""
Combines all ts files into a single vod, and removes the ts files
"""
path = f"stream_data/hls/{username}/"
ts_files = [f for f in listdir(path) if f.endswith(".ts")]
ts_files.sort()
# Create temp file listing all ts files
with open(f"{path}list.txt", "w") as f:
for ts_file in ts_files:
f.write(f"file '{ts_file}'\n")
# Concatenate all ts files into a single vod
file_name = datetime.now().strftime("%d-%m-%Y-%H-%M-%S")
vod_path = f"stream_data/hls/{username}/{file_name}.mp4"
vod_command = [
"ffmpeg",
"-f",
"concat",
"-safe",
"0",
"-i",
f"{path}list.txt",
"-c",
"copy",
vod_path
]
subprocess.run(vod_command)
# Remove ts files
for ts_file in ts_files:
remove(f"{path}{ts_file}")
return vod_path

Binary file not shown.

View File

@@ -1,6 +1,6 @@
-- Sample Data for users
INSERT INTO users (username, password, email, num_followers, stream_key, is_partnered, bio, is_live, current_stream_title, current_selected_category_id) VALUES
('GamerDude', 'password123', 'gamerdude@example.com', 500, '1234', 0, 'Streaming my gaming adventures!', 1, 'Epic Gaming Session', 1),
('GamerDude', 'password123', 'gamerdude@example.com', 500, '1234', 0, 'Streaming my gaming adventures!', 0, 'Epic Gaming Session', 1),
('MusicLover', 'music4life', 'musiclover@example.com', 1200, '2345', 0, 'I share my favorite tunes.', 1, 'Live Music Jam', 2),
('ArtFan', 'artistic123', 'artfan@example.com', 300, '3456', 0, 'Exploring the world of art.', 1, 'Sketching Live', 3),
('EduGuru', 'learn123', 'eduguru@example.com', 800, '4567', 0, 'Teaching everything I know.', 1, 'Math Made Easy', 4),
@@ -135,3 +135,5 @@ SELECT * FROM stream_tags;
-- To see all tables in the database
SELECT name FROM sqlite_master WHERE type='table';
UPDATE users SET is_live = 0 WHERE user_id = 1;

View File

@@ -2,6 +2,7 @@ from database.database import Database
from typing import Optional
import os, subprocess
from typing import Optional, List
from time import sleep
def get_streamer_live_status(user_id: int):
"""
@@ -82,7 +83,6 @@ def get_user_vods(user_id: int):
vods = db.fetchall("""SELECT * FROM vods WHERE user_id = ?;""", (user_id,))
return vods
def generate_thumbnail(user_id: int) -> None:
"""
Generates the thumbnail of a stream
@@ -108,7 +108,20 @@ def generate_thumbnail(user_id: int) -> None:
f"stream_data/thumbnails/{username['username']}.jpg"
]
subprocess.run(thumbnail_command)
attempts = 3
while attempts > 0:
try:
subprocess.run(thumbnail_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
print(f"Thumbnail {username['username']} generated successfully")
break
except subprocess.CalledProcessError as e:
attempts -= 1
print("FFmpeg failed with an error:")
print(e.stderr.decode()) # Print detailed error message
print("Retrying in 5 seconds...")
sleep(5)
continue
def get_stream_tags(user_id: int) -> Optional[List[str]]:
"""
@@ -148,7 +161,7 @@ def transfer_stream_to_vod(user_id: int):
""", (user_id,))
if not stream:
return None
return False
## TODO: calculate length in seconds, currently using temp value