FEAT: Implemented auto-updating thumbnails (includes the addition of Redis and Celery)
This commit is contained in:
@@ -10,6 +10,8 @@ services:
|
|||||||
- web_server
|
- web_server
|
||||||
networks:
|
networks:
|
||||||
- app_network
|
- app_network
|
||||||
|
volumes:
|
||||||
|
- stream_data:/stream_data
|
||||||
|
|
||||||
web_server:
|
web_server:
|
||||||
build:
|
build:
|
||||||
@@ -23,6 +25,8 @@ services:
|
|||||||
environment:
|
environment:
|
||||||
- FLASK_APP=blueprints.__init__
|
- FLASK_APP=blueprints.__init__
|
||||||
- FLASK_ENV=production
|
- FLASK_ENV=production
|
||||||
|
volumes:
|
||||||
|
- stream_data:/web_server/stream_data
|
||||||
|
|
||||||
frontend:
|
frontend:
|
||||||
build:
|
build:
|
||||||
@@ -33,7 +37,29 @@ services:
|
|||||||
- app_network
|
- app_network
|
||||||
depends_on:
|
depends_on:
|
||||||
- web_server
|
- web_server
|
||||||
|
|
||||||
|
redis:
|
||||||
|
image: "redis:alpine"
|
||||||
|
ports:
|
||||||
|
- "6379:6379"
|
||||||
|
networks:
|
||||||
|
- app_network
|
||||||
|
|
||||||
|
celery:
|
||||||
|
build:
|
||||||
|
context: ./web_server
|
||||||
|
command: celery -A celery_tasks.celery_app worker --loglevel=info
|
||||||
|
depends_on:
|
||||||
|
- redis
|
||||||
|
volumes:
|
||||||
|
- .:/app
|
||||||
|
- stream_data:/web_server/stream_data
|
||||||
|
networks:
|
||||||
|
- app_network
|
||||||
|
|
||||||
networks:
|
networks:
|
||||||
app_network:
|
app_network:
|
||||||
driver: bridge
|
driver: bridge
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
stream_data:
|
||||||
@@ -24,7 +24,7 @@ rtmp {
|
|||||||
live on;
|
live on;
|
||||||
|
|
||||||
hls on;
|
hls on;
|
||||||
hls_path /tmp/hls;
|
hls_path /stream_data/hls;
|
||||||
|
|
||||||
allow publish 127.0.0.1;
|
allow publish 127.0.0.1;
|
||||||
deny publish all;
|
deny publish all;
|
||||||
@@ -69,7 +69,7 @@ http {
|
|||||||
|
|
||||||
# The MPEG-TS video chunks are stored in /tmp/hls
|
# The MPEG-TS video chunks are stored in /tmp/hls
|
||||||
location ~ ^/stream/user/(.+\.ts)$ {
|
location ~ ^/stream/user/(.+\.ts)$ {
|
||||||
alias /tmp/hls/$1;
|
alias /stream_data/hls/$1;
|
||||||
|
|
||||||
# Let the MPEG-TS video chunks be cacheable
|
# Let the MPEG-TS video chunks be cacheable
|
||||||
expires max;
|
expires max;
|
||||||
@@ -77,12 +77,20 @@ http {
|
|||||||
|
|
||||||
# The M3U8 playlists location
|
# The M3U8 playlists location
|
||||||
location ~ ^/stream/user/(.+\.m3u8)$ {
|
location ~ ^/stream/user/(.+\.m3u8)$ {
|
||||||
alias /tmp/hls/$1;
|
alias /stream_data/hls/$1;
|
||||||
|
|
||||||
# The M3U8 playlists should not be cacheable
|
# The M3U8 playlists should not be cacheable
|
||||||
expires -1d;
|
expires -1d;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# The thumbnails location
|
||||||
|
location ~ ^/stream/user/thumbnails/(.+\.jpg)$ {
|
||||||
|
alias /stream_data/thumbnails/$1;
|
||||||
|
|
||||||
|
# The thumbnails should not be cacheable
|
||||||
|
expires -1d;
|
||||||
|
}
|
||||||
|
|
||||||
location / {
|
location / {
|
||||||
proxy_pass http://frontend:5173; # frontend is the name of the React container in docker-compose
|
proxy_pass http://frontend:5173; # frontend is the name of the React container in docker-compose
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ from blueprints.user import user_bp
|
|||||||
from blueprints.streams import stream_bp
|
from blueprints.streams import stream_bp
|
||||||
from blueprints.chat import chat_bp
|
from blueprints.chat import chat_bp
|
||||||
from blueprints.socket import socketio
|
from blueprints.socket import socketio
|
||||||
|
from celery import Celery
|
||||||
|
from celery_tasks import celery_init_app
|
||||||
|
|
||||||
from os import getenv
|
from os import getenv
|
||||||
|
|
||||||
@@ -26,6 +28,17 @@ def create_app():
|
|||||||
app.config["SECRET_KEY"] = getenv("FLASK_SECRET_KEY")
|
app.config["SECRET_KEY"] = getenv("FLASK_SECRET_KEY")
|
||||||
app.config["SESSION_PERMANENT"] = False
|
app.config["SESSION_PERMANENT"] = False
|
||||||
app.config["SESSION_TYPE"] = "filesystem"
|
app.config["SESSION_TYPE"] = "filesystem"
|
||||||
|
|
||||||
|
app.config.from_mapping(
|
||||||
|
CELERY=dict(
|
||||||
|
broker_url="redis://redis:6379/0",
|
||||||
|
result_backend="redis://redis:6379/0",
|
||||||
|
task_ignore_result=True,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
app.config.from_prefixed_env()
|
||||||
|
celery = celery_init_app(app)
|
||||||
|
|
||||||
#! ↓↓↓ For development purposes only - Allow cross-origin requests for the frontend
|
#! ↓↓↓ For development purposes only - Allow cross-origin requests for the frontend
|
||||||
CORS(app, supports_credentials=True)
|
CORS(app, supports_credentials=True)
|
||||||
# csrf.init_app(app)
|
# csrf.init_app(app)
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
from flask import Blueprint, session, jsonify, g, request, redirect, abort
|
from flask import Blueprint, session, jsonify, g, request, redirect, abort, send_from_directory
|
||||||
from utils.stream_utils import (
|
from utils.stream_utils import (
|
||||||
streamer_live_status,
|
streamer_live_status,
|
||||||
streamer_most_recent_stream,
|
streamer_most_recent_stream,
|
||||||
user_stream,
|
user_stream,
|
||||||
followed_live_streams,
|
followed_live_streams,
|
||||||
followed_streamers,
|
followed_streamers
|
||||||
)
|
)
|
||||||
from utils.user_utils import get_user_id
|
from utils.user_utils import get_user_id
|
||||||
from blueprints.utils import login_required
|
from blueprints.utils import login_required
|
||||||
@@ -18,8 +18,13 @@ from utils.recommendation_utils import (
|
|||||||
from utils.utils import most_popular_category
|
from utils.utils import most_popular_category
|
||||||
from database.database import Database
|
from database.database import Database
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
from celery_tasks import update_thumbnail
|
||||||
|
|
||||||
stream_bp = Blueprint("stream", __name__)
|
stream_bp = Blueprint("stream", __name__)
|
||||||
|
|
||||||
|
# Constants
|
||||||
|
THUMBNAIL_GENERATION_INTERVAL = 180
|
||||||
|
|
||||||
@stream_bp.route('/get_streams')
|
@stream_bp.route('/get_streams')
|
||||||
def get_sample_streams() -> list[dict]:
|
def get_sample_streams() -> list[dict]:
|
||||||
@@ -185,6 +190,8 @@ def publish_stream():
|
|||||||
datetime.now(),
|
datetime.now(),
|
||||||
1))
|
1))
|
||||||
|
|
||||||
|
update_thumbnail.delay(user_info["user_id"])
|
||||||
|
|
||||||
return redirect(f"/{user_info['username']}")
|
return redirect(f"/{user_info['username']}")
|
||||||
|
|
||||||
@stream_bp.route("/end_stream", methods=["POST"])
|
@stream_bp.route("/end_stream", methods=["POST"])
|
||||||
@@ -201,4 +208,4 @@ def end_stream():
|
|||||||
# Set stream to not live
|
# Set stream to not live
|
||||||
db.execute("""UPDATE streams SET isLive = 0 WHERE user_id = ? AND isLive = 1""", (user_info["user_id"],))
|
db.execute("""UPDATE streams SET isLive = 0 WHERE user_id = ? AND isLive = 1""", (user_info["user_id"],))
|
||||||
|
|
||||||
return "Stream ended", 200
|
return "Stream ended", 200
|
||||||
29
web_server/celery_tasks/__init__.py
Normal file
29
web_server/celery_tasks/__init__.py
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
from celery import Celery, shared_task, Task
|
||||||
|
from utils.stream_utils import generate_thumbnail, streamer_live_status
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
def celery_init_app(app) -> Celery:
|
||||||
|
class FlaskTask(Task):
|
||||||
|
def __call__(self, *args: object, **kwargs: object) -> object:
|
||||||
|
with app.app_context():
|
||||||
|
return self.run(*args, **kwargs)
|
||||||
|
|
||||||
|
celery_app = Celery(app.name, task_cls=FlaskTask)
|
||||||
|
celery_app.config_from_object(app.config["CELERY"])
|
||||||
|
celery_app.set_default()
|
||||||
|
app.extensions["celery"] = celery_app
|
||||||
|
return celery_app
|
||||||
|
|
||||||
|
@shared_task
|
||||||
|
def update_thumbnail(user_id, sleep_time=10) -> None:
|
||||||
|
"""
|
||||||
|
Updates the thumbnail of a stream periodically
|
||||||
|
"""
|
||||||
|
ffmpeg_wait_time = 5
|
||||||
|
|
||||||
|
# check if user is streaming
|
||||||
|
while streamer_live_status(user_id)['isLive']:
|
||||||
|
sleep(ffmpeg_wait_time)
|
||||||
|
generate_thumbnail(user_id)
|
||||||
|
sleep(sleep_time - ffmpeg_wait_time)
|
||||||
|
return
|
||||||
4
web_server/celery_tasks/celery_app.py
Normal file
4
web_server/celery_tasks/celery_app.py
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
from blueprints import create_app
|
||||||
|
|
||||||
|
flask_app = create_app()
|
||||||
|
celery_app = flask_app.extensions["celery"]
|
||||||
@@ -23,4 +23,6 @@ Werkzeug==3.1.3
|
|||||||
WTForms==3.2.1
|
WTForms==3.2.1
|
||||||
Gunicorn==20.1.0
|
Gunicorn==20.1.0
|
||||||
gevent>=22.10.2
|
gevent>=22.10.2
|
||||||
gevent-websocket
|
gevent-websocket
|
||||||
|
celery==5.2.3
|
||||||
|
redis==5.2.1
|
||||||
@@ -1,9 +1,9 @@
|
|||||||
from database.database import Database
|
from database.database import Database
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
import sqlite3
|
import sqlite3, os, subprocess
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
def streamer_live_status(user_id: int) -> dict:
|
||||||
def streamer_live_status(user_id: int) -> bool:
|
|
||||||
"""
|
"""
|
||||||
Returns boolean on whether the given streamer is live
|
Returns boolean on whether the given streamer is live
|
||||||
"""
|
"""
|
||||||
@@ -12,7 +12,10 @@ def streamer_live_status(user_id: int) -> bool:
|
|||||||
SELECT isLive
|
SELECT isLive
|
||||||
FROM streams
|
FROM streams
|
||||||
WHERE user_id = ?
|
WHERE user_id = ?
|
||||||
|
ORDER BY stream_id DESC
|
||||||
|
LIMIT 1;
|
||||||
""", (user_id,))
|
""", (user_id,))
|
||||||
|
|
||||||
return is_live
|
return is_live
|
||||||
|
|
||||||
def followed_live_streams(user_id: int) -> list[dict]:
|
def followed_live_streams(user_id: int) -> list[dict]:
|
||||||
@@ -63,4 +66,34 @@ def user_stream(user_id: int, stream_id: int) -> dict:
|
|||||||
WHERE user_id = ?
|
WHERE user_id = ?
|
||||||
AND stream_id = ?
|
AND stream_id = ?
|
||||||
""", (user_id, stream_id))
|
""", (user_id, stream_id))
|
||||||
return stream
|
return stream
|
||||||
|
|
||||||
|
def generate_thumbnail(user_id: int) -> None:
|
||||||
|
"""
|
||||||
|
Returns the thumbnail of a stream
|
||||||
|
"""
|
||||||
|
db = Database()
|
||||||
|
username = db.fetchone("""SELECT * FROM users WHERE user_id = ?""", (user_id,))
|
||||||
|
db.close_connection()
|
||||||
|
|
||||||
|
if not username:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not os.path.exists(f"stream_data/thumbnails/"):
|
||||||
|
os.makedirs(f"stream_data/thumbnails/")
|
||||||
|
|
||||||
|
subprocess.Popen(["ls", "-lR"])
|
||||||
|
|
||||||
|
thumbnail_command = [
|
||||||
|
"ffmpeg",
|
||||||
|
"-y",
|
||||||
|
"-i",
|
||||||
|
f"stream_data/hls/{username['username']}/index.m3u8",
|
||||||
|
"-vframes",
|
||||||
|
"1",
|
||||||
|
"-q:v",
|
||||||
|
"2",
|
||||||
|
f"stream_data/thumbnails/{username['username']}.jpg"
|
||||||
|
]
|
||||||
|
|
||||||
|
subprocess.run(thumbnail_command)
|
||||||
Reference in New Issue
Block a user