Compare commits

..

4 Commits

6 changed files with 305 additions and 112 deletions

View File

@@ -71,6 +71,25 @@ type NGram = {
type AverageEmotionByTopic = Emotion & {
n: number;
topic: string;
[key: string]: string | number;
};
type OverallEmotionAverage = {
emotion: string;
score: number;
};
type DominantEmotionDistribution = {
emotion: string;
count: number;
ratio: number;
};
type EmotionBySource = {
source: string;
dominant_emotion: string;
dominant_score: number;
event_count: number;
};
@@ -79,6 +98,9 @@ type ContentAnalysisResponse = {
average_emotion_by_topic: AverageEmotionByTopic[];
common_three_phrases: NGram[];
common_two_phrases: NGram[];
overall_emotion_average?: OverallEmotionAverage[];
dominant_emotion_distribution?: DominantEmotionDistribution[];
emotion_by_source?: EmotionBySource[];
}
// Summary
@@ -110,6 +132,9 @@ export type {
UserAnalysisResponse,
FrequencyWord,
AverageEmotionByTopic,
OverallEmotionAverage,
DominantEmotionDistribution,
EmotionBySource,
SummaryResponse,
TimeAnalysisResponse,
ContentAnalysisResponse,

View File

@@ -1,33 +1,86 @@
import pandas as pd
class EmotionalAnalysis:
def avg_emotion_by_topic(self, df: pd.DataFrame) -> dict:
emotion_cols = [
col for col in df.columns
if col.startswith("emotion_")
]
def _emotion_cols(self, df: pd.DataFrame) -> list[str]:
return [col for col in df.columns if col.startswith("emotion_")]
def avg_emotion_by_topic(self, df: pd.DataFrame) -> list[dict]:
emotion_cols = self._emotion_cols(df)
if not emotion_cols:
return []
counts = (
df[
(df["topic"] != "Misc")
]
.groupby("topic")
.size()
.rename("n")
df[(df["topic"] != "Misc")].groupby("topic").size().reset_index(name="n")
)
avg_emotion_by_topic = (
df[
(df["topic"] != "Misc")
]
df[(df["topic"] != "Misc")]
.groupby("topic")[emotion_cols]
.mean()
.reset_index()
)
avg_emotion_by_topic = avg_emotion_by_topic.merge(
counts,
on="topic"
avg_emotion_by_topic = avg_emotion_by_topic.merge(counts, on="topic")
return avg_emotion_by_topic.to_dict(orient="records")
def overall_emotion_average(self, df: pd.DataFrame) -> list[dict]:
emotion_cols = self._emotion_cols(df)
if not emotion_cols:
return []
means = df[emotion_cols].mean()
return [
{
"emotion": col.replace("emotion_", ""),
"score": float(means[col]),
}
for col in emotion_cols
]
def dominant_emotion_distribution(self, df: pd.DataFrame) -> list[dict]:
emotion_cols = self._emotion_cols(df)
if not emotion_cols or df.empty:
return []
dominant_per_row = df[emotion_cols].idxmax(axis=1)
counts = dominant_per_row.value_counts()
total = max(len(dominant_per_row), 1)
return [
{
"emotion": col.replace("emotion_", ""),
"count": int(count),
"ratio": round(float(count / total), 4),
}
for col, count in counts.items()
]
def emotion_by_source(self, df: pd.DataFrame) -> list[dict]:
emotion_cols = self._emotion_cols(df)
if not emotion_cols or "source" not in df.columns or df.empty:
return []
source_counts = df.groupby("source").size()
source_means = df.groupby("source")[emotion_cols].mean().reset_index()
rows = source_means.to_dict(orient="records")
output = []
for row in rows:
source = row["source"]
dominant_col = max(emotion_cols, key=lambda col: float(row.get(col, 0)))
output.append(
{
"source": str(source),
"dominant_emotion": dominant_col.replace("emotion_", ""),
"dominant_score": round(float(row.get(dominant_col, 0)), 4),
"event_count": int(source_counts.get(source, 0)),
}
)
return avg_emotion_by_topic.to_dict(orient='records')
return output

View File

@@ -6,7 +6,9 @@ from server.analysis.cultural import CulturalAnalysis
from server.analysis.emotional import EmotionalAnalysis
from server.analysis.interactional import InteractionAnalysis
from server.analysis.linguistic import LinguisticAnalysis
from server.analysis.summary import SummaryAnalysis
from server.analysis.temporal import TemporalAnalysis
from server.analysis.user import UserAnalysis
DOMAIN_STOPWORDS = {
"www",
@@ -36,12 +38,11 @@ class StatGen:
self.interaction_analysis = InteractionAnalysis(EXCLUDE_WORDS)
self.linguistic_analysis = LinguisticAnalysis(EXCLUDE_WORDS)
self.cultural_analysis = CulturalAnalysis()
self.summary_analysis = SummaryAnalysis()
self.user_analysis = UserAnalysis(self.interaction_analysis)
## Private Methods
def _prepare_filtered_df(self,
df: pd.DataFrame,
filters: dict | None = None
) -> pd.DataFrame:
def _prepare_filtered_df(self, df: pd.DataFrame, filters: dict | None = None) -> pd.DataFrame:
filters = filters or {}
filtered_df = df.copy()
@@ -51,10 +52,9 @@ class StatGen:
data_source_filter = filters.get("data_sources", None)
if search_query:
mask = (
filtered_df["content"].str.contains(search_query, case=False, na=False)
| filtered_df["author"].str.contains(search_query, case=False, na=False)
)
mask = filtered_df["content"].str.contains(
search_query, case=False, na=False
) | filtered_df["author"].str.contains(search_query, case=False, na=False)
# Only include title if the column exists
if "title" in filtered_df.columns:
@@ -76,10 +76,10 @@ class StatGen:
return filtered_df
## Public Methods
def filter_dataset(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
def filter_dataset(self, df: pd.DataFrame, filters: dict | None = None) -> list[dict]:
return self._prepare_filtered_df(df, filters).to_dict(orient="records")
def get_time_analysis(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
def temporal(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
filtered_df = self._prepare_filtered_df(df, filters)
return {
@@ -87,40 +87,43 @@ class StatGen:
"weekday_hour_heatmap": self.temporal_analysis.heatmap(filtered_df),
}
def get_content_analysis(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
def linguistic(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
filtered_df = self._prepare_filtered_df(df, filters)
return {
"word_frequencies": self.linguistic_analysis.word_frequencies(filtered_df),
"common_two_phrases": self.linguistic_analysis.ngrams(filtered_df),
"common_three_phrases": self.linguistic_analysis.ngrams(filtered_df, n=3),
"average_emotion_by_topic": self.emotional_analysis.avg_emotion_by_topic(
filtered_df
)
}
def get_user_analysis(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
def emotional(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
filtered_df = self._prepare_filtered_df(df, filters)
return {
"top_users": self.interaction_analysis.top_users(filtered_df),
"users": self.interaction_analysis.per_user_analysis(filtered_df),
"average_emotion_by_topic": self.emotional_analysis.avg_emotion_by_topic(filtered_df),
"overall_emotion_average": self.emotional_analysis.overall_emotion_average(filtered_df),
"dominant_emotion_distribution": self.emotional_analysis.dominant_emotion_distribution(filtered_df),
"emotion_by_source": self.emotional_analysis.emotion_by_source(filtered_df)
}
def user(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
filtered_df = self._prepare_filtered_df(df, filters)
return {
"top_users": self.user_analysis.top_users(filtered_df),
"users": self.user_analysis.users(filtered_df)
}
def interactional(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
filtered_df = self._prepare_filtered_df(df, filters)
return {
"average_thread_depth": self.interaction_analysis.average_thread_depth(filtered_df),
"average_thread_length_by_emotion": self.interaction_analysis.average_thread_length_by_emotion(filtered_df),
"interaction_graph": self.interaction_analysis.interaction_graph(filtered_df)
}
def get_interactional_analysis(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
filtered_df = self._prepare_filtered_df(df, filters)
return {
"average_thread_depth": self.interaction_analysis.average_thread_depth(
filtered_df
),
"average_thread_length_by_emotion": self.interaction_analysis.average_thread_length_by_emotion(
filtered_df
),
}
def get_cultural_analysis(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
def cultural(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
filtered_df = self._prepare_filtered_df(df, filters)
return {
@@ -136,35 +139,4 @@ class StatGen:
def summary(self, df: pd.DataFrame, filters: dict | None = None) -> dict:
filtered_df = self._prepare_filtered_df(df, filters)
total_posts = (filtered_df["type"] == "post").sum()
total_comments = (filtered_df["type"] == "comment").sum()
events_per_user = filtered_df.groupby("author").size()
if filtered_df.empty:
return {
"total_events": 0,
"total_posts": 0,
"total_comments": 0,
"unique_users": 0,
"comments_per_post": 0,
"lurker_ratio": 0,
"time_range": {
"start": None,
"end": None,
},
"sources": [],
}
return {
"total_events": int(len(filtered_df)),
"total_posts": int(total_posts),
"total_comments": int(total_comments),
"unique_users": int(events_per_user.count()),
"comments_per_post": round(total_comments / max(total_posts, 1), 2),
"lurker_ratio": round((events_per_user == 1).mean(), 2),
"time_range": {
"start": int(filtered_df["dt"].min().timestamp()),
"end": int(filtered_df["dt"].max().timestamp()),
},
"sources": filtered_df["source"].dropna().unique().tolist(),
}
return self.summary_analysis.summary(filtered_df)

View File

@@ -0,0 +1,64 @@
import pandas as pd
class SummaryAnalysis:
def total_events(self, df: pd.DataFrame) -> int:
return int(len(df))
def total_posts(self, df: pd.DataFrame) -> int:
return int(len(df[df["type"] == "post"]))
def total_comments(self, df: pd.DataFrame) -> int:
return int(len(df[df["type"] == "comment"]))
def unique_users(self, df: pd.DataFrame) -> int:
return int(len(df["author"].dropna().unique()))
def comments_per_post(self, total_comments: int, total_posts: int) -> float:
return round(total_comments / max(total_posts, 1), 2)
def lurker_ratio(self, df: pd.DataFrame) -> float:
events_per_user = df.groupby("author").size()
return round((events_per_user == 1).mean(), 2)
def time_range(self, df: pd.DataFrame) -> dict:
return {
"start": int(df["dt"].min().timestamp()),
"end": int(df["dt"].max().timestamp()),
}
def sources(self, df: pd.DataFrame) -> list:
return df["source"].dropna().unique().tolist()
def empty_summary(self) -> dict:
return {
"total_events": 0,
"total_posts": 0,
"total_comments": 0,
"unique_users": 0,
"comments_per_post": 0,
"lurker_ratio": 0,
"time_range": {
"start": None,
"end": None,
},
"sources": [],
}
def summary(self, df: pd.DataFrame) -> dict:
if df.empty:
return self.empty_summary()
total_posts = self.total_posts(df)
total_comments = self.total_comments(df)
return {
"total_events": self.total_events(df),
"total_posts": total_posts,
"total_comments": total_comments,
"unique_users": self.unique_users(df),
"comments_per_post": self.comments_per_post(total_comments, total_posts),
"lurker_ratio": self.lurker_ratio(df),
"time_range": self.time_range(df),
"sources": self.sources(df),
}

20
server/analysis/user.py Normal file
View File

@@ -0,0 +1,20 @@
import pandas as pd
from server.analysis.interactional import InteractionAnalysis
class UserAnalysis:
def __init__(self, interaction_analysis: InteractionAnalysis):
self.interaction_analysis = interaction_analysis
def top_users(self, df: pd.DataFrame) -> list:
return self.interaction_analysis.top_users(df)
def users(self, df: pd.DataFrame) -> dict | list:
return self.interaction_analysis.per_user_analysis(df)
def user(self, df: pd.DataFrame) -> dict:
return {
"top_users": self.top_users(df),
"users": self.users(df),
}

View File

@@ -186,7 +186,7 @@ def scrape_data():
dataset_manager.set_dataset_status(
dataset_id,
"fetching",
f"Data is being fetched from {', '.join(source['name'] for source in source_configs)}"
f"Data is being fetched from {', '.join(source['name'] for source in source_configs)}",
)
fetch_and_process_dataset.delay(
@@ -198,12 +198,14 @@ def scrape_data():
print(traceback.format_exc())
return jsonify({"error": "Failed to queue dataset processing"}), 500
return jsonify({
return jsonify(
{
"message": "Dataset queued for processing",
"dataset_id": dataset_id,
"status": "processing"
}), 202
"status": "processing",
}
), 202
@app.route("/datasets/upload", methods=["POST"])
@jwt_required()
@@ -233,7 +235,9 @@ def upload_data():
posts_df = pd.read_json(post_file, lines=True, convert_dates=False)
topics = json.load(topic_file)
dataset_id = dataset_manager.save_dataset_info(current_user, dataset_name, topics)
dataset_id = dataset_manager.save_dataset_info(
current_user, dataset_name, topics
)
process_dataset.delay(dataset_id, posts_df.to_dict(orient="records"), topics)
@@ -249,6 +253,7 @@ def upload_data():
except Exception as e:
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>", methods=["GET"])
@jwt_required()
def get_dataset(dataset_id):
@@ -256,7 +261,9 @@ def get_dataset(dataset_id):
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
dataset_info = dataset_manager.get_dataset_info(dataset_id)
included_cols = {"id", "name", "created_at"}
@@ -270,6 +277,7 @@ def get_dataset(dataset_id):
print(traceback.format_exc())
return jsonify({"error": "An unexpected error occured"}), 500
@app.route("/dataset/<int:dataset_id>", methods=["PATCH"])
@jwt_required()
def update_dataset(dataset_id):
@@ -277,7 +285,9 @@ def update_dataset(dataset_id):
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
body = request.get_json()
new_name = body.get("name")
@@ -286,7 +296,9 @@ def update_dataset(dataset_id):
return jsonify({"error": "A valid name must be provided"}), 400
dataset_manager.update_dataset_name(dataset_id, new_name.strip())
return jsonify({"message": f"Dataset {dataset_id} renamed to '{new_name.strip()}'"}), 200
return jsonify(
{"message": f"Dataset {dataset_id} renamed to '{new_name.strip()}'"}
), 200
except NotAuthorisedException:
return jsonify({"error": "User is not authorised to access this content"}), 403
except NonExistentDatasetException:
@@ -295,6 +307,7 @@ def update_dataset(dataset_id):
print(traceback.format_exc())
return jsonify({"error": "An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>", methods=["DELETE"])
@jwt_required()
def delete_dataset(dataset_id):
@@ -302,11 +315,17 @@ def delete_dataset(dataset_id):
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
dataset_manager.delete_dataset_info(dataset_id)
dataset_manager.delete_dataset_content(dataset_id)
return jsonify({"message": f"Dataset {dataset_id} metadata and content successfully deleted"}), 200
return jsonify(
{
"message": f"Dataset {dataset_id} metadata and content successfully deleted"
}
), 200
except NotAuthorisedException:
return jsonify({"error": "User is not authorised to access this content"}), 403
except NonExistentDatasetException:
@@ -315,6 +334,7 @@ def delete_dataset(dataset_id):
print(traceback.format_exc())
return jsonify({"error": "An unexpected error occured"}), 500
@app.route("/dataset/<int:dataset_id>/status", methods=["GET"])
@jwt_required()
def get_dataset_status(dataset_id):
@@ -322,7 +342,9 @@ def get_dataset_status(dataset_id):
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
dataset_status = dataset_manager.get_dataset_status(dataset_id)
return jsonify(dataset_status), 200
@@ -334,17 +356,44 @@ def get_dataset_status(dataset_id):
print(traceback.format_exc())
return jsonify({"error": "An unexpected error occured"}), 500
@app.route("/dataset/<int:dataset_id>/content", methods=["GET"])
@app.route("/dataset/<int:dataset_id>/linguistic", methods=["GET"])
@jwt_required()
def content_endpoint(dataset_id):
def get_linguistic_analysis(dataset_id):
try:
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.get_content_analysis(dataset_content, filters)), 200
return jsonify(stat_gen.linguistic(dataset_content, filters)), 200
except NotAuthorisedException:
return jsonify({"error": "User is not authorised to access this content"}), 403
except NonExistentDatasetException:
return jsonify({"error": "Dataset does not exist"}), 404
except ValueError as e:
return jsonify({"error": f"Malformed or missing data"}), 400
except Exception as e:
print(traceback.format_exc())
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>/emotional", methods=["GET"])
@jwt_required()
def get_emotional_analysis(dataset_id):
try:
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.emotional(dataset_content, filters)), 200
except NotAuthorisedException:
return jsonify({"error": "User is not authorised to access this content"}), 403
except NonExistentDatasetException:
@@ -362,7 +411,9 @@ def get_summary(dataset_id):
try:
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
@@ -378,17 +429,19 @@ def get_summary(dataset_id):
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>/time", methods=["GET"])
@app.route("/dataset/<int:dataset_id>/temporal", methods=["GET"])
@jwt_required()
def get_time_analysis(dataset_id):
def get_temporal_analysis(dataset_id):
try:
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.get_time_analysis(dataset_content, filters)), 200
return jsonify(stat_gen.temporal(dataset_content, filters)), 200
except NotAuthorisedException:
return jsonify({"error": "User is not authorised to access this content"}), 403
except NonExistentDatasetException:
@@ -406,11 +459,13 @@ def get_user_analysis(dataset_id):
try:
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.get_user_analysis(dataset_content, filters)), 200
return jsonify(stat_gen.user(dataset_content, filters)), 200
except NotAuthorisedException:
return jsonify({"error": "User is not authorised to access this content"}), 403
except NonExistentDatasetException:
@@ -428,11 +483,13 @@ def get_cultural_analysis(dataset_id):
try:
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.get_cultural_analysis(dataset_content, filters)), 200
return jsonify(stat_gen.cultural(dataset_content, filters)), 200
except NotAuthorisedException:
return jsonify({"error": "User is not authorised to access this content"}), 403
except NonExistentDatasetException:
@@ -444,17 +501,19 @@ def get_cultural_analysis(dataset_id):
return jsonify({"error": f"An unexpected error occurred"}), 500
@app.route("/dataset/<int:dataset_id>/interaction", methods=["GET"])
@app.route("/dataset/<int:dataset_id>/interactional", methods=["GET"])
@jwt_required()
def get_interaction_analysis(dataset_id):
try:
user_id = int(get_jwt_identity())
if not dataset_manager.authorize_user_dataset(dataset_id, user_id):
raise NotAuthorisedException("This user is not authorised to access this dataset")
raise NotAuthorisedException(
"This user is not authorised to access this dataset"
)
dataset_content = dataset_manager.get_dataset_content(dataset_id)
filters = get_request_filters()
return jsonify(stat_gen.get_interactional_analysis(dataset_content, filters)), 200
return jsonify(stat_gen.interactional(dataset_content, filters)), 200
except NotAuthorisedException:
return jsonify({"error": "User is not authorised to access this content"}), 403
except NonExistentDatasetException: