refactor: move connectors to backend dir

They will now be used more in the backend.
2026-03-09 20:53:13 +00:00
parent 738af5415b
commit ca444e9cb0
3 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,178 @@
import datetime
import requests
import logging
import re
from dto.post import Post
from dto.comment import Comment
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
logger = logging.getLogger(__name__)
HEADERS = {
"User-Agent": "Mozilla/5.0 (compatible; ForumScraper/1.0)"
}
class BoardsAPI:
def __init__(self):
self.url = "https://www.boards.ie"
self.source_name = "Boards.ie"
def get_new_category_posts(self, category: str, post_limit: int, comment_limit: int) -> list[Post]:
urls = []
current_page = 1
logger.info(f"Fetching posts from category: {category}")
while len(urls) < post_limit:
url = f"{self.url}/categories/{category}/p{current_page}"
html = self._fetch_page(url)
soup = BeautifulSoup(html, "html.parser")
logger.debug(f"Processing page {current_page} for category {category}")
for a in soup.select("a.threadbit-threadlink"):
if len(urls) >= post_limit:
break
href = a.get("href")
if href:
urls.append(href)
current_page += 1
logger.debug(f"Fetched {len(urls)} post URLs from category {category}")
# Fetch post details for each URL and create Post objects
posts = []
def fetch_and_parse(post_url):
html = self._fetch_page(post_url)
post = self._parse_thread(html, post_url, comment_limit)
return post
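# Fetch and parse the threads concurrently; results are gathered in completion order, not submission order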
with ThreadPoolExecutor(max_workers=30) as executor:
futures = {executor.submit(fetch_and_parse, url): url for url in urls}
for i, future in enumerate(as_completed(futures)):
post_url = futures[future]
logger.debug(f"Fetching Post {i + 1} / {len(urls)} details from URL: {post_url}")
try:
post = future.result()
posts.append(post)
except Exception as e:
logger.error(f"Error fetching post from {post_url}: {e}")
return posts
def _fetch_page(self, url: str) -> str:
response = requests.get(url, headers=HEADERS)
response.raise_for_status()
return response.text
def _parse_thread(self, html: str, post_url: str, comment_limit: int) -> Post:
soup = BeautifulSoup(html, "html.parser")
# Author
author_tag = soup.select_one(".userinfo-username-title")
author = author_tag.text.strip() if author_tag else None
# Timestamp
timestamp_tag = soup.select_one(".postbit-header")
timestamp = None
if timestamp_tag:
match = re.search(r"\d{2}-\d{2}-\d{4}\s+\d{2}:\d{2}[AP]M", timestamp_tag.get_text())
timestamp = match.group(0) if match else None
# convert to unix epoch
timestamp = datetime.datetime.strptime(timestamp, "%d-%m-%Y %I:%M%p").timestamp() if timestamp else None
# Post ID
post_num = re.search(r"discussion/(\d+)", post_url)
post_num = post_num.group(1) if post_num else None
# Content
content_tag = soup.select_one(".Message.userContent")
content = content_tag.get_text(separator="\n", strip=True) if content_tag else None
# Title
title_tag = soup.select_one(".PageTitle h1")
title = title_tag.text.strip() if title_tag else None
# Comments
comments = self._parse_comments(post_url, post_num, comment_limit)
post = Post(
id=post_num,
author=author,
title=title,
content=content,
url=post_url,
timestamp=timestamp,
source=self.source_name,
comments=comments
)
return post
def _parse_comments(self, url: str, post_id: str, comment_limit: int) -> list[Comment]:
comments = []
current_url = url
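# Follow the "Next" pagination link page by page until comment_limit is reached or there are no more pages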
while current_url and len(comments) < comment_limit:
html = self._fetch_page(current_url)
page_comments = self._parse_page_comments(html, post_id)
comments.extend(page_comments)
# Check for next page
soup = BeautifulSoup(html, "html.parser")
next_link = soup.find("a", class_="Next")
if next_link and next_link.get('href'):
href = next_link.get('href')
current_url = href if href.startswith('http') else self.url + href
else:
current_url = None
return comments
def _parse_page_comments(self, html: str, post_id: str) -> list:
comments = []
soup = BeautifulSoup(html, "html.parser")
comment_tags = soup.find_all("li", class_="ItemComment")
for tag in comment_tags:
# Comment ID
comment_id = tag.get("id")
# Author
user_elem = tag.find('span', class_='userinfo-username-title')
username = user_elem.get_text(strip=True) if user_elem else None
# Timestamp
date_elem = tag.find('span', class_='DateCreated')
timestamp = date_elem.get_text(strip=True) if date_elem else None
timestamp = datetime.datetime.strptime(timestamp, "%d-%m-%Y %I:%M%p").timestamp() if timestamp else None
# Content
message_div = tag.find('div', class_='Message userContent')
# Strip quoted replies so only the commenter's own text is captured; guard against a missing message div
if message_div and message_div.blockquote:
message_div.blockquote.decompose()
content = message_div.get_text(separator="\n", strip=True) if message_div else None
comment = Comment(
id=comment_id,
post_id=post_id,
author=username,
content=content,
timestamp=timestamp,
reply_to=None,
source=self.source_name
)
comments.append(comment)
return comments

View File

@@ -0,0 +1,178 @@
import requests
import logging
import time
from dto.post import Post
from dto.user import User
from dto.comment import Comment
logger = logging.getLogger(__name__)
class RedditAPI:
def __init__(self):
self.url = "https://www.reddit.com/"
self.source_name = "Reddit"
# Public Methods #
def search_new_subreddit_posts(self, search: str, subreddit: str, limit: int) -> list[Post]:
params = {
'q': search,
'limit': limit,
'restrict_sr': 'on',
'sort': 'new'
}
logger.info(f"Searching subreddit '{subreddit}' for '{search}' with limit {limit}")
url = f"r/{subreddit}/search.json"
posts = []
after = None
while len(posts) < limit:
batch_limit = min(100, limit - len(posts))
params['limit'] = batch_limit
# Advance with the 'after' cursor so each request returns the next batch instead of repeating the first page
params['after'] = after
data = self._fetch_post_overviews(url, params)
batch_posts = self._parse_posts(data)
logger.debug(f"Fetched {len(batch_posts)} posts from search in subreddit {subreddit}")
if not batch_posts:
break
posts.extend(batch_posts)
after = data['data'].get('after')
if not after:
break
return posts
def get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]:
posts = []
after = None
url = f"r/{subreddit}/new.json"
logger.info(f"Fetching new posts from subreddit: {subreddit}")
while len(posts) < limit:
batch_limit = min(100, limit - len(posts))
params = {
'limit': batch_limit,
'after': after
}
data = self._fetch_post_overviews(url, params)
batch_posts = self._parse_posts(data)
logger.debug(f"Fetched {len(batch_posts)} new posts from subreddit {subreddit}")
if not batch_posts:
break
posts.extend(batch_posts)
after = data['data'].get('after')
if not after:
break
return posts
def get_user(self, username: str) -> User:
data = self._fetch_post_overviews(f"user/{username}/about.json", {})
return self._parse_user(data)
## Private Methods ##
def _parse_posts(self, data) -> list[Post]:
posts = []
total_num_posts = len(data['data']['children'])
current_index = 0
for item in data['data']['children']:
current_index += 1
logger.debug(f"Parsing post {current_index} of {total_num_posts}")
post_data = item['data']
post = Post(
id=post_data['id'],
author=post_data['author'],
title=post_data['title'],
content=post_data.get('selftext', ''),
url=post_data['url'],
timestamp=post_data['created_utc'],
source=self.source_name,
comments=self._get_post_comments(post_data['id']))
post.subreddit = post_data['subreddit']
post.upvotes = post_data['ups']
posts.append(post)
return posts
def _get_post_comments(self, post_id: str) -> list[Comment]:
comments: list[Comment] = []
url = f"comments/{post_id}.json"
data = self._fetch_post_overviews(url, {})
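# The comments endpoint returns a two-element listing: index 0 is the post itself, index 1 the comment tree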
if len(data) < 2:
return comments
comment_data = data[1]['data']['children']
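# Recursively walk the reply tree; kind 't1' is a comment, other kinds (e.g. 'more' placeholders) are skipped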
def _parse_comment_tree(items, parent_id=None):
for item in items:
if item['kind'] != 't1':
continue
comment_info = item['data']
comment = Comment(
id=comment_info['id'],
post_id=post_id,
author=comment_info['author'],
content=comment_info.get('body', ''),
timestamp=comment_info['created_utc'],
reply_to=parent_id or comment_info.get('parent_id', None),
source=self.source_name
)
comments.append(comment)
# Process replies recursively
replies = comment_info.get('replies')
if replies and isinstance(replies, dict):
reply_items = replies.get('data', {}).get('children', [])
_parse_comment_tree(reply_items, parent_id=comment.id)
_parse_comment_tree(comment_data)
return comments
def _parse_user(self, data) -> User:
user_data = data['data']
user = User(
username=user_data['name'],
created_utc=user_data['created_utc'])
user.karma = user_data['total_karma']
return user
def _fetch_post_overviews(self, endpoint: str, params: dict) -> dict:
url = f"{self.url}{endpoint}"
max_retries = 15
backoff = 1 # seconds
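# Retry with exponential backoff on 429 (rate limited) and 500 (transient server error) responses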
for attempt in range(max_retries):
try:
response = requests.get(url, headers={'User-agent': 'python:ethnography-college-project:0.1 (by /u/ThisBirchWood)'}, params=params)
if response.status_code == 429:
wait_time = float(response.headers.get("Retry-After", backoff))  # header value arrives as a string
logger.warning(f"Rate limited by Reddit API. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
backoff *= 2
continue
if response.status_code == 500:
logger.warning("Server error from Reddit API. Retrying...")
time.sleep(backoff)
backoff *= 2
continue
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"Error fetching data from Reddit API: {e}")
return {}

View File

@@ -0,0 +1,84 @@
import os
import datetime
import logging
from dotenv import load_dotenv
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from dto.post import Post
from dto.comment import Comment
logger = logging.getLogger(__name__)
load_dotenv()
API_KEY = os.getenv("YOUTUBE_API_KEY")
class YouTubeAPI:
def __init__(self):
self.youtube = build('youtube', 'v3', developerKey=API_KEY)
def search_videos(self, query, limit):
request = self.youtube.search().list(
q=query,
part='snippet',
type='video',
maxResults=limit
)
response = request.execute()
return response.get('items', [])
def get_video_comments(self, video_id, limit):
request = self.youtube.commentThreads().list(
part='snippet',
videoId=video_id,
maxResults=limit,
textFormat='plainText'
)
try:
response = request.execute()
except HttpError as e:
print(f"Error fetching comments for video {video_id}: {e}")
return []
return response.get('items', [])
def fetch_videos(self, query, video_limit, comment_limit) -> list[Post]:
videos = self.search_videos(query, video_limit)
posts = []
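# Build a Post for each search result, attaching its top-level comments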
for video in videos:
video_id = video['id']['videoId']
snippet = video['snippet']
title = snippet['title']
description = snippet['description']
# publishedAt is ISO-8601 UTC; mark it as UTC so .timestamp() is not skewed by the local timezone
published_at = datetime.datetime.strptime(snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc).timestamp()
channel_title = snippet['channelTitle']
comments = []
comments_data = self.get_video_comments(video_id, comment_limit)
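# Only the top-level comment of each thread is collected; replies are ignored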
for comment_thread in comments_data:
comment_snippet = comment_thread['snippet']['topLevelComment']['snippet']
comment = Comment(
id=comment_thread['id'],
post_id=video_id,
content=comment_snippet['textDisplay'],
author=comment_snippet['authorDisplayName'],
timestamp=datetime.datetime.strptime(comment_snippet['publishedAt'], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc).timestamp(),
reply_to=None,
source="YouTube"
)
comments.append(comment)
post = Post(
id=video_id,
content=f"{title}\n\n{description}",
author=channel_title,
timestamp=published_at,
url=f"https://www.youtube.com/watch?v={video_id}",
title=title,
source="YouTube",
comments=comments
)
posts.append(post)
return posts