This commit is contained in:
white
2025-03-03 11:43:25 +00:00
17 changed files with 268 additions and 294 deletions

View File

@@ -95,12 +95,6 @@ def signup():
# Create user directories for stream data
path_manager.create_user(username)
# Create session for new user, to avoid them having unnecessary state info
session.clear()
session["username"] = username
session["user_id"] = get_user_id(username)
print(f"Logged in as {username}. session: {session.get('username')}. user_id: {session.get('user_id')}", flush=True)
# send_email(username)
return jsonify({
@@ -178,8 +172,11 @@ def login():
"error_fields": ["username", "password"],
"message": "Invalid username or password"
}), 401
# Add user directories for stream data in case they don't exist
path_manager.create_user(username)
# Set up session to avoid having unncessary state information
# Set up session
session.clear()
session["username"] = username
session["user_id"] = get_user_id(username)
@@ -209,8 +206,27 @@ def logout() -> dict:
"""
Log out and clear the users session.
If the user is currently streaming, end their stream first.
Can only be accessed by a logged in user.
"""
from database.database import Database
from utils.stream_utils import end_user_stream
# Check if user is currently streaming
user_id = session.get("user_id")
username = session.get("username")
with Database() as db:
is_streaming = db.fetchone("""SELECT is_live FROM users WHERE user_id = ?""", (user_id,))
if is_streaming and is_streaming.get("is_live") == 1:
# Get the user's stream key
stream_key_info = db.fetchone("""SELECT stream_key FROM users WHERE user_id = ?""", (user_id,))
stream_key = stream_key_info.get("stream_key") if stream_key_info else None
if stream_key:
# End the stream
end_user_stream(stream_key, user_id, username)
session.clear()
return {"logged_in": False}

View File

@@ -67,13 +67,17 @@ def recommended_streams() -> list[dict]:
return streams
@stream_bp.route('/streams/<string:username>/data')
@stream_bp.route('/streams/<int:streamer_id>/data')
def stream_data(streamer_id) -> dict:
def stream_data(username=None, streamer_id=None) -> dict:
"""
Returns a streamer's current stream data
"""
if username and not streamer_id:
streamer_id = get_user_id(username)
data = get_current_stream_data(streamer_id)
# If the user is the streamer, return the stream key also
if session.get('user_id') == streamer_id:
with Database() as db:
stream_key = db.fetchone(
@@ -112,33 +116,21 @@ def recommended_categories() -> list | list[dict]:
"""
user_id = session.get("user_id")
categories = get_user_category_recommendations(1)
categories = get_user_category_recommendations(user_id)
return jsonify(categories)
@login_required
@stream_bp.route('/categories/following')
@stream_bp.route('/streams/followed_categories')
def following_categories_streams():
"""
Returns popular streams in categories which the user followed
Returns popular streams from categories the user is following
"""
streams = get_followed_categories_recommendations(session.get('user_id'))
return jsonify(streams)
@login_required
@stream_bp.route('/categories/your_categories')
def following_your_categories():
"""
Returns categories which the user followed
"""
streams = get_followed_your_categories(session.get('user_id'))
return jsonify(streams)
# User Routes
@stream_bp.route('/user/<string:username>/status')
def user_live_status(username):
@@ -172,7 +164,7 @@ def user_live_status(username):
@stream_bp.route('/vods/<int:vod_id>')
def vod(vod_id):
"""
Returns a JSON of a vod
Returns details about a specific vod
"""
vod = get_vod(vod_id)
return jsonify(vod)
@@ -187,6 +179,7 @@ def vods(username):
"vod_id": int,
"title": str,
"datetime": str,
"username": str,
"category_name": str,
"length": int (in seconds),
"views": int
@@ -204,10 +197,8 @@ def get_all_vods():
Returns data of all VODs by all streamers in a JSON-compatible format
"""
with Database() as db:
vods = db.fetchall("SELECT * FROM vods")
print("Fetched VODs from DB:", vods)
vods = db.fetchall("""SELECT vods.*, username, category_name FROM vods JOIN users ON vods.user_id = users.user_id JOIN categories ON vods.category_id = categories.category_id;""")
return jsonify(vods)
# RTMP Server Routes
@@ -355,23 +346,12 @@ def update_stream():
@stream_bp.route("/end_stream", methods=["POST"])
def end_stream():
"""
Ends a stream
step-by-step:
remove stream from database
move stream to vod table
set user as not streaming
convert ts files to mp4
clean up old ts files
end thumbnail generation
Ends a stream based on the HTTP request
"""
print("Ending stream", flush=True)
print("TEST END STREAM")
stream_key = request.get_json().get("key")
print(stream_key, flush=True)
user_id = None
username = None
if not stream_key:
# Try getting stream_key from form data (for nginx in the case that the stream is ended on OBS's end)
stream_key = request.form.get("name")
@@ -380,60 +360,24 @@ def end_stream():
print("Unauthorized - No stream key provided", flush=True)
return "Unauthorized", 403
# Open database connection
# Get user info from stream key
with Database() as db:
initial_streams = db.fetchall("""SELECT title FROM streams""")
print("Initial streams:", initial_streams, flush=True)
# Get user info from stream key
user_info = db.fetchone("""SELECT *
user_info = db.fetchone("""SELECT user_id, username
FROM users
WHERE stream_key = ?""", (stream_key,))
stream_info = db.fetchone("""SELECT *
FROM streams
WHERE user_id = ?""", (user_id,))
print("Got stream_info", stream_info, flush=True)
# If stream key is invalid, return unauthorized
if not user_info:
print("Unauthorized - No user found from stream key", flush=True)
return "Unauthorized", 403
# If stream never published, return
if not stream_info:
print(f"Stream for stream key: {stream_key} never began", flush=True)
return "Stream ended", 200
# Remove stream from database
db.execute("""DELETE FROM streams
WHERE user_id = ?""", (user_id,))
# Move stream to vod table
stream_length = int(
(datetime.now() - parser.parse(stream_info.get("start_time"))).total_seconds())
db.execute("""INSERT INTO vods (user_id, title, datetime, category_id, length, views)
VALUES (?, ?, ?, ?, ?, ?)""", (user_id,
stream_info.get("title"),
stream_info.get(
"start_time"),
stream_info.get(
"category_id"),
stream_length,
0))
vod_id = db.get_last_insert_id()
# Set user as not streaming
db.execute("""UPDATE users
SET is_live = 0
WHERE user_id = ?""", (user_id,))
current_streams = db.fetchall("""SELECT title FROM streams""")
combine_ts_stream.delay(path_manager.get_stream_path(
username), path_manager.get_vods_path(username), vod_id)
print("Stream ended. Current streams now:", current_streams, flush=True)
return "Stream ended", 200
user_id = user_info["user_id"]
username = user_info["username"]
result, message = end_user_stream(stream_key, user_id, username)
if result:
print(f"Stream ended: {message}", flush=True)
return "Stream ended", 200
else:
print(f"Error ending stream: {message}", flush=True)
return "Error ending stream", 500

View File

@@ -61,17 +61,6 @@ def user_profile_picture_save():
return jsonify({"message": "Profile picture saved", "path": thumbnail_path})
@login_required
@user_bp.route('/user/same/<string:username>')
def user_is_same(username):
"""
Returns if given user is current user
"""
current_username = session.get("username")
if username == current_username:
return jsonify({"same": True})
return jsonify({"same": False})
## Subscription Routes
@login_required
@user_bp.route('/user/subscription/<string:streamer_name>')

View File

@@ -94,7 +94,7 @@ def get_highest_view_categories(no_categories: int = 4, offset: int = 0) -> Opti
""", (no_categories, offset))
return categories
def get_user_category_recommendations(user_id: 1, no_categories: int = 4) -> Optional[List[dict]]:
def get_user_category_recommendations(user_id = 1, no_categories: int = 4) -> Optional[List[dict]]:
"""
Queries user_preferences database to find users top favourite streaming category and returns the category
"""

View File

@@ -48,6 +48,81 @@ def get_current_stream_data(user_id: int) -> Optional[dict]:
""", (user_id,))
return most_recent_stream
def end_user_stream(stream_key, user_id, username):
"""
Utility function to end a user's stream
Parameters:
stream_key: The stream key of the user
user_id: The ID of the user
username: The username of the user
Returns:
bool: True if stream was ended successfully, False otherwise
"""
from flask import current_app
from datetime import datetime
from dateutil import parser
from celery_tasks.streaming import combine_ts_stream
from utils.path_manager import PathManager
path_manager = PathManager()
print(f"Ending stream for user {username} (ID: {user_id})", flush=True)
if not stream_key or not user_id or not username:
print("Cannot end stream - missing required information", flush=True)
return False
try:
# Open database connection
with Database() as db:
# Get stream info
stream_info = db.fetchone("""SELECT *
FROM streams
WHERE user_id = ?""", (user_id,))
# If user is not streaming, just return
if not stream_info:
print(f"User {username} (ID: {user_id}) is not streaming", flush=True)
return True, "User is not streaming"
# Remove stream from database
db.execute("""DELETE FROM streams
WHERE user_id = ?""", (user_id,))
# Move stream to vod table
stream_length = int(
(datetime.now() - parser.parse(stream_info.get("start_time"))).total_seconds())
db.execute("""INSERT INTO vods (user_id, title, datetime, category_id, length, views)
VALUES (?, ?, ?, ?, ?, ?)""", (user_id,
stream_info.get("title"),
stream_info.get("start_time"),
stream_info.get("category_id"),
stream_length,
0))
vod_id = db.get_last_insert_id()
# Set user as not streaming
db.execute("""UPDATE users
SET is_live = 0
WHERE user_id = ?""", (user_id,))
# Queue task to combine TS files into MP4
combine_ts_stream.delay(
path_manager.get_stream_path(username),
path_manager.get_vods_path(username),
vod_id
)
print(f"Stream ended for user {username} (ID: {user_id})", flush=True)
return True, "Stream ended successfully"
except Exception as e:
print(f"Error ending stream for user {username}: {str(e)}", flush=True)
return False, f"Error ending stream: {str(e)}"
def get_category_id(category_name: str) -> Optional[int]:
"""
Returns the category_id given a category name
@@ -77,7 +152,7 @@ def get_vod(vod_id: int) -> dict:
Returns data of a streamers vod
"""
with Database() as db:
vod = db.fetchone("""SELECT * FROM vods WHERE vod_id = ?;""", (vod_id,))
vod = db.fetchone("""SELECT vods.*, username, category_name FROM vods JOIN users ON vods.user_id = users.user_id JOIN categories ON vods.category_id = categories.category_id WHERE vod_id = ?;""", (vod_id,))
return vod
def get_latest_vod(user_id: int):
@@ -85,7 +160,7 @@ def get_latest_vod(user_id: int):
Returns data of the most recent stream by a streamer
"""
with Database() as db:
latest_vod = db.fetchone("""SELECT vods.*, category_name FROM vods JOIN categories ON vods.category_id = categories.category_id WHERE user_id = ? ORDER BY vod_id DESC;""", (user_id,))
latest_vod = db.fetchone("""SELECT vods.*, username, category_name FROM vods JOIN users ON vods.user_id = users.user_id JOIN categories ON vods.category_id = categories.category_id WHERE vods.user_id = ? ORDER BY vod_id DESC;""", (user_id,))
return latest_vod
def get_user_vods(user_id: int):
@@ -93,15 +168,7 @@ def get_user_vods(user_id: int):
Returns data of all vods by a streamer
"""
with Database() as db:
vods = db.fetchall("""SELECT vods.*, category_name FROM vods JOIN categories ON vods.category_id = categories.category_id WHERE user_id = ? ORDER BY vod_id DESC;""", (user_id,))
return vods
def get_all_vods():
"""
Returns data of all VODs by all streamers in a JSON-compatible format
"""
with Database() as db:
vods = db.fetchall("""SELECT * FROM vods""")
vods = db.fetchall("""SELECT vods.*, username, category_name FROM vods JOIN users ON vods.user_id = users.user_id JOIN categories ON vods.category_id = categories.category_id WHERE vods.user_id = ? ORDER BY vod_id DESC;""", (user_id,))
return vods
def generate_thumbnail(stream_file: str, thumbnail_file: str) -> None: