"""Celery tasks that fetch dataset posts and run NLP enrichment on them."""
import pandas as pd
import logging

from server.queue.celery_app import celery
from server.analysis.enrichment import DatasetEnrichment
from server.db.database import PostgresConnector
from server.core.datasets import DatasetManager
from server.connectors.registry import get_available_connectors

logger = logging.getLogger(__name__)

@celery.task(bind=True, max_retries=3)
def process_dataset(self, dataset_id: int, posts: list, topics: dict):
    """Enrich already-fetched posts with NLP results and persist them.

    Args:
        dataset_id: Primary key of the dataset row whose status and content
            are updated.
        posts: List of post records (one mapping per row) loaded into a
            pandas DataFrame for enrichment.
        topics: Topic configuration passed through to ``DatasetEnrichment``.

    Status transitions on the dataset row: "processing" -> "complete", or
    "error" on any failure. Failures are recorded on the row and logged
    rather than re-raised, so the Celery task itself never fails.

    NOTE(review): the task is declared with ``bind=True, max_retries=3`` but
    never calls ``self.retry()``, so no automatic retry actually happens —
    confirm whether retries were intended.
    """
    db = PostgresConnector()
    dataset_manager = DatasetManager(db)

    try:
        df = pd.DataFrame(posts)

        dataset_manager.set_dataset_status(
            dataset_id, "processing", "NLP Processing Started"
        )

        processor = DatasetEnrichment(df, topics)
        enriched_df = processor.enrich()

        dataset_manager.save_dataset_content(dataset_id, enriched_df)
        dataset_manager.set_dataset_status(
            dataset_id, "complete", "NLP Processing Completed Successfully"
        )
    except Exception as e:
        # Previously the traceback was swallowed silently; keep the
        # best-effort status update but log the full stack for operators.
        logger.exception("process_dataset failed for dataset_id=%s", dataset_id)
        dataset_manager.set_dataset_status(
            dataset_id, "error", f"An error occurred: {e}"
        )
@celery.task(bind=True, max_retries=3)
def fetch_and_process_dataset(
    self, dataset_id: int, source_info: list[dict], topics: dict
):
    """Fetch posts from the configured connectors, then enrich and persist.

    Args:
        dataset_id: Primary key of the dataset row whose status and content
            are updated.
        source_info: One mapping per source. ``"name"`` is required and must
            match a registered connector key; ``"search"``, ``"category"``
            and ``"limit"`` (default 100) are optional and forwarded to the
            connector's ``get_new_posts_by_search``.
        topics: Topic configuration passed through to ``DatasetEnrichment``.

    Status transitions on the dataset row: "processing" -> "complete", or
    "error" on any failure (including a missing/unknown connector name,
    which raises ``KeyError`` inside the ``try`` block). Failures are
    recorded on the row and logged rather than re-raised.

    NOTE(review): ``bind=True, max_retries=3`` is configured but
    ``self.retry()`` is never called, so no automatic retry happens —
    confirm whether retries were intended.
    """
    connectors = get_available_connectors()
    db = PostgresConnector()
    dataset_manager = DatasetManager(db)
    posts: list[dict] = []

    try:
        for metadata in source_info:
            # "name" is mandatory; the other fields fall back to None/100.
            name = metadata["name"]
            search = metadata.get("search")
            category = metadata.get("category")
            limit = metadata.get("limit", 100)

            # Unknown connector names raise KeyError and are recorded as an
            # "error" status by the handler below.
            connector = connectors[name]()
            raw_posts = connector.get_new_posts_by_search(
                search=search, category=category, post_limit=limit
            )
            posts.extend(post.to_dict() for post in raw_posts)

        df = pd.DataFrame(posts)

        dataset_manager.set_dataset_status(
            dataset_id, "processing", "NLP Processing Started"
        )

        processor = DatasetEnrichment(df, topics)
        enriched_df = processor.enrich()

        dataset_manager.save_dataset_content(dataset_id, enriched_df)
        dataset_manager.set_dataset_status(
            dataset_id, "complete", "NLP Processing Completed Successfully"
        )
    except Exception as e:
        # Previously the traceback was swallowed silently; keep the
        # best-effort status update but log the full stack for operators.
        logger.exception(
            "fetch_and_process_dataset failed for dataset_id=%s", dataset_id
        )
        dataset_manager.set_dataset_status(
            dataset_id, "error", f"An error occurred: {e}"
        )