From 76de1b7d9dbeef98abfe4b845dc816d8cd32952e Mon Sep 17 00:00:00 2001 From: JustIceO7 Date: Tue, 18 Feb 2025 02:02:34 +0000 Subject: [PATCH] FEAT: Added Celery Beat FEAT: Added preferences Celery task to automatically record what categories users watched --- docker-compose.yml | 19 ++++++- frontend/src/App.tsx | 4 +- frontend/src/components/Video/ChatPanel.tsx | 10 +++- frontend/src/context/AuthContext.tsx | 4 +- web_server/blueprints/chat.py | 63 ++++++++++++++++++++- web_server/blueprints/oauth.py | 1 + web_server/celery_tasks/__init__.py | 8 +++ web_server/celery_tasks/preferences.py | 36 ++++++++++++ web_server/utils/stream_utils.py | 2 +- 9 files changed, 135 insertions(+), 12 deletions(-) create mode 100644 web_server/celery_tasks/preferences.py diff --git a/docker-compose.yml b/docker-compose.yml index 51c1309..17806c4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ - services: nginx: build: @@ -13,6 +12,7 @@ services: - app_network volumes: - stream_data:/stream_data + web_server: build: context: ./web_server @@ -60,7 +60,20 @@ services: - database_data:/web_server/database networks: - app_network - + + celery_beat: + build: + context: ./web_server + command: celery -A celery_tasks.celery_app beat --loglevel=info + depends_on: + - redis + volumes: + - .:/app + - stream_data:/web_server/stream_data + - database_data:/web_server/database + networks: + - app_network + networks: app_network: driver: bridge @@ -69,4 +82,4 @@ volumes: stream_data: driver: local database_data: - driver: local \ No newline at end of file + driver: local diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index a4fc03e..67cf578 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -13,12 +13,14 @@ import ResultsPage from "./pages/ResultsPage"; function App() { const [isLoggedIn, setIsLoggedIn] = useState(false); + const [user_id, setUserID] = useState(null); const [username, setUsername] = useState(null); useEffect(() => { fetch("/api/user/login_status") .then((response) => response.json()) .then((data) => { + setUserID(data.user_id); setIsLoggedIn(data.status); setUsername(data.username); }) @@ -30,7 +32,7 @@ function App() { return ( diff --git a/frontend/src/components/Video/ChatPanel.tsx b/frontend/src/components/Video/ChatPanel.tsx index bdda1d2..6240883 100644 --- a/frontend/src/components/Video/ChatPanel.tsx +++ b/frontend/src/components/Video/ChatPanel.tsx @@ -24,7 +24,7 @@ const ChatPanel: React.FC = ({ onViewerCountChange, onInputFocus, }) => { - const { isLoggedIn, username } = useAuth(); + const { isLoggedIn, username, user_id} = useAuth(); const { showAuthModal, setShowAuthModal } = useAuthModal(); const { socket, isConnected } = useSocket(); const [messages, setMessages] = useState([]); @@ -37,13 +37,17 @@ const ChatPanel: React.FC = ({ if (socket && isConnected) { // Add username check socket.emit("join", { + user_id: user_id ? user_id : null, username: username ? username : "Guest", stream_id: streamId, }); // Handle beforeunload event const handleBeforeUnload = () => { - socket.emit("leave", { stream_id: streamId }); + socket.emit("leave", { + user_id: user_id ? user_id : null, + username: username ? username : "Guest", + stream_id: streamId, }); socket.disconnect(); }; @@ -83,7 +87,7 @@ const ChatPanel: React.FC = ({ socket.disconnect(); }; } - }, [socket, isConnected, username, streamId]); + }, [socket, isConnected, user_id, username, streamId]); // Auto-scroll to bottom when new messages arrive useEffect(() => { diff --git a/frontend/src/context/AuthContext.tsx b/frontend/src/context/AuthContext.tsx index 5f86a42..f7d296b 100644 --- a/frontend/src/context/AuthContext.tsx +++ b/frontend/src/context/AuthContext.tsx @@ -3,12 +3,14 @@ import { createContext, useContext } from "react"; interface AuthContextType { isLoggedIn: boolean; username: string | null; + user_id: number | null; setIsLoggedIn: (value: boolean) => void; setUsername: (value: string | null) => void; + setUserId: (value: number | null) => void; } export const AuthContext = createContext( - undefined, + undefined ); export function useAuth() { diff --git a/web_server/blueprints/chat.py b/web_server/blueprints/chat.py index 6a989d7..29f9768 100644 --- a/web_server/blueprints/chat.py +++ b/web_server/blueprints/chat.py @@ -1,10 +1,14 @@ -from flask import Blueprint, jsonify +from flask import Blueprint, jsonify, session from database.database import Database from .socket import socketio from flask_socketio import emit, join_room, leave_room from datetime import datetime from utils.user_utils import get_user_id +import redis +import json +redis_url = "redis://redis:6379/1" +r = redis.from_url(redis_url, decode_responses=True) chat_bp = Blueprint("chat", __name__) #NOTE: <---------------------- ROUTES NEEDS TO BE CHANGED TO VIDEO OR DELETED AS DEEMED APPROPRIATE ----------------------> @@ -22,8 +26,12 @@ def handle_join(data) -> None: """ Allow a user to join the chat of the stream they are watching. """ + print(data, flush=True) stream_id = data.get("stream_id") if stream_id: + user_id = get_user_id(data["username"]) + if user_id: + add_favourability_entry(str(user_id), str(stream_id)) join_room(stream_id) num_viewers = len(list(socketio.server.manager.get_participants("/", stream_id))) update_viewers(stream_id, num_viewers) @@ -40,11 +48,15 @@ def handle_leave(data) -> None: """ Handle what happens when a user leaves the stream they are watching in regards to the chat. """ + print(data, flush=True) stream_id = data.get("stream_id") + user_id = data.get("user_id") if stream_id: leave_room(stream_id) + if user_id: + remove_favourability_entry(data["user_id"], stream_id) num_viewers = len(list(socketio.server.manager.get_participants("/", stream_id))) - update_viewers(stream_id, num_viewers) + update_viewers(str(user_id), str(stream_id)) emit("status", { "message": f"Welcome to the chat, stream_id: {stream_id}", @@ -132,4 +144,49 @@ def update_viewers(user_id, num_viewers): SET num_viewers = ? WHERE user_id = ?; """, (num_viewers, user_id)) - db.close_connection \ No newline at end of file + db.close_connection +#TODO: Make sure that users entry within Redis is removed if they disconnect from socket +def add_favourability_entry(user_id, stream_id): + """ + Adds entry to Redis that user is watching a streamer + """ + current_viewers = r.hget("current_viewers", "viewers") + + if current_viewers: + current_viewers = json.loads(current_viewers) + else: + current_viewers = {} + + + # Checks if user exists already + if user_id in current_viewers: + # If already exists append stream to user + current_viewers[user_id].append(stream_id) + else: + # Creates new entry for user and stream + current_viewers[user_id] = [stream_id] + + r.hset("current_viewers", "viewers", json.dumps(current_viewers)) + +def remove_favourability_entry(user_id, stream_id): + """ + Removes entry to Redis that user is watching a streamer + """ + current_viewers = r.hget("current_viewers", "viewers") + + # If key exists + if current_viewers: + current_viewers = json.loads(current_viewers) + else: + current_viewers = {} + + # Checks if user exists already + if user_id in current_viewers: + # Removes specific stream from user + current_viewers[user_id] = [stream for stream in current_viewers[user_id] if stream != stream_id] + + # If user is no longer watching any streams + if not current_viewers[user_id]: + del current_viewers[user_id] + + r.hset("current_viewers", "viewers", json.dumps(current_viewers)) \ No newline at end of file diff --git a/web_server/blueprints/oauth.py b/web_server/blueprints/oauth.py index 8d5846e..fd3bc35 100644 --- a/web_server/blueprints/oauth.py +++ b/web_server/blueprints/oauth.py @@ -98,6 +98,7 @@ def google_auth(): session.clear() session["username"] = user_data["username"] session["user_id"] = user_data["user_id"] + print(f"session: {session.get('username')}. user_id: {session.get('user_id')}", flush=True) return redirect(origin) diff --git a/web_server/celery_tasks/__init__.py b/web_server/celery_tasks/__init__.py index bb00459..c119113 100644 --- a/web_server/celery_tasks/__init__.py +++ b/web_server/celery_tasks/__init__.py @@ -3,6 +3,7 @@ from utils.stream_utils import generate_thumbnail, get_streamer_live_status from time import sleep from os import listdir, remove from datetime import datetime +from celery_tasks.preferences import user_preferences import subprocess def celery_init_app(app) -> Celery: @@ -13,10 +14,17 @@ def celery_init_app(app) -> Celery: celery_app = Celery(app.name, task_cls=FlaskTask) celery_app.config_from_object(app.config["CELERY"]) + celery_app.conf.beat_schedule = { + 'user-favourability-task': { + 'task': 'celery_tasks.preferences.user_preferences', + 'schedule': 30.0, + }, + } celery_app.set_default() app.extensions["celery"] = celery_app return celery_app + @shared_task def update_thumbnail(user_id, stream_file, thumbnail_file, sleep_time) -> None: """ diff --git a/web_server/celery_tasks/preferences.py b/web_server/celery_tasks/preferences.py new file mode 100644 index 0000000..c6ff9c7 --- /dev/null +++ b/web_server/celery_tasks/preferences.py @@ -0,0 +1,36 @@ +from celery import shared_task +from database.database import Database +import redis +import json + +redis_url = "redis://redis:6379/1" +r = redis.from_url(redis_url, decode_responses=True) + +@shared_task +def user_preferences(): + """ + Updates users preferences on different stream categories based on the streams they are currently watching + """ + stats = r.hget("current_viewers", "viewers") + # If there are any current viewers + if stats: + stats = json.loads(stats) + print(stats, flush=True) + with Database() as db: + # Loop over all users and their currently watching streams + for user_id, stream_ids in stats.items(): + # For each user and stream combination + for stream_id in stream_ids: + # Retrieves category associated with stream + current_category = db.fetchone("""SELECT category_id FROM streams + WHERE user_id = ? + """, (stream_id)) + # If stream is still live then update the user_preferences table to reflect their preferences + if current_category: + db.execute("""INSERT INTO user_preferences (user_id,category_id,favourability) + VALUES (?,?,?) + ON CONFLICT(user_id, category_id) + DO UPDATE SET favourability = favourability + 1 + """, (user_id, current_category["category_id"], 1)) + data = db.fetchall("SELECT * FROM user_preferences") + print(data,flush=True) \ No newline at end of file diff --git a/web_server/utils/stream_utils.py b/web_server/utils/stream_utils.py index eaa8e0a..52581de 100644 --- a/web_server/utils/stream_utils.py +++ b/web_server/utils/stream_utils.py @@ -39,7 +39,7 @@ def get_current_stream_data(user_id: int) -> Optional[dict]: """ with Database() as db: most_recent_stream = db.fetchone(""" - SELECT s.user_id, u.username, s.title, s.start_time, s.num_viewers, c.category_name + SELECT s.user_id, u.username, s.title, s.start_time, s.num_viewers, c.category_name, c.category_id FROM streams AS s JOIN categories AS c ON s.category_id = c.category_id JOIN users AS u ON s.user_id = u.user_id