Created utils folder which holds funcs to be called within the flask routes, added pseudo code to streams.py and user.py which are untested, data still needs to be jsonified to be sent to React

This commit is contained in:
JustIceO7
2025-01-26 06:51:24 +00:00
parent 64a5bf6f82
commit ff9feaacec
6 changed files with 251 additions and 37 deletions

View File

@@ -0,0 +1,26 @@
from database.database import Database
from user_utils import get_user_id
from typing import Optional, List, Tuple
def user_recommendation_category(username: str) -> Optional[int]:
"""
Queries user_preferences database to find users favourite streaming category and returns the category
"""
db = Database()
cursor = db.create_connection()
user_id = get_user_id(username)
data = cursor.execute(
"SELECT category_id FROM user_preferences WHERE user_id = ? ORDER BY favourability DESC LIMIT 1", (user_id,)).fetchone()
return data[0]
def recommendations_based_on_category(category_id: int) -> Optional[List[Tuple[int, str, int]]]:
"""
Queries stream database to get top 10 most viewed streams based on given category and returns (stream_id, title, num_viewers)
"""
db = Database()
cursor = db.create_connection()
data = cursor.execute(
"SELECT streamer_id, stream_id, title, num_viewers FROM streams WHERE category_id = ? ORDER BY num_viwers DESC LIMIT 10", (category_id,)).fetchall()
return data

View File

@@ -0,0 +1,85 @@
from database.database import Database
from typing import Optional
def streamer_data(streamer_id: int):
"""
Retrieves data given streamer (username, since, isPartnered)
"""
db = Database()
cursor = db.create_connection()
streamer_data = cursor.execute("""SELECT username, since, isPartnered FROM
streamers JOIN users ON
streamers.user_id = users.user_id
WHERE streamer_id = ?""", (streamer_id,)).fetchone()
return streamer_data
def streamer_live_status(streamer_id: int) -> bool:
"""
Returns whether the given streamer is live
"""
db = Database()
cursor = db.create_connection()
return bool(cursor.execute("SELECT 1 FROM streams WHERE streamer_id = ? AND isLive = 1 ORDER BY stream_id DESC", (streamer_id,)).fetchone())
def followed_live_streams(user_id: int):
"""
Searches for streamers who the user followed which are currently live
"""
db = Database()
cursor = db.create_connection()
live_streams = cursor.execute("""
SELECT streamer_id, stream_id, title, num_viewers
FROM streams
WHERE streamer_id IN (SELECT streamer_id FROM follows WHERE user_id = ?)
AND stream_id = (SELECT MAX(stream_id) FROM streams WHERE streamer_id = streams.streamer_id)
AND isLive = 1
""", (user_id,)).fetchall()
return live_streams
def streamer_name(streamer_id: int) -> Optional[str]:
"""
Returns streamers username given streamer_id
"""
db = Database()
cursor = db.create_connection()
streamer_username = cursor.execute(
"SELECT username FROM users WHERE user_id = (SELECT user_id FROM streamers WHERE streamer_id = ?)", (streamer_id,)).fetchone()
return streamer_username[0] if streamer_username else None
def streamer_id(streamer_name: str) -> Optional[int]:
"""
Returns streamers id given streamers name
"""
db = Database()
cursor = db.create_connection()
streamer_id = cursor.execute(
"SELECT streamer_id FROM streamers WHERE user_id = (SELECT user_id FROM users WHERE username = ?)",(streamer_name,)).fetchone()
return streamer_id[0] if streamer_id else None
def streamer_most_recent_stream(streamer_id: int):
"""
Returns data of the most recent stream by a streamer
"""
db = Database()
cursor = db.create_connection()
most_recent_stream = cursor.execute("""SELECT * FROM streams WHERE
streamer_id = ? AND
stream_id = (SELECT MAX(stream_id) FROM
streams WHERE streamer_id = ?)""", (streamer_id,streamer_id)).fetchone()
return most_recent_stream
def streamer_stream(streamer_id: int, stream_id: int):
"""
Returns data of a streamers selected stream
"""
db = Database()
cursor = db.create_connection()
stream = cursor.execute("SELECT * FROM streams WHERE streamer_id = ? AND stream_id = ?", (streamer_id,stream_id)).fetchone()
return stream

View File

@@ -0,0 +1,55 @@
from database.database import Database
from typing import Optional
from datetime import datetime
def get_user_id(username: str) -> Optional[int]:
"""
Returns user_id associated with given username
"""
db = Database()
cursor = db.create_connection()
data = cursor.execute("SELECT user_id FROM user WHERE username = ?", (username,)).fetchone()
return data[0] if data else None
def get_username(user_id: str) -> Optional[str]:
"""
Returns username associated with given user_id
"""
db = Database()
cursor = db.create_connection()
data = cursor.execute("SELECT username FROM user WHERE username = ?", (user_id,)).fetchone()
return data[0] if data else None
def is_subscribed(user_id: int, streamer_id: int) -> bool:
"""
Returns True if user is subscribed to a streamer else False
"""
db = Database()
cursor = db.create_connection()
return bool(cursor.execute(
"SELECT 1 FROM subscribes WHERE user_id = ? AND streamer_id = ? AND expires > since",
(user_id, streamer_id)
).fetchone())
def is_following(user_id: int, streamer_id:int) -> bool:
db = Database()
cursor = db.create_connection()
return bool(cursor.execute(
"SELECT 1 FROM follows WHERE user_id = ? AND streamer_id = ?",
(user_id, streamer_id)
).fetchone())
def subscription_expiration(user_id: int, streamer_id: int) -> int:
"""
Returns the amount of time left until user subscription to a streamer ends
"""
db = Database()
cursor = db.create_connection()
data = cursor.execute(
"SELECT expires from subscriptions WHERE user_id = ? AND streamer_id = ? AND expires > since", (user_id,streamer_id)).fetchone()
remaining_time = 0
if data:
expiration_date = data[0]
remaining_time = (expiration_date - datetime.now()).seconds
return remaining_time

19
web_server/utils/utils.py Normal file
View File

@@ -0,0 +1,19 @@
from database.database import Database
def categories():
"""
Returns all possible streaming categories
"""
db = Database()
cursor = db.create_connection()
all_categories = cursor.execute("SELECT * FROM categories").fetchall()
return all_categories
def tags():
"""
Returns all possible streaming tags
"""
db = Database()
cursor = db.create_connection()
all_tags = cursor.execute("SELECT * FROM tags").fetchall()
return all_tags