refactor(dataset creation): update API methods to return only posts

commit ec91904481
parent 645d2fdfdb
Date: 2026-02-09 21:20:08 +00:00

6 changed files with 87 additions and 65 deletions
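For context, a hedged sketch of the caller-side migration this commit implies. The call sites below are illustrative (the `api` instance and argument values are assumptions, not taken from the diff); note that `search_new_subreddit_posts` also loses its default `limit` in this commit, so callers must now pass one explicitly.

    # Before this commit: posts and comments came back as a tuple.
    posts, comments = api.search_new_subreddit_posts("python", "learnpython", limit=10)

    # After: only posts are returned. Comments are attached to each Post
    # at construction time (see the _parse_posts hunk below), so a flat
    # comment list is no longer part of the public return types.
    posts = api.search_new_subreddit_posts("python", "learnpython", limit=10)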


@@ -14,7 +14,7 @@ class RedditAPI:
         self.source_name = "Reddit"
 
     # Public Methods #
-    def search_new_subreddit_posts(self, search: str, subreddit: str, limit: int = 10) -> tuple[list[Post], list[Comment]]:
+    def search_new_subreddit_posts(self, search: str, subreddit: str, limit: int) -> list[Post]:
         params = {
             'q': search,
             'limit': limit,
@@ -25,27 +25,25 @@ class RedditAPI:
         logger.info(f"Searching subreddit '{subreddit}' for '{search}' with limit {limit}")
         url = f"r/{subreddit}/search.json"
 
         posts = []
-        comments = []
         while len(posts) < limit:
             batch_limit = min(100, limit - len(posts))
             params['limit'] = batch_limit
-            data = self._fetch_data(url, params)
-            batch_posts, batch_comments = self._parse_posts(data)
+            data = self._fetch_post_overviews(url, params)
+            batch_posts = self._parse_posts(data)
 
-            logger.debug(f"Fetched {len(batch_posts)} posts and {len(batch_comments)} comments from search in subreddit {subreddit}")
+            logger.debug(f"Fetched {len(batch_posts)} posts from search in subreddit {subreddit}")
             if not batch_posts:
                 break
 
             posts.extend(batch_posts)
-            comments.extend(batch_comments)
 
-        return posts, comments
+        return posts
 
-    def get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> tuple[list[Post], list[Comment]]:
+    def get_new_subreddit_posts(self, subreddit: str, limit: int = 10) -> list[Post]:
         posts = []
-        comments = []
         after = None
         url = f"r/{subreddit}/new.json"
@@ -58,30 +56,28 @@ class RedditAPI:
                 'after': after
             }
-            data = self._fetch_data(url, params)
-            batch_posts, batch_comments = self._parse_posts(data)
+            data = self._fetch_post_overviews(url, params)
+            batch_posts = self._parse_posts(data)
 
-            logger.debug(f"Fetched {len(batch_posts)} new posts and {len(batch_comments)} comments from subreddit {subreddit}")
+            logger.debug(f"Fetched {len(batch_posts)} new posts from subreddit {subreddit}")
             if not batch_posts:
                 break
 
             posts.extend(batch_posts)
-            comments.extend(batch_comments)
 
             after = data['data'].get('after')
             if not after:
                 break
 
-        return posts, comments
+        return posts
 
     def get_user(self, username: str) -> User:
-        data = self._fetch_data(f"user/{username}/about.json", {})
+        data = self._fetch_post_overviews(f"user/{username}/about.json", {})
         return self._parse_user(data)
 
     ## Private Methods ##
-    def _parse_posts(self, data) -> tuple[list[Post], list[Comment]]:
+    def _parse_posts(self, data) -> list[Post]:
         posts = []
-        comments = []
         total_num_posts = len(data['data']['children'])
         current_index = 0
@@ -98,19 +94,19 @@ class RedditAPI:
                 content=post_data.get('selftext', ''),
                 url=post_data['url'],
                 timestamp=post_data['created_utc'],
-                source=self.source_name)
+                source=self.source_name,
+                comments=self._get_post_comments(post_data['id']))
             post.subreddit = post_data['subreddit']
             post.upvotes = post_data['ups']
             posts.append(post)
-            comments.extend(self._get_post_comments(post.id))
 
-        return posts, comments
+        return posts
 
     def _get_post_comments(self, post_id: str) -> list[Comment]:
         comments: list[Comment] = []
         url = f"comments/{post_id}.json"
-        data = self._fetch_data(url, {})
+        data = self._fetch_post_overviews(url, {})
         if len(data) < 2:
             return comments
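The hunk above moves comment fetching into the Post constructor (`comments=self._get_post_comments(post_data['id'])`). A minimal consumer sketch, assuming Post exposes that constructor argument as a `comments` attribute; the attribute name is an assumption, while `url` and `upvotes` appear in the diff:

    posts = api.get_new_subreddit_posts("learnpython", limit=5)
    for post in posts:
        # post.comments is assumed to mirror the constructor's `comments` kwarg.
        print(f"{post.url} ({post.upvotes} upvotes, {len(post.comments)} comments)")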
@@ -151,7 +147,7 @@ class RedditAPI:
         user.karma = user_data['total_karma']
         return user
 
-    def _fetch_data(self, endpoint: str, params: dict) -> dict:
+    def _fetch_post_overviews(self, endpoint: str, params: dict) -> dict:
         url = f"{self.url}{endpoint}"
         max_retries = 15
         backoff = 1 # seconds
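The diff is truncated just after the retry setup of the renamed `_fetch_post_overviews` (`max_retries = 15`, `backoff = 1`). Below is a minimal sketch of the exponential-backoff loop those variables suggest; the `requests` usage, the status codes retried, and the doubling policy are assumptions, not taken from the commit:

    import time
    import requests

    def fetch_with_backoff(url: str, params: dict, max_retries: int = 15) -> dict:
        backoff = 1  # seconds
        for _ in range(max_retries):
            response = requests.get(url, params=params, headers={'User-Agent': 'example-bot'})
            if response.status_code in (429, 500, 502, 503):
                # Rate limited or transient server error: wait, then retry.
                time.sleep(backoff)
                backoff *= 2  # assumed doubling policy
                continue
            response.raise_for_status()
            return response.json()
        raise RuntimeError(f"giving up on {url} after {max_retries} retries")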