import logging
import time

import requests

from dto.comment import Comment
from dto.post import Post
from dto.user import User

logger = logging.getLogger(__name__)


class RedditAPI:
    """Minimal client for Reddit's public (unauthenticated) JSON endpoints.

    Fetches posts, full comment trees, and user profiles from
    https://www.reddit.com/ and maps them onto the project's DTOs
    (Post, Comment, User). All network failures are best-effort:
    `_fetch_data` returns {} rather than raising, and callers treat
    that as "no results".
    """

    def __init__(self):
        # Base URL; endpoint paths are appended relative to it.
        self.url = "https://www.reddit.com/"
        # Tag stamped onto every Post/Comment so callers can tell sources apart.
        self.source_name = "Reddit"

    # Public Methods #
    def search_new_subreddit_posts(self, search: str, subreddit: str, limit: int = 10) -> tuple[list[Post], list[Comment]]:
        """Search a subreddit (sorted newest-first) for posts matching a query.

        Args:
            search: Query string passed to Reddit's search endpoint.
            subreddit: Subreddit name without the "r/" prefix.
            limit: Maximum number of posts to return; fetched in batches of
                at most 100 (Reddit's listing cap).

        Returns:
            A (posts, comments) tuple; comments belong to the returned posts.
        """
        logger.info(f"Searching subreddit '{subreddit}' for '{search}' with limit {limit}")
        url = f"r/{subreddit}/search.json"
        posts: list[Post] = []
        comments: list[Comment] = []
        after = None  # pagination cursor
        while len(posts) < limit:
            params = {
                'q': search,
                'limit': min(100, limit - len(posts)),
                'restrict_sr': 'on',
                'sort': 'new',
                'after': after,
            }
            data = self._fetch_data(url, params)
            batch_posts, batch_comments = self._parse_posts(data)
            logger.debug(f"Fetched {len(batch_posts)} posts and {len(batch_comments)} comments from search in subreddit {subreddit}")
            if not batch_posts:
                break
            posts.extend(batch_posts)
            comments.extend(batch_comments)
            # BUGFIX: advance the pagination cursor. Previously no 'after'
            # was sent, so when fewer than `limit` posts matched, the same
            # first page was re-fetched and duplicated until `limit` was hit.
            after = data.get('data', {}).get('after')
            if not after:
                break
        return posts, comments

    def get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> tuple[list[Post], list[Comment]]:
        """Fetch the newest posts from a subreddit, plus their comments.

        Args:
            subreddit: Subreddit name without the "r/" prefix.
            limit: Maximum number of posts to return; fetched in batches of
                at most 100, following the 'after' pagination cursor.

        Returns:
            A (posts, comments) tuple; comments belong to the returned posts.
        """
        posts: list[Post] = []
        comments: list[Comment] = []
        after = None
        url = f"r/{subreddit}/new.json"
        logger.info(f"Fetching new posts from subreddit: {subreddit}")
        while len(posts) < limit:
            params = {
                'limit': min(100, limit - len(posts)),
                'after': after,
            }
            data = self._fetch_data(url, params)
            batch_posts, batch_comments = self._parse_posts(data)
            logger.debug(f"Fetched {len(batch_posts)} new posts and {len(batch_comments)} comments from subreddit {subreddit}")
            if not batch_posts:
                break
            posts.extend(batch_posts)
            comments.extend(batch_comments)
            # Robustness: _fetch_data returns {} on failure, so don't assume
            # the listing envelope is present.
            after = data.get('data', {}).get('after')
            if not after:
                break
        return posts, comments

    def get_user(self, username: str) -> User:
        """Fetch a user's profile via user/<username>/about.json.

        Raises:
            KeyError: if the request failed and the payload is empty.
        """
        data = self._fetch_data(f"user/{username}/about.json", {})
        return self._parse_user(data)

    ## Private Methods ##
    def _parse_posts(self, data) -> tuple[list[Post], list[Comment]]:
        """Convert a Reddit listing payload into Posts, fetching each post's comments.

        `data` is the JSON dict from a listing endpoint, or {} on fetch failure
        (handled gracefully: yields no posts).
        """
        posts: list[Post] = []
        comments: list[Comment] = []
        # Robustness: tolerate the {} failure value from _fetch_data instead
        # of raising KeyError on data['data'].
        children = data.get('data', {}).get('children', [])
        total_num_posts = len(children)
        for current_index, item in enumerate(children, start=1):
            logger.debug(f"Parsing post {current_index} of {total_num_posts}")
            post_data = item['data']
            post = Post(
                id=post_data['id'],
                author=post_data['author'],
                title=post_data['title'],
                content=post_data.get('selftext', ''),  # link posts have no selftext
                url=post_data['url'],
                timestamp=post_data['created_utc'],
                source=self.source_name)
            post.subreddit = post_data['subreddit']
            post.upvotes = post_data['ups']
            posts.append(post)
            # One extra HTTP round-trip per post to pull its comment tree.
            comments.extend(self._get_post_comments(post.id))
        return posts, comments

    def _get_post_comments(self, post_id: str) -> list[Comment]:
        """Fetch and flatten the full comment tree for one post."""
        comments: list[Comment] = []
        url = f"comments/{post_id}.json"
        data = self._fetch_data(url, {})
        # The comments endpoint returns [post_listing, comment_listing];
        # anything shorter (including the {} failure value) has no comments.
        if len(data) < 2:
            return comments
        comment_data = data[1]['data']['children']

        def _parse_comment_tree(items, parent_id=None):
            for item in items:
                # Skip non-comments, e.g. kind 'more' (collapsed-thread stubs).
                if item['kind'] != 't1':
                    continue
                comment_info = item['data']
                comment = Comment(
                    id=comment_info['id'],
                    post_id=post_id,
                    author=comment_info['author'],
                    content=comment_info.get('body', ''),
                    timestamp=comment_info['created_utc'],
                    reply_to=parent_id or comment_info.get('parent_id', None),
                    source=self.source_name
                )
                comments.append(comment)
                # Process replies recursively; Reddit sends "" (not a dict)
                # when a comment has no replies, hence the isinstance guard.
                replies = comment_info.get('replies')
                if replies and isinstance(replies, dict):
                    reply_items = replies.get('data', {}).get('children', [])
                    _parse_comment_tree(reply_items, parent_id=comment.id)

        _parse_comment_tree(comment_data)
        return comments

    def _parse_user(self, data) -> User:
        """Convert a user 'about' payload into a User DTO.

        Raises KeyError if `data` is the {} failure value from _fetch_data.
        """
        user_data = data['data']
        user = User(
            username=user_data['name'],
            created_utc=user_data['created_utc'])
        user.karma = user_data['total_karma']
        return user

    def _fetch_data(self, endpoint: str, params: dict) -> dict:
        """GET a Reddit JSON endpoint with retry and exponential backoff.

        Retries on HTTP 429 (honouring Retry-After) and 500. Returns the
        decoded JSON payload, or {} when the request ultimately fails.
        """
        url = f"{self.url}{endpoint}"
        max_retries = 15
        backoff = 1  # seconds
        for _ in range(max_retries):
            try:
                response = requests.get(
                    url,
                    headers={'User-agent': 'python:ethnography-college-project:0.1 (by /u/ThisBirchWood)'},
                    params=params)
                if response.status_code == 429:
                    # BUGFIX: Retry-After arrives as a string header;
                    # time.sleep(str) raised TypeError before. Fall back to
                    # the current backoff when the header is missing or not
                    # numeric (it may be an HTTP-date).
                    try:
                        wait_time = float(response.headers.get("Retry-After", backoff))
                    except ValueError:
                        wait_time = backoff
                    logger.warning(f"Rate limited by Reddit API. Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    backoff *= 2
                    continue
                if response.status_code == 500:
                    logger.warning("Server error from Reddit API. Retrying...")
                    time.sleep(backoff)
                    backoff *= 2
                    continue
                response.raise_for_status()
                return response.json()
            except requests.RequestException as e:
                # Consistency fix: use the module logger instead of print().
                logger.error(f"Error fetching data from Reddit API: {e}")
                return {}
        # BUGFIX: previously fell off the loop returning None despite the
        # declared dict return type; give callers the empty-dict sentinel.
        logger.error(f"Giving up on {url} after {max_retries} retries")
        return {}