feat(connectors): add base connector and registry for detection

The idea is a plugin-style system: new connectors extend the `BaseConnector` class and implement its abstract `get_new_posts_by_search` method.

These are automatically detected by the registry, and automatically used in new Flask endpoints that give a list of possible sources.

Allows for an open-ended system where new data scrapers / API consumers can be added dynamically.
This commit is contained in:
2026-03-09 21:28:44 +00:00
parent 262a70dbf3
commit cc799f7368
5 changed files with 73 additions and 4 deletions

View File

@@ -111,6 +111,20 @@ def get_user_datasets():
    current_user = int(get_jwt_identity())
    return jsonify(dataset_manager.get_user_datasets(current_user)), 200
@app.route("/datasets/sources", methods=["GET"])
@jwt_required()
def get_dataset_sources():
return jsonify({""})
@app.route("/datasets/scrape", methods=["POST"])
@jwt_required()
def scrape_data():
if "sources" not in request.form:
return jsonify({"error": "Data source names are required."}), 400
sources = request.form.get("sources")
@app.route("/datasets/upload", methods=["POST"]) @app.route("/datasets/upload", methods=["POST"])
@jwt_required() @jwt_required()
def upload_data(): def upload_data():

23
server/connectors/base.py Normal file
View File

@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
from dto.post import Post
class BaseConnector(ABC):
    """Abstract base for pluggable data-source connectors.

    Subclasses declare ``source_name``, ``display_name`` and (optionally)
    ``required_env`` at class level, and implement ``get_new_posts_by_search``.
    The registry discovers subclasses automatically and filters out those
    whose environment variables are not configured.
    """

    # Defaults (rather than bare annotations) so code can safely read these
    # off any subclass: an annotation alone does not create a class
    # attribute, and `cls.source_name` would raise AttributeError otherwise.
    source_name: str = ""         # machine-readable id: "reddit", "youtube"
    display_name: str = ""        # human-readable label: "Reddit", "YouTube"
    required_env: list[str] = []  # env vars needed to activate this connector
    # NOTE: `required_env` is a shared class-level list — subclasses must
    # assign their own list, never mutate the inherited default.

    @classmethod
    def is_available(cls) -> bool:
        """Return True when every env var in ``required_env`` is set and non-empty."""
        import os  # local import: keeps module import side-effect free
        # os.getenv returns None for unset vars; empty strings also count
        # as "not configured" here because they are falsy.
        return all(os.getenv(var) for var in cls.required_env)

    @abstractmethod
    def get_new_posts_by_search(
        self,
        search: "str | None" = None,
        category: "str | None" = None,
        post_limit: int = 10,
        comment_limit: int = 10,
    ) -> "list[Post]":
        """Fetch up to ``post_limit`` new posts (each with up to
        ``comment_limit`` comments) matching ``search`` and/or ``category``
        from this data source."""
        ...

View File

@@ -7,6 +7,7 @@ from dto.post import Post
from dto.comment import Comment
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from server.connectors.base import BaseConnector
logger = logging.getLogger(__name__)
@@ -14,12 +15,18 @@ HEADERS = {
"User-Agent": "Mozilla/5.0 (compatible; ForumScraper/1.0)" "User-Agent": "Mozilla/5.0 (compatible; ForumScraper/1.0)"
} }
class BoardsAPI: class BoardsAPI(BaseConnector):
def __init__(self): def __init__(self):
self.url = "https://www.boards.ie" self.url = "https://www.boards.ie"
self.source_name = "Boards.ie" self.source_name = "boards.ie"
self.display_name = "Boards.ie"
def get_new_category_posts(self, category: str, post_limit: int, comment_limit: int) -> list[Post]: def get_new_posts_by_search(self,
search: str,
category: str,
post_limit: int,
comment_limit: int
) -> list[Post]:
urls = [] urls = []
current_page = 1 current_page = 1

View File

@@ -0,0 +1,25 @@
import pkgutil
import importlib
import connectors
from connectors.base import BaseConnector
def _discover_connectors() -> list[type[BaseConnector]]:
    """Import every connector module and collect concrete BaseConnector subclasses.

    Returns the classes that declare a non-empty ``source_name``;
    abstract/intermediate helper classes are filtered out.
    """
    # Importing each module is what registers its classes with
    # BaseConnector.__subclasses__().
    for _, module_name, _ in pkgutil.iter_modules(connectors.__path__):
        if module_name in ("base", "registry"):
            continue  # infrastructure modules, not connectors
        importlib.import_module(f"connectors.{module_name}")

    def _walk(base: type) -> set[type]:
        # __subclasses__() is shallow; recurse so grandchildren (e.g. a
        # connector extending another connector) are also found.
        found = set(base.__subclasses__())
        for sub in tuple(found):
            found |= _walk(sub)
        return found

    return [
        cls for cls in _walk(BaseConnector)
        # getattr guard: a bare annotation on the base class creates no
        # attribute, so a subclass that forgot to set source_name must not
        # raise AttributeError here — it is simply skipped.
        if getattr(cls, "source_name", None)
    ]
def get_available_connectors() -> list[type[BaseConnector]]:
    """Return the discovered connectors whose required env vars are all set."""
    ready = []
    for connector in _discover_connectors():
        if connector.is_available():
            ready.append(connector)
    return ready
def get_connector_metadata() -> list[dict]:
    """Describe each available connector as a JSON-friendly {id, label} dict."""
    metadata = []
    for connector in get_available_connectors():
        metadata.append({"id": connector.source_name, "label": connector.display_name})
    return metadata

View File

@@ -1,7 +1,7 @@
import pandas as pd
from server.db.database import PostgresConnector
from psycopg2.extras import Json
from server.exceptions import NotAuthorisedException, NonExistentDatasetException from server.exceptions import NonExistentDatasetException
class DatasetManager:
    def __init__(self, db: PostgresConnector):