PATCH: Refactored underlying file system for streams, seperated vods, streams and thumbnails
This commit is contained in:
@@ -65,3 +65,4 @@ networks:
|
|||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
stream_data:
|
stream_data:
|
||||||
|
driver: local
|
||||||
@@ -3,8 +3,5 @@ FROM tiangolo/nginx-rtmp
|
|||||||
COPY nginx.conf /etc/nginx/nginx.conf
|
COPY nginx.conf /etc/nginx/nginx.conf
|
||||||
EXPOSE 1935 8080
|
EXPOSE 1935 8080
|
||||||
|
|
||||||
RUN mkdir -p /stream_data/hls && \
|
|
||||||
chmod -R 777 /stream_data
|
|
||||||
|
|
||||||
# Start the Nginx server
|
# Start the Nginx server
|
||||||
CMD [ "nginx", "-g", "daemon off;" ]
|
CMD [ "nginx", "-g", "daemon off;" ]
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ rtmp {
|
|||||||
live on;
|
live on;
|
||||||
|
|
||||||
hls on;
|
hls on;
|
||||||
hls_path /stream_data/hls;
|
hls_path /stream_data/;
|
||||||
|
|
||||||
allow publish 127.0.0.1;
|
allow publish 127.0.0.1;
|
||||||
deny publish all;
|
deny publish all;
|
||||||
@@ -76,24 +76,24 @@ http {
|
|||||||
}
|
}
|
||||||
|
|
||||||
# The MPEG-TS video chunks are stored in /tmp/hls
|
# The MPEG-TS video chunks are stored in /tmp/hls
|
||||||
location ~ ^/stream/user/(.+\.ts)$ {
|
location ~ ^/stream/(.+)/(.+\.ts)$ {
|
||||||
alias /stream_data/hls/$1;
|
alias /stream_data/$1/stream/$2;
|
||||||
|
|
||||||
# Let the MPEG-TS video chunks be cacheable
|
# Let the MPEG-TS video chunks be cacheable
|
||||||
expires max;
|
expires max;
|
||||||
}
|
}
|
||||||
|
|
||||||
# The M3U8 playlists location
|
# The M3U8 playlists location
|
||||||
location ~ ^/stream/user/(.+\.m3u8)$ {
|
location ~ ^/stream/(.+)/(.+\.m3u8)$ {
|
||||||
alias /stream_data/hls/$1;
|
alias /stream_data/$1/stream/$2;
|
||||||
|
|
||||||
# The M3U8 playlists should not be cacheable
|
# The M3U8 playlists should not be cacheable
|
||||||
expires -1d;
|
expires -1d;
|
||||||
}
|
}
|
||||||
|
|
||||||
# The thumbnails location
|
# The thumbnails location
|
||||||
location ~ ^/stream/user/thumbnails/(.+\.jpg)$ {
|
location ~ ^/stream/(.+)/thumbnails/(.+\.jpg)$ {
|
||||||
alias /stream_data/thumbnails/$1;
|
alias /stream_data/$1/thumbnails/$2;
|
||||||
|
|
||||||
# The thumbnails should not be cacheable
|
# The thumbnails should not be cacheable
|
||||||
expires -1d;
|
expires -1d;
|
||||||
|
|||||||
@@ -7,12 +7,15 @@ from database.database import Database
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from celery_tasks import update_thumbnail, combine_ts_stream
|
from celery_tasks import update_thumbnail, combine_ts_stream
|
||||||
from dateutil import parser
|
from dateutil import parser
|
||||||
|
from utils.path_manager import PathManager
|
||||||
|
|
||||||
stream_bp = Blueprint("stream", __name__)
|
stream_bp = Blueprint("stream", __name__)
|
||||||
|
|
||||||
# Constants
|
# Constants
|
||||||
THUMBNAIL_GENERATION_INTERVAL = 180
|
THUMBNAIL_GENERATION_INTERVAL = 180
|
||||||
|
|
||||||
|
## Path Manager
|
||||||
|
path_manager = PathManager()
|
||||||
|
|
||||||
## Stream Routes
|
## Stream Routes
|
||||||
@stream_bp.route('/streams/popular/<int:no_streams>')
|
@stream_bp.route('/streams/popular/<int:no_streams>')
|
||||||
@@ -175,11 +178,17 @@ def publish_stream():
|
|||||||
# Set user as streaming
|
# Set user as streaming
|
||||||
db.execute("""UPDATE users SET is_live = 1 WHERE user_id = ?""", (user_info["user_id"],))
|
db.execute("""UPDATE users SET is_live = 1 WHERE user_id = ?""", (user_info["user_id"],))
|
||||||
|
|
||||||
|
username = user_info["username"]
|
||||||
|
|
||||||
|
# Local file creation
|
||||||
|
create_local_directories(username)
|
||||||
|
|
||||||
# Update thumbnail periodically
|
# Update thumbnail periodically
|
||||||
update_thumbnail.delay(user_info["user_id"])
|
update_thumbnail.delay(path_manager.get_stream_file_path(username),
|
||||||
|
path_manager.get_thumbnail_file_path(username),
|
||||||
|
THUMBNAIL_GENERATION_INTERVAL)
|
||||||
|
|
||||||
return redirect(f"/{user_info['username']}")
|
return redirect(f"/{user_info['username']}/stream/")
|
||||||
|
|
||||||
@stream_bp.route("/end_stream", methods=["POST"])
|
@stream_bp.route("/end_stream", methods=["POST"])
|
||||||
def end_stream():
|
def end_stream():
|
||||||
@@ -233,6 +242,9 @@ def end_stream():
|
|||||||
SET is_live = 0
|
SET is_live = 0
|
||||||
WHERE user_id = ?""", (user_info["user_id"],))
|
WHERE user_id = ?""", (user_info["user_id"],))
|
||||||
|
|
||||||
combine_ts_stream.delay(user_info["username"])
|
# Get username
|
||||||
|
username = user_info["username"]
|
||||||
|
|
||||||
|
combine_ts_stream.delay(path_manager.get_stream_path(username), path_manager.get_vods_path(username))
|
||||||
|
|
||||||
return "Stream ended", 200
|
return "Stream ended", 200
|
||||||
@@ -18,30 +18,29 @@ def celery_init_app(app) -> Celery:
|
|||||||
return celery_app
|
return celery_app
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
def update_thumbnail(user_id, sleep_time=180) -> None:
|
def update_thumbnail(stream_file, thumbnail_file, sleep_time) -> None:
|
||||||
"""
|
"""
|
||||||
Updates the thumbnail of a stream periodically
|
Updates the thumbnail of a stream periodically
|
||||||
"""
|
"""
|
||||||
generate_thumbnail(user_id)
|
generate_thumbnail(stream_file, thumbnail_file)
|
||||||
sleep(sleep_time)
|
sleep(sleep_time)
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
def combine_ts_stream(username):
|
def combine_ts_stream(stream_path, vods_path):
|
||||||
"""
|
"""
|
||||||
Combines all ts files into a single vod, and removes the ts files
|
Combines all ts files into a single vod, and removes the ts files
|
||||||
"""
|
"""
|
||||||
path = f"stream_data/hls/{username}/"
|
ts_files = [f for f in listdir(stream_path) if f.endswith(".ts")]
|
||||||
ts_files = [f for f in listdir(path) if f.endswith(".ts")]
|
|
||||||
ts_files.sort()
|
ts_files.sort()
|
||||||
|
|
||||||
# Create temp file listing all ts files
|
# Create temp file listing all ts files
|
||||||
with open(f"{path}list.txt", "w") as f:
|
with open(f"{stream_path}/list.txt", "w") as f:
|
||||||
for ts_file in ts_files:
|
for ts_file in ts_files:
|
||||||
f.write(f"file '{ts_file}'\n")
|
f.write(f"file '{ts_file}'\n")
|
||||||
|
|
||||||
# Concatenate all ts files into a single vod
|
# Concatenate all ts files into a single vod
|
||||||
file_name = datetime.now().strftime("%d-%m-%Y-%H-%M-%S")
|
file_name = datetime.now().strftime("%d-%m-%Y-%H-%M-%S") + ".mp4"
|
||||||
vod_path = f"stream_data/hls/{username}/{file_name}.mp4"
|
|
||||||
vod_command = [
|
vod_command = [
|
||||||
"ffmpeg",
|
"ffmpeg",
|
||||||
"-f",
|
"-f",
|
||||||
@@ -49,16 +48,14 @@ def combine_ts_stream(username):
|
|||||||
"-safe",
|
"-safe",
|
||||||
"0",
|
"0",
|
||||||
"-i",
|
"-i",
|
||||||
f"{path}list.txt",
|
f"{stream_path}/list.txt",
|
||||||
"-c",
|
"-c",
|
||||||
"copy",
|
"copy",
|
||||||
vod_path
|
f"{vods_path}/{file_name}"
|
||||||
]
|
]
|
||||||
|
|
||||||
subprocess.run(vod_command)
|
subprocess.run(vod_command)
|
||||||
|
|
||||||
# Remove ts files
|
# Remove ts files
|
||||||
for ts_file in ts_files:
|
for ts_file in ts_files:
|
||||||
remove(f"{path}{ts_file}")
|
remove(f"{stream_path}/{ts_file}")
|
||||||
|
|
||||||
return vod_path
|
|
||||||
|
|||||||
17
web_server/utils/path_manager.py
Normal file
17
web_server/utils/path_manager.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
# Description: This file contains the PathManager class which is responsible for managing the paths of the stream data.
|
||||||
|
|
||||||
|
class PathManager():
|
||||||
|
def get_vods_path(self, username):
|
||||||
|
return f"stream_data/{username}/vods"
|
||||||
|
|
||||||
|
def get_stream_path(self, username):
|
||||||
|
return f"stream_data/{username}/stream"
|
||||||
|
|
||||||
|
def get_thumbnail_path(self, username):
|
||||||
|
return f"stream_data/{username}/thumbnails"
|
||||||
|
|
||||||
|
def get_stream_file_path(self, username):
|
||||||
|
return f"{self.get_stream_path(username)}/index.m3u8"
|
||||||
|
|
||||||
|
def get_thumbnail_file_path(self, username):
|
||||||
|
return f"{self.get_thumbnail_path(username)}/stream.jpg"
|
||||||
@@ -83,44 +83,36 @@ def get_user_vods(user_id: int):
|
|||||||
vods = db.fetchall("""SELECT * FROM vods WHERE user_id = ?;""", (user_id,))
|
vods = db.fetchall("""SELECT * FROM vods WHERE user_id = ?;""", (user_id,))
|
||||||
return vods
|
return vods
|
||||||
|
|
||||||
def generate_thumbnail(user_id: int) -> None:
|
def generate_thumbnail(stream_file: str, thumbnail_file: str, retry_time=5, retry_count=3) -> None:
|
||||||
"""
|
"""
|
||||||
Generates the thumbnail of a stream
|
Generates the thumbnail of a stream
|
||||||
"""
|
"""
|
||||||
with Database() as db:
|
|
||||||
username = db.fetchone("""SELECT * FROM users WHERE user_id = ?""", (user_id,))
|
|
||||||
|
|
||||||
if not username:
|
|
||||||
return None
|
|
||||||
|
|
||||||
if not os.path.exists(f"stream_data/thumbnails/"):
|
|
||||||
os.makedirs(f"stream_data/thumbnails/")
|
|
||||||
|
|
||||||
thumbnail_command = [
|
thumbnail_command = [
|
||||||
"ffmpeg",
|
"ffmpeg",
|
||||||
"-y",
|
"-y",
|
||||||
"-i",
|
"-i",
|
||||||
f"stream_data/hls/{username['username']}/index.m3u8",
|
f"{stream_file}",
|
||||||
"-vframes",
|
"-vframes",
|
||||||
"1",
|
"1",
|
||||||
"-q:v",
|
"-q:v",
|
||||||
"2",
|
"2",
|
||||||
f"stream_data/thumbnails/{username['username']}.jpg"
|
f"{thumbnail_file}"
|
||||||
]
|
]
|
||||||
|
|
||||||
attempts = 3
|
attempts = retry_count
|
||||||
|
|
||||||
while attempts > 0:
|
while attempts > 0:
|
||||||
try:
|
try:
|
||||||
subprocess.run(thumbnail_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
|
subprocess.run(thumbnail_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
|
||||||
print(f"Thumbnail {username['username']} generated successfully")
|
print(f"Thumbnail generated for {stream_file}")
|
||||||
break
|
break
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
attempts -= 1
|
attempts -= 1
|
||||||
print("FFmpeg failed with an error:")
|
print("FFmpeg failed with an error:")
|
||||||
print(e.stderr.decode()) # Print detailed error message
|
print(e.stderr.decode()) # Print detailed error message
|
||||||
print("Retrying in 5 seconds...")
|
print(f"Retrying in {retry_time} seconds...")
|
||||||
sleep(5)
|
sleep(retry_time)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
def get_stream_tags(user_id: int) -> Optional[List[str]]:
|
def get_stream_tags(user_id: int) -> Optional[List[str]]:
|
||||||
@@ -175,3 +167,33 @@ def transfer_stream_to_vod(user_id: int):
|
|||||||
""", (user_id,))
|
""", (user_id,))
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def create_local_directories(username: str):
|
||||||
|
"""
|
||||||
|
Create directories for user stream data if they do not exist
|
||||||
|
"""
|
||||||
|
|
||||||
|
vods_path = f"stream_data/{username}/vods"
|
||||||
|
stream_path = f"stream_data/{username}/stream"
|
||||||
|
thumbnail_path = f"stream_data/{username}/thumbnails"
|
||||||
|
|
||||||
|
if not os.path.exists(vods_path):
|
||||||
|
os.makedirs(vods_path)
|
||||||
|
|
||||||
|
if not os.path.exists(stream_path):
|
||||||
|
os.makedirs(stream_path)
|
||||||
|
|
||||||
|
if not os.path.exists(thumbnail_path):
|
||||||
|
os.makedirs(thumbnail_path)
|
||||||
|
|
||||||
|
# Fix permissions
|
||||||
|
os.chmod(f"stream_data/{username}", 0o777)
|
||||||
|
os.chmod(vods_path, 0o777)
|
||||||
|
os.chmod(stream_path, 0o777)
|
||||||
|
os.chmod(thumbnail_path, 0o777)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"vod_path": vods_path,
|
||||||
|
"stream_path": stream_path,
|
||||||
|
"thumbnail_path": thumbnail_path
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user