From 1482e960512bbb028cba217a8c2f72db6153a332 Mon Sep 17 00:00:00 2001 From: Dylan De Faoite Date: Wed, 1 Apr 2026 09:06:07 +0100 Subject: [PATCH] feat(datasets): implement deduplication of dataset records in get_dataset_content --- frontend/src/pages/Stats.tsx | 111 +++++++++++++++++++++++------------ server/analysis/stat_gen.py | 78 +++++++++++++----------- server/core/datasets.py | 39 +++++++++++- 3 files changed, 156 insertions(+), 72 deletions(-) diff --git a/frontend/src/pages/Stats.tsx b/frontend/src/pages/Stats.tsx index 0d4c10e..0651a30 100644 --- a/frontend/src/pages/Stats.tsx +++ b/frontend/src/pages/Stats.tsx @@ -66,45 +66,88 @@ const EMPTY_EXPLORER_STATE: ExplorerState = { error: "", }; -const getExplorerRecordIdentity = (record: DatasetRecord) => - JSON.stringify({ - post_id: record.post_id ?? null, - parent_id: record.parent_id ?? null, - reply_to: record.reply_to ?? null, - author: record.author ?? null, - type: record.type ?? null, - timestamp: record.timestamp ?? null, - dt: record.dt ?? null, - title: record.title ?? null, - content: record.content ?? null, - source: record.source ?? null, - topic: record.topic ?? null, - }); +const parseJsonLikePayload = (value: string): unknown => { + const normalized = value + .replace(/\uFEFF/g, "") + .replace(/,\s*([}\]])/g, "$1") + .replace(/(:\s*)(NaN|Infinity|-Infinity)\b/g, "$1null") + .replace(/(\[\s*)(NaN|Infinity|-Infinity)\b/g, "$1null") + .replace(/(,\s*)(NaN|Infinity|-Infinity)\b/g, "$1null") + .replace(/(:\s*)None\b/g, "$1null") + .replace(/(:\s*)True\b/g, "$1true") + .replace(/(:\s*)False\b/g, "$1false") + .replace(/(\[\s*)None\b/g, "$1null") + .replace(/(\[\s*)True\b/g, "$1true") + .replace(/(\[\s*)False\b/g, "$1false") + .replace(/(,\s*)None\b/g, "$1null") + .replace(/(,\s*)True\b/g, "$1true") + .replace(/(,\s*)False\b/g, "$1false"); -const dedupeExplorerRecords = (records: DatasetRecord[]) => { - const uniqueRecords: DatasetRecord[] = []; - const seen = new Set(); + return JSON.parse(normalized); +}; - for (const record of records) { - const identity = getExplorerRecordIdentity(record); - if (seen.has(identity)) { - continue; - } - - seen.add(identity); - uniqueRecords.push(record); +const parseRecordStringPayload = (payload: string): DatasetRecord[] | null => { + const trimmed = payload.trim(); + if (!trimmed) { + return []; } - return uniqueRecords; + try { + return normalizeRecordPayload(parseJsonLikePayload(trimmed)); + } catch { + // Continue with additional fallback formats below. + } + + const ndjsonLines = trimmed + .split(/\r?\n/) + .map((line) => line.trim()) + .filter(Boolean); + if (ndjsonLines.length > 0) { + try { + return ndjsonLines.map((line) => parseJsonLikePayload(line)) as DatasetRecord[]; + } catch { + // Continue with wrapped JSON extraction. + } + } + + const bracketStart = trimmed.indexOf("["); + const bracketEnd = trimmed.lastIndexOf("]"); + if (bracketStart !== -1 && bracketEnd > bracketStart) { + const candidate = trimmed.slice(bracketStart, bracketEnd + 1); + try { + return normalizeRecordPayload(parseJsonLikePayload(candidate)); + } catch { + // Continue with object extraction. + } + } + + const braceStart = trimmed.indexOf("{"); + const braceEnd = trimmed.lastIndexOf("}"); + if (braceStart !== -1 && braceEnd > braceStart) { + const candidate = trimmed.slice(braceStart, braceEnd + 1); + try { + return normalizeRecordPayload(parseJsonLikePayload(candidate)); + } catch { + return null; + } + } + + return null; }; const normalizeRecordPayload = (payload: unknown): DatasetRecord[] => { if (typeof payload === "string") { - try { - return normalizeRecordPayload(JSON.parse(payload)); - } catch { - throw new Error("Corpus endpoint returned a non-JSON string payload."); + const parsed = parseRecordStringPayload(payload); + if (parsed) { + return parsed; } + + const preview = payload.trim().slice(0, 120).replace(/\s+/g, " "); + throw new Error( + `Corpus endpoint returned a non-JSON string payload.${ + preview ? ` Response preview: ${preview}` : "" + }`, + ); } if ( @@ -265,9 +308,7 @@ const StatPage = () => { }, ); - const normalizedRecords = dedupeExplorerRecords( - normalizeRecordPayload(response.data), - ); + const normalizedRecords = normalizeRecordPayload(response.data); setAllRecords(normalizedRecords); setAllRecordsKey(filterKey); @@ -288,9 +329,7 @@ const StatPage = () => { try { const records = await ensureFilteredRecords(); const context = buildExplorerContext(records); - const matched = dedupeExplorerRecords( - records.filter((record) => spec.matcher(record, context)), - ); + const matched = records.filter((record) => spec.matcher(record, context)); matched.sort((a, b) => { const aValue = String(a.dt ?? a.date ?? a.timestamp ?? ""); const bValue = String(b.dt ?? b.date ?? b.timestamp ?? ""); diff --git a/server/analysis/stat_gen.py b/server/analysis/stat_gen.py index 8eecb7f..7371ffe 100644 --- a/server/analysis/stat_gen.py +++ b/server/analysis/stat_gen.py @@ -89,39 +89,17 @@ class StatGen: df.to_json(orient="records", date_format="iso", date_unit="s") ) - def _dedupe_records(self, records: list[dict]) -> list[dict]: - unique_records = [] - seen = set() - - for record in records: - key_data = { - "post_id": record.get("post_id"), - "parent_id": record.get("parent_id"), - "reply_to": record.get("reply_to"), - "author": record.get("author"), - "type": record.get("type"), - "timestamp": record.get("timestamp"), - "dt": record.get("dt"), - "title": record.get("title"), - "content": record.get("content"), - "source": record.get("source"), - "topic": record.get("topic"), - } - key = json.dumps(key_data, sort_keys=True, separators=(",", ":")) - if key in seen: - continue - - seen.add(key) - unique_records.append(record) - - return unique_records - ## Public Methods def filter_dataset(self, df: pd.DataFrame, filters: dict | None = None) -> list[dict]: filtered_df = self._prepare_filtered_df(df, filters) - return self._dedupe_records(self._json_ready_records(filtered_df)) + return self._json_ready_records(filtered_df) - def temporal(self, df: pd.DataFrame, filters: dict | None = None) -> dict: + def temporal( + self, + df: pd.DataFrame, + filters: dict | None = None, + dataset_id: int | None = None, + ) -> dict: filtered_df = self._prepare_filtered_df(df, filters) return { @@ -129,7 +107,12 @@ class StatGen: "weekday_hour_heatmap": self.temporal_analysis.heatmap(filtered_df), } - def linguistic(self, df: pd.DataFrame, filters: dict | None = None) -> dict: + def linguistic( + self, + df: pd.DataFrame, + filters: dict | None = None, + dataset_id: int | None = None, + ) -> dict: filtered_df = self._prepare_filtered_df(df, filters) return { @@ -139,7 +122,12 @@ class StatGen: "lexical_diversity": self.linguistic_analysis.lexical_diversity(filtered_df) } - def emotional(self, df: pd.DataFrame, filters: dict | None = None) -> dict: + def emotional( + self, + df: pd.DataFrame, + filters: dict | None = None, + dataset_id: int | None = None, + ) -> dict: filtered_df = self._prepare_filtered_df(df, filters) return { @@ -149,7 +137,12 @@ class StatGen: "emotion_by_source": self.emotional_analysis.emotion_by_source(filtered_df) } - def user(self, df: pd.DataFrame, filters: dict | None = None) -> dict: + def user( + self, + df: pd.DataFrame, + filters: dict | None = None, + dataset_id: int | None = None, + ) -> dict: filtered_df = self._prepare_filtered_df(df, filters) return { @@ -157,7 +150,12 @@ class StatGen: "users": self.user_analysis.per_user_analysis(filtered_df) } - def interactional(self, df: pd.DataFrame, filters: dict | None = None) -> dict: + def interactional( + self, + df: pd.DataFrame, + filters: dict | None = None, + dataset_id: int | None = None, + ) -> dict: filtered_df = self._prepare_filtered_df(df, filters) return { @@ -166,7 +164,12 @@ class StatGen: "conversation_concentration": self.interaction_analysis.conversation_concentration(filtered_df) } - def cultural(self, df: pd.DataFrame, filters: dict | None = None) -> dict: + def cultural( + self, + df: pd.DataFrame, + filters: dict | None = None, + dataset_id: int | None = None, + ) -> dict: filtered_df = self._prepare_filtered_df(df, filters) return { @@ -175,7 +178,12 @@ class StatGen: "avg_emotion_per_entity": self.cultural_analysis.get_avg_emotions_per_entity(filtered_df) } - def summary(self, df: pd.DataFrame, filters: dict | None = None) -> dict: + def summary( + self, + df: pd.DataFrame, + filters: dict | None = None, + dataset_id: int | None = None, + ) -> dict: filtered_df = self._prepare_filtered_df(df, filters) return self.summary_analysis.summary(filtered_df) diff --git a/server/core/datasets.py b/server/core/datasets.py index c2f2214..6e85eac 100644 --- a/server/core/datasets.py +++ b/server/core/datasets.py @@ -26,7 +26,34 @@ class DatasetManager: def get_dataset_content(self, dataset_id: int) -> pd.DataFrame: query = "SELECT * FROM events WHERE dataset_id = %s" result = self.db.execute(query, (dataset_id,), fetch=True) - return pd.DataFrame(result) + df = pd.DataFrame(result) + if df.empty: + return df + + dedupe_columns = [ + column + for column in [ + "post_id", + "parent_id", + "reply_to", + "author", + "type", + "timestamp", + "dt", + "title", + "content", + "source", + "topic", + ] + if column in df.columns + ] + + if dedupe_columns: + df = df.drop_duplicates(subset=dedupe_columns, keep="first") + else: + df = df.drop_duplicates(keep="first") + + return df.reset_index(drop=True) def get_dataset_info(self, dataset_id: int) -> dict: query = "SELECT * FROM datasets WHERE id = %s" @@ -52,6 +79,16 @@ class DatasetManager: if event_data.empty: return + dedupe_columns = [ + column for column in ["id", "type", "source"] if column in event_data.columns + ] + if dedupe_columns: + event_data = event_data.drop_duplicates(subset=dedupe_columns, keep="first") + else: + event_data = event_data.drop_duplicates(keep="first") + + self.delete_dataset_content(dataset_id) + query = """ INSERT INTO events ( dataset_id,