REFACTOR: Moved streaming tasks to dedicated folder and updated refs

This commit is contained in:
2025-02-28 19:38:03 +00:00
parent 077530b6e6
commit 6f449eea4e
3 changed files with 72 additions and 58 deletions

View File

@@ -5,7 +5,7 @@ from utils.user_utils import get_user_id
from blueprints.middleware import login_required from blueprints.middleware import login_required
from database.database import Database from database.database import Database
from datetime import datetime from datetime import datetime
from celery_tasks import update_thumbnail, combine_ts_stream from celery_tasks.streaming import update_thumbnail, combine_ts_stream
from dateutil import parser from dateutil import parser
from utils.path_manager import PathManager from utils.path_manager import PathManager
import json import json
@@ -205,7 +205,6 @@ def publish_stream():
periodically update thumbnail periodically update thumbnail
""" """
try: try:
data = json.loads(request.form.get("data")) data = json.loads(request.form.get("data"))
except json.JSONDecodeError as ex: except json.JSONDecodeError as ex:

View File

@@ -1,10 +1,4 @@
from celery import Celery, shared_task, Task from celery import Celery, shared_task, Task
from utils.stream_utils import generate_thumbnail, get_streamer_live_status
from time import sleep
from os import listdir, remove
from datetime import datetime
from celery_tasks.preferences import user_preferences
import subprocess
def celery_init_app(app) -> Celery: def celery_init_app(app) -> Celery:
class FlaskTask(Task): class FlaskTask(Task):
@@ -23,53 +17,3 @@ def celery_init_app(app) -> Celery:
celery_app.set_default() celery_app.set_default()
app.extensions["celery"] = celery_app app.extensions["celery"] = celery_app
return celery_app return celery_app
@shared_task
def update_thumbnail(user_id, stream_file, thumbnail_file, sleep_time) -> None:
"""
Updates the thumbnail of a stream periodically
"""
if get_streamer_live_status(user_id)['is_live']:
print("Updating thumbnail...")
generate_thumbnail(stream_file, thumbnail_file)
update_thumbnail.apply_async((user_id, stream_file, thumbnail_file, sleep_time), countdown=sleep_time)
else:
print("Stream has ended, stopping thumbnail updates")
@shared_task
def combine_ts_stream(stream_path, vods_path, vod_file_name):
"""
Combines all ts files into a single vod, and removes the ts files
"""
ts_files = [f for f in listdir(stream_path) if f.endswith(".ts")]
ts_files.sort()
# Create temp file listing all ts files
with open(f"{stream_path}/list.txt", "w") as f:
for ts_file in ts_files:
f.write(f"file '{ts_file}'\n")
# Concatenate all ts files into a single vod
vod_command = [
"ffmpeg",
"-f",
"concat",
"-safe",
"0",
"-i",
f"{stream_path}/list.txt",
"-c",
"copy",
f"{vods_path}/{vod_file_name}.mp4"
]
subprocess.run(vod_command)
# Remove ts files
for ts_file in ts_files:
remove(f"{stream_path}/{ts_file}")
# Remove m3u8 file
remove(f"{stream_path}/index.m3u8")

View File

@@ -0,0 +1,71 @@
from celery import Celery, shared_task, Task
from datetime import datetime
from celery_tasks.preferences import user_preferences
from utils.stream_utils import generate_thumbnail, get_streamer_live_status
from time import sleep
from os import listdir, remove
import subprocess
@shared_task
def update_thumbnail(user_id, stream_file, thumbnail_file, sleep_time) -> None:
"""
Updates the thumbnail of a stream periodically
"""
if get_streamer_live_status(user_id)['is_live']:
print("Updating thumbnail...")
generate_thumbnail(stream_file, thumbnail_file)
update_thumbnail.apply_async((user_id, stream_file, thumbnail_file, sleep_time), countdown=sleep_time)
else:
print("Stream has ended, stopping thumbnail updates")
@shared_task
def combine_ts_stream(stream_path, vods_path, vod_file_name):
"""
Combines all ts files into a single vod, and removes the ts files
"""
ts_files = [f for f in listdir(stream_path) if f.endswith(".ts")]
ts_files.sort()
# Create temp file listing all ts files
with open(f"{stream_path}/list.txt", "w") as f:
for ts_file in ts_files:
f.write(f"file '{ts_file}'\n")
# Concatenate all ts files into a single vod
vod_command = [
"ffmpeg",
"-f",
"concat",
"-safe",
"0",
"-i",
f"{stream_path}/list.txt",
"-c",
"copy",
f"{vods_path}/{vod_file_name}.mp4"
]
subprocess.run(vod_command)
# Remove ts files
for ts_file in ts_files:
remove(f"{stream_path}/{ts_file}")
# Remove m3u8 file
remove(f"{stream_path}/index.m3u8")
@shared_task
def convert_image_to_png(image_path, png_path):
"""
Converts an image to a png
"""
image_command = [
"ffmpeg",
"-y",
"-i",
image_path,
png_path
]
subprocess.run(image_command)