119 lines
3.7 KiB
Python
119 lines
3.7 KiB
Python
import os
|
|
import datetime
|
|
import logging
|
|
|
|
from dotenv import load_dotenv
|
|
from googleapiclient.discovery import build
|
|
from googleapiclient.errors import HttpError
|
|
from dto.post import Post
|
|
from dto.comment import Comment
|
|
from server.connectors.base import BaseConnector
|
|
|
|
# load_dotenv() runs before the lookup below so that a key defined in a local
# .env file is already present in the process environment.
load_dotenv()
API_KEY = os.environ.get("YOUTUBE_API_KEY")  # None when the variable is unset

# Module-scoped logger for this connector, with INFO records enabled.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
|
|
|
|
|
|
class YouTubeAPI(BaseConnector):
    """Connector that turns YouTube search results into ``Post`` objects.

    Videos are looked up through the YouTube Data API v3 ``search`` endpoint.
    Each video becomes a ``Post`` whose ``comments`` hold the top-level
    comments returned by a single ``commentThreads`` request.
    """

    source_name: str = "youtube"
    display_name: str = "YouTube"
    search_enabled: bool = True
    categories_enabled: bool = False

    def __init__(self):
        """Create the API client from the module-level ``API_KEY``."""
        self.youtube = build("youtube", "v3", developerKey=API_KEY)

    def get_new_posts_by_search(
        self, search: str, category: str, post_limit: int
    ) -> list[Post]:
        """Search YouTube and return the matching videos as posts.

        Args:
            search: Query string handed to the video search.
            category: Not used by this connector (``categories_enabled`` is
                False); accepted to keep the connector signature.
            post_limit: Maximum number of videos to return.

        Returns:
            One ``Post`` per video found, in search order, each with the
            comments from ``_get_video_comments`` attached.

        Raises:
            ValueError: If a ``publishedAt`` value cannot be parsed.
        """
        posts = []

        for video in self._search_videos(search, post_limit):
            video_id = video["id"]["videoId"]
            snippet = video["snippet"]
            title = snippet["title"]
            description = snippet["description"]
            published_at = self._parse_timestamp(snippet["publishedAt"])
            channel_title = snippet["channelTitle"]

            comments = [
                self._to_comment(comment_thread, video_id)
                for comment_thread in self._get_video_comments(video_id)
            ]

            posts.append(
                Post(
                    id=video_id,
                    content=f"{title}\n\n{description}",
                    author=channel_title,
                    timestamp=published_at,
                    url=f"https://www.youtube.com/watch?v={video_id}",
                    title=title,
                    source=self.source_name,
                    comments=comments,
                )
            )

        return posts

    def category_exists(self, category):
        """Return True for every *category*; no lookup is performed."""
        return True

    @staticmethod
    def _parse_timestamp(value: str) -> float:
        """Convert an API ``publishedAt`` string to a POSIX timestamp.

        The trailing ``Z`` denotes UTC. The value is parsed as a
        timezone-aware datetime so the result does not depend on the local
        time zone of the machine (a naive datetime's ``.timestamp()`` is
        interpreted as local time). Fractional seconds are accepted.

        Raises:
            ValueError: If *value* is not a valid ISO 8601 date-time.
        """
        if value.endswith("Z"):
            # fromisoformat() only understands "Z" from Python 3.11 onwards.
            value = value[:-1] + "+00:00"
        parsed = datetime.datetime.fromisoformat(value)
        if parsed.tzinfo is None:
            # No offset given: treat the value as UTC rather than local time.
            parsed = parsed.replace(tzinfo=datetime.timezone.utc)
        return parsed.timestamp()

    def _to_comment(self, comment_thread: dict, video_id: str) -> Comment:
        """Build a ``Comment`` from one ``commentThreads`` item of *video_id*.

        Only the thread's top-level comment is used, so ``reply_to`` is
        always None.
        """
        top_level = comment_thread["snippet"]["topLevelComment"]["snippet"]
        return Comment(
            id=comment_thread["id"],
            post_id=video_id,
            content=top_level["textDisplay"],
            author=top_level["authorDisplayName"],
            timestamp=self._parse_timestamp(top_level["publishedAt"]),
            reply_to=None,
            source=self.source_name,
        )

    def _search_videos(self, query, limit):
        """Return up to *limit* raw search items for *query*.

        Pages through ``search().list`` (videos only, ``snippet`` part),
        requesting at most 50 items per call, until *limit* items have been
        collected or the API stops returning a ``nextPageToken``.
        """
        results = []
        next_page_token = None

        while len(results) < limit:
            # Never ask for more than is still missing, capped at 50 per page.
            batch_size = min(50, limit - len(results))

            request = self.youtube.search().list(
                q=query,
                part="snippet",
                type="video",
                maxResults=batch_size,
                pageToken=next_page_token,
            )

            response = request.execute()
            results.extend(response.get("items", []))
            logger.info(
                "Fetched %d out of %d videos for query '%s'",
                len(results),
                limit,
                query,
            )

            next_page_token = response.get("nextPageToken")
            if not next_page_token:
                # Running out of pages is only noteworthy when the caller
                # ends up with fewer videos than requested.
                if len(results) < limit:
                    logger.warning(
                        "No more pages of results available for query '%s'",
                        query,
                    )
                break

        return results[:limit]

    def _get_video_comments(self, video_id):
        """Return the raw comment-thread items for *video_id*.

        A single ``commentThreads().list`` request is made, so only the first
        page of threads is returned. An ``HttpError`` is logged and yields an
        empty list, so one failing video does not abort the whole search.
        """
        request = self.youtube.commentThreads().list(
            part="snippet", videoId=video_id, textFormat="plainText"
        )

        try:
            response = request.execute()
        except HttpError as e:
            logger.warning(
                "Error fetching comments for video %s: %s", video_id, e
            )
            return []

        return response.get("items", [])
|