Implement YouTube API integration for video and comment fetching
This commit is contained in:
76
connectors/youtube_api.py
Normal file
76
connectors/youtube_api.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from googleapiclient.discovery import build
|
||||
from dto.post import Post
|
||||
from dto.comment import Comment
|
||||
|
||||
load_dotenv()
|
||||
|
||||
API_KEY = os.getenv("YOUTUBE_API_KEY")
|
||||
print(API_KEY)
|
||||
|
||||
class YouTubeAPI:
|
||||
def __init__(self):
|
||||
self.youtube = build('youtube', 'v3', developerKey=API_KEY)
|
||||
|
||||
def search_videos(self, query, limit):
|
||||
request = self.youtube.search().list(
|
||||
q=query,
|
||||
part='snippet',
|
||||
type='video',
|
||||
maxResults=limit
|
||||
)
|
||||
response = request.execute()
|
||||
return response.get('items', [])
|
||||
|
||||
def get_video_comments(self, video_id, limit):
|
||||
request = self.youtube.commentThreads().list(
|
||||
part='snippet',
|
||||
videoId=video_id,
|
||||
maxResults=limit,
|
||||
textFormat='plainText'
|
||||
)
|
||||
response = request.execute()
|
||||
return response.get('items', [])
|
||||
|
||||
def fetch_and_parse_videos(self, query, video_limit, comment_limit):
|
||||
videos = self.search_videos(query, video_limit)
|
||||
posts = []
|
||||
|
||||
for video in videos:
|
||||
video_id = video['id']['videoId']
|
||||
snippet = video['snippet']
|
||||
title = snippet['title']
|
||||
description = snippet['description']
|
||||
published_at = snippet['publishedAt']
|
||||
channel_title = snippet['channelTitle']
|
||||
|
||||
post = Post(
|
||||
id=video_id,
|
||||
content=f"{title}\n\n{description}",
|
||||
author=channel_title,
|
||||
timestamp=published_at,
|
||||
url=f"https://www.youtube.com/watch?v={video_id}",
|
||||
title=title,
|
||||
source="YouTube"
|
||||
)
|
||||
|
||||
post.comments = []
|
||||
comments_data = self.get_video_comments(video_id, comment_limit)
|
||||
for comment_thread in comments_data:
|
||||
comment_snippet = comment_thread['snippet']['topLevelComment']['snippet']
|
||||
comment = Comment(
|
||||
id=comment_thread['id'],
|
||||
post_id=video_id,
|
||||
content=comment_snippet['textDisplay'],
|
||||
author=comment_snippet['authorDisplayName'],
|
||||
timestamp=comment_snippet['publishedAt'],
|
||||
reply_to=None,
|
||||
source="YouTube"
|
||||
)
|
||||
post.comments.append(comment)
|
||||
|
||||
posts.append(post)
|
||||
|
||||
return posts
|
||||
Reference in New Issue
Block a user