Compare commits

..

2 Commits

Author SHA1 Message Date
dc330b87b9 fix(celery): process dataset directly in fetch task
Calling the original `process_dataset` function led to issues with JSON serialisation.
2026-03-10 22:17:00 +00:00
7ccc934f71 build: change celery to debug mode 2026-03-10 22:14:45 +00:00
2 changed files with 12 additions and 4 deletions

View File

@@ -43,7 +43,7 @@ services:
       - .env
     command: >
       celery -A server.queue.celery_app.celery worker
-      --loglevel=info
+      --loglevel=debug
       --pool=solo
     depends_on:
       - postgres

View File

@@ -1,4 +1,5 @@
 import pandas as pd
+import json
 from server.queue.celery_app import celery
 from server.analysis.enrichment import DatasetEnrichment
@@ -37,13 +38,20 @@ def fetch_and_process_dataset(self,
     try:
         for source_name, source_limit in per_source.items():
             connector = connectors[source_name]()
-            posts.extend(connector.get_new_posts_by_search(
+            raw_posts = connector.get_new_posts_by_search(
                 search=search,
                 category=category,
                 post_limit=source_limit,
                 comment_limit=source_limit
-            ))
-        process_dataset.delay(dataset_id, [p.to_dict() for p in posts], topics)
+            )
+            posts.extend(post.to_dict() for post in raw_posts)
+        df = pd.DataFrame(posts)
+        processor = DatasetEnrichment(df, topics)
+        enriched_df = processor.enrich()
+        dataset_manager.save_dataset_content(dataset_id, enriched_df)
+        dataset_manager.set_dataset_status(dataset_id, "complete", "NLP Processing Completed Successfully")
     except Exception as e:
         dataset_manager.set_dataset_status(dataset_id, "error", f"An error occurred: {e}")