#!/usr/bin/env python3
"""
BibGuard - Citation Hallucination Detector

Validates bibliography entries against multiple academic data sources:
arXiv, CrossRef, DBLP, Semantic Scholar, OpenAlex, and Google Scholar

Usage:
    python main.py --bib references.bib
    python main.py --bib references.bib --output report.md
"""
import argparse
import sys
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import copy

from src.parser import BibParser
from src.fetcher import (
    ArxivFetcher, CrossRefFetcher, DBLPFetcher,
    SemanticScholarFetcher, OpenAlexFetcher, ScholarFetcher
)
from src.comparator import MetadataComparator, EntryReport, resolve_year, CURRENT_YEAR
from src.sanitizer import BibSanitizer
from src.local_db import LocalConferenceDB
from src.ui import BibUI
from src.utils import ProgressDisplay, TextNormalizer


@dataclass
class WorkflowStep:
    """One lookup step of the validation workflow."""

    name: str               # identifier matched by validate_entry()
    enabled: bool = True    # disabled steps are skipped entirely
    display_name: str = ""  # human-readable label
    priority: int = 0       # lower values run first


@dataclass
class WorkflowConfig:
    """Ordered collection of workflow steps."""

    steps: List[WorkflowStep] = field(default_factory=list)

    def get_enabled_steps(self) -> List[WorkflowStep]:
        """Return the enabled steps sorted by ascending priority."""
        return sorted([s for s in self.steps if s.enabled], key=lambda x: x.priority)


def get_default_workflow() -> WorkflowConfig:
    """Build the default workflow; Google Scholar is present but disabled."""
    return WorkflowConfig(steps=[
        WorkflowStep("arxiv_id", True, "arXiv by ID", 0),
        WorkflowStep("crossref_doi", True, "CrossRef by DOI", 1),
        WorkflowStep("semantic_scholar", True, "Semantic Scholar", 2),
        WorkflowStep("dblp", True, "DBLP", 3),
        WorkflowStep("openalex", True, "OpenAlex", 4),
        WorkflowStep("arxiv_title", True, "arXiv by Title", 5),
        WorkflowStep("crossref_title", True, "CrossRef by Title", 6),
        WorkflowStep("google_scholar", False, "Google Scholar", 7),
    ])


def main():
    """Parse command-line arguments and run the fix-and-verify pipeline.

    Exits with status 1 when the .bib file does not exist and with status
    130 when the run is interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description="BibGuard: Citation Fixer & Validator",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--bib", "-b", required=True, help="Path to .bib file")
    # NOTE(review): --output is accepted but not consumed anywhere in this file.
    parser.add_argument("--output", "-o", help="Output report path (optional)")
    args = parser.parse_args()

    bib_path = Path(args.bib)
    if not bib_path.exists():
        print(f"Error: Bib file not found: {args.bib}")
        sys.exit(1)

    workflow = get_default_workflow()

    try:
        run_fix_and_verify(bib_path, workflow)
    except KeyboardInterrupt:
        print("\nCancelled")
        sys.exit(130)


def run_fix_and_verify(bib_path: Path, workflow):
    """Run validation, auto-fix issues, and verify.

    The .bib file at ``bib_path`` is rewritten in place: once after the
    offline sanitize phase (if it changed anything) and once more after
    fixes/removals from the online analysis and the interactive review.
    A second validation pass over the saved file produces the final report.

    Args:
        bib_path: Path of the .bib file to check and update.
        workflow: WorkflowConfig whose enabled steps drive validate_entry().
    """
    progress = ProgressDisplay()
    bib_parser = BibParser()
    ui = BibUI()

    print("📚 BibGuard - Auto-Fix & Verify")
    print(f" Target: {bib_path}\n")

    # --- Pass 1: Validate & Fix ---
    entries = bib_parser.parse_file(str(bib_path))
    if not entries:
        print("No entries found")
        return

    print(f"Found {len(entries)} entries. Running validation and auto-fix...\n")

    # Initialize components
    fetchers = {
        'arxiv': ArxivFetcher(),
        'crossref': CrossRefFetcher(),
        'scholar': ScholarFetcher(),
        'semantic': SemanticScholarFetcher(),
        'openalex': OpenAlexFetcher(),
        'dblp': DBLPFetcher(),
    }
    comparator = MetadataComparator()
    sanitizer = BibSanitizer()

    fixed_count = 0
    updated_entries = []
    fixed_details = {}  # Key: entry_key, Value: list of changes
    removed_details = []  # List of (entry, reason)
    manual_review_queue = []  # List of (entry, best_result, candidates)

    # --- Phase 0: Sanitize (Offline Checks) ---
    print("🧹 Running formatting sanity checks...")
    sanitize_fixes = sanitizer.sanitize_all(entries)
    ui.show_sanitize_report(sanitize_fixes)

    # If sanitization made changes, save immediately so Phase 1 works on clean data
    if sanitize_fixes:
        bib_parser.save_entries(str(bib_path), entries)

    # Merge sanitize fixes into fixed_details for the final report
    for key, fixes in sanitize_fixes.items():
        fixed_details.setdefault(key, []).extend(fix.description for fix in fixes)
        fixed_count += 1

    # Baseline used later to tell whether anything changed after Phase 0.
    sanitize_fixed_count = fixed_count

    # Duplicate detection
    dupes = sanitizer.find_duplicates(entries)
    if dupes:
        print(f"\n⚠ Found {len(dupes)} duplicate title(s):")
        for keys in dupes.values():
            print(f" {' / '.join(keys)}")
        print()

    # --- Phase 0.5: Local DB Lookup ---
    local_db = LocalConferenceDB()
    local_db_loaded = local_db.load()
    api_needed_entries = entries  # Always verify against live/network sources.

    if local_db_loaded:
        # Informational only: local hits never skip the online checks.
        local_matched_count = sum(1 for entry in entries if local_db.lookup(entry.title))
        if local_matched_count > 0:
            print(f" 📚 Local DB matched: {local_matched_count}; still verifying all entries online")

    # --- Phase 1: Analysis (API Fetch) ---
    analysis_results = []
    with progress.progress_context(len(api_needed_entries), "Analyzing Entries") as prog:
        with ThreadPoolExecutor(max_workers=min(10, max(1, len(api_needed_entries)))) as executor:
            futures = {
                executor.submit(validate_entry, e, workflow, fetchers, comparator): e
                for e in api_needed_entries
            }
            for future in as_completed(futures):
                entry = futures[future]
                try:
                    best_result, candidates = future.result()
                    analysis_results.append((entry, best_result, candidates))
                    prog.update(entry.key, "Analyzed", 1)
                except Exception:
                    prog.mark_error()
                    prog.update(entry.key, "Failed", 1)
                    # Keep valid entry even if fetch failed
                    analysis_results.append((entry, None, []))

    # --- Phase 2: Meaningful Report ---
    # Categorize results
    to_fix = []
    to_review = []
    to_remove = []
    ok_entries = []

    for entry, best_result, candidates in analysis_results:
        if not best_result:
            # Validation raised: leave the entry untouched.
            ok_entries.append(entry)
            continue

        if best_result.is_match and best_result.fetched_data:
            to_fix.append((entry, best_result, candidates))
        elif candidates:
            to_review.append((entry, best_result, candidates))
        else:
            to_remove.append(entry)

    # Visualize Analysis Report
    ui.show_analysis_report(ok_entries, to_fix, to_review, to_remove)

    if not (to_fix or to_review or to_remove):
        return

    # --- Phase 3: Apply Fixes ---
    print("\n🚀 Applying fixes...")
    updated_entries = []

    # Add OK entries first (preserve order if we cared, but we sort later usually)
    updated_entries.extend(ok_entries)

    # Process Fixes
    for entry, best_result, candidates in to_fix:
        changes = apply_fix(
            entry,
            best_result.fetched_data,
            all_candidates=candidates,
            allow_optional_updates=True,
        )
        if changes:
            fixed_count += 1
            # Append rather than assign so Phase 0 descriptions for the same
            # key are not lost from the final report.
            fixed_details.setdefault(entry.key, []).extend(changes)
        updated_entries.append(entry)

    # Process Removals
    for entry in to_remove:
        removed_details.append((entry, "No matching metadata found in any source"))
        # Do NOT add to updated_entries

    # Process Reviews (Add to queue)
    for item in to_review:
        manual_review_queue.append(item)
        updated_entries.append(item[0])  # Add tentatively, filter later if removed

    # --- Interactive Manual Review ---
    if manual_review_queue:
        print(f"\n\n🔍 Manual Review Required for {len(manual_review_queue)} entries:")

        # Sort by key for consistent order
        manual_review_queue.sort(key=lambda x: x[0].key)

        entries_to_remove = set()

        for entry, best_res, candidates in manual_review_queue:
            ui.show_manual_review(entry, best_res, candidates, apply_fix)

            while True:
                try:
                    choice = input(
                        f"\nSelect [1-{len(candidates)}], (s)kip, (r)emove, or (q)uit: "
                    ).strip().lower()
                except EOFError:
                    # stdin is closed (e.g. non-interactive run): treat as quit
                    # so the fixes applied so far are still saved.
                    choice = 'q'

                if choice == 'q':
                    print("Exiting manual review.")
                    # Keep remaining in queue as is (already in updated_entries)
                    break
                elif choice == 's':
                    print("Skipped.")
                    break
                elif choice == 'r':
                    print("Marked for removal.")
                    entries_to_remove.add(entry.key)
                    removed_details.append((entry, "Removed by user during manual review"))
                    break
                elif choice.isdigit():
                    idx = int(choice) - 1
                    if 0 <= idx < len(candidates):
                        selected = candidates[idx]
                        if not _candidate_exact_match(selected):
                            print("Cannot apply: selected candidate is not an exact title/author/year match.")
                            continue
                        changes = apply_fix(entry, selected.fetched_data, allow_optional_updates=True)
                        if changes:
                            fixed_count += 1
                            fixed_details.setdefault(entry.key, []).extend(changes)
                            print(f"Applied: {', '.join(changes)}")
                        else:
                            print("No changes needed for selected source.")
                        break
                    else:
                        print("Invalid selection.")
                else:
                    print("Invalid input.")

            if choice == 'q':
                break

        # Filter out removed entries
        if entries_to_remove:
            updated_entries = [e for e in updated_entries if e.key not in entries_to_remove]

    # Overwrite file only if something changed beyond Phase 0 sanitization
    # (those changes were already saved above). Every fix applied after
    # Phase 0 increments fixed_count, so the baseline comparison is enough.
    if removed_details or fixed_count > sanitize_fixed_count:
        bib_parser.save_entries(str(bib_path), updated_entries)

    # --- Pass 2: Double Check ---
    print("\n🔄 Double checking (Re-validation)...")
    entries = bib_parser.parse_file(str(bib_path))

    reports = []

    with progress.progress_context(len(entries), "Verifying") as prog:
        # max(1, ...) because ThreadPoolExecutor rejects max_workers=0, which
        # would happen if every entry was removed from the file.
        with ThreadPoolExecutor(max_workers=min(10, max(1, len(entries)))) as executor:
            # validate_entry returns a (best_result, candidates) tuple
            futures = {
                executor.submit(validate_entry, e, workflow, fetchers, comparator): e
                for e in entries
            }
            for future in as_completed(futures):
                entry = futures[future]
                try:
                    best_result, _ = future.result()  # Ignore candidates in verify pass
                    reports.append(EntryReport(entry=entry, comparison=best_result))

                    if best_result.is_match:
                        prog.mark_success()
                    else:
                        prog.mark_error()
                    prog.update(entry.key, "Verified", 1)
                except Exception:
                    prog.mark_error()
                    prog.update(entry.key, "Failed", 1)

    # Summary
    total = len(entries)
    verified = sum(1 for r in reports if r.comparison and r.comparison.is_match)
    issues = sum(1 for r in reports if r.comparison and r.comparison.has_issues)
    not_found = sum(
        1 for r in reports
        if r.comparison and not r.comparison.is_match and not r.comparison.has_issues
    )

    # Visual Final Status
    ui.show_final_report(
        total, verified, issues, not_found, reports,
        fixed_count, fixed_details, removed_details,
    )
    print("")


def apply_local_fix(entry, official) -> list:
    """
    Apply non-core fixes from local conference DB.

    This never changes title, authors, or year; those fields define the
    reference identity and must be verified against live metadata.

    Args:
        entry: Bib entry to update in place (attributes and ``raw_entry``).
        official: Local DB record; ``booktitle`` and ``doi`` are read.

    Returns:
        Human-readable descriptions of the changes that were made.
    """
    changes = []

    # Entry type upgrade: misc/article → inproceedings if booktitle exists
    if official.booktitle and entry.entry_type.lower() in ('misc', 'article'):
        old_type = entry.entry_type
        entry.entry_type = 'inproceedings'
        if 'ENTRYTYPE' in entry.raw_entry:
            entry.raw_entry['ENTRYTYPE'] = 'inproceedings'
        # Clear journal if it was arXiv
        if entry.journal and 'arxiv' in entry.journal.lower():
            entry.journal = ""
            if 'journal' in entry.raw_entry:
                del entry.raw_entry['journal']
        changes.append(f"Type: @{old_type} → @inproceedings [local_db]")

    # Booktitle: adopt from DB only if missing
    if official.booktitle and not entry.booktitle:
        entry.booktitle = official.booktitle
        entry.raw_entry['booktitle'] = official.booktitle
        changes.append(f"Booktitle: [Added] {official.booktitle[:50]}... [local_db]")

    # DOI: adopt if missing
    if official.doi and not entry.doi:
        entry.doi = official.doi
        entry.raw_entry['doi'] = official.doi
        changes.append(f"DOI: [Added] {official.doi} [local_db]")

    return changes


def apply_fix(
    entry,
    data,
    all_candidates=None,
    *,
    allow_core_updates: bool = False,
    allow_optional_updates: bool = False,
) -> list:
    """Update only safe metadata by default.

    Core identity fields (title, author, year) are not overwritten unless
    allow_core_updates=True. The tool should validate references, not
    transform a nearby candidate into a different citation.

    Args:
        entry: Bib entry to update in place.
        data: Fetched metadata record (``title``, and optionally ``year``,
            ``authors``, ``doi``). ``None`` yields no changes.
        all_candidates: Optional comparison results; used for year
            resolution and to detect author-initial conflicts.
        allow_core_updates: Permit changes to title, year and author.
        allow_optional_updates: Permit filling in a missing DOI.

    Returns:
        Change descriptions; also contains "⚠" warnings for updates that
        were deliberately skipped.
    """
    changes = []
    if data is None:
        return changes

    # Helper to clean string
    def clean(s):
        return str(s).strip() if s else ""

    # Title
    new_title = clean(data.title)
    if new_title and new_title.lower() != entry.title.lower():
        if allow_core_updates:
            changes.append(f"Title: {entry.title} -> {new_title}")
            entry.title = new_title

    # Year: Use resolve_year() if we have multiple candidates
    if allow_core_updates:
        if all_candidates:
            best_year, year_src = resolve_year(all_candidates, bib_year=entry.year)
            if best_year and best_year != entry.year:
                # isdigit() guard mirrors the single-candidate branch so a
                # non-numeric year cannot raise ValueError here.
                if str(best_year).isdigit() and int(best_year) > CURRENT_YEAR:
                    changes.append(f"⚠ Skip suspicious future year {best_year} from {year_src}")
                else:
                    changes.append(f"Year: {entry.year} -> {best_year} [{year_src}]")
                    entry.year = best_year
        else:
            # Single candidate fallback
            new_year = clean(getattr(data, 'year', ''))
            if new_year and new_year != entry.year:
                if new_year.isdigit() and int(new_year) > CURRENT_YEAR:
                    changes.append(f"⚠ Skip suspicious future year {new_year}")
                else:
                    changes.append(f"Year: {entry.year} -> {new_year}")
                    entry.year = new_year

    # Author: Smart Merge Strategy
    # Check for author initial conflict first
    has_initial_conflict = any(
        getattr(cand, 'author_initial_conflict', False)
        for cand in (all_candidates or [])
    )

    if not allow_core_updates:
        pass
    elif has_initial_conflict:
        # Don't overwrite authors when initials conflict
        changes.append("⚠ Author initial conflict detected — preserving bib authors")
    else:
        # Normal author merge logic
        current_authors_raw = TextNormalizer.parse_author_list(entry.author)
        current_authors_norm = [TextNormalizer.normalize_author_name(a) for a in current_authors_raw]

        new_authors_list = getattr(data, 'authors', None) or []
        if isinstance(new_authors_list, str):
            new_authors_list = TextNormalizer.parse_author_list(new_authors_list)

        # Strip DBLP disambiguation IDs from new authors
        new_authors_list = [
            TextNormalizer.strip_dblp_disambiguation_id(str(a)) for a in new_authors_list
        ]

        # Also check if the EXISTING bib authors have DBLP disambiguation IDs baked in
        for raw_auth in current_authors_raw:
            if TextNormalizer.has_dblp_disambiguation_id(raw_auth.strip()):
                changes.append(f"⚠ DBLP disambiguation ID detected in author: '{raw_auth.strip()}'")

        final_authors = []
        for new_auth in new_authors_list:
            new_auth_str = str(new_auth).strip()
            new_auth_norm = TextNormalizer.normalize_author_name(new_auth_str)

            # Try to find a match in the existing list
            for i, old_norm in enumerate(current_authors_norm):
                if old_norm == new_auth_norm:
                    # Found a match! Use the OLD format
                    final_authors.append(current_authors_raw[i].strip())
                    break
            else:
                # New author, use the new string
                final_authors.append(new_auth_str)

        # Reconstruct the string
        new_author_str = " and ".join(final_authors)

        # Check if the result is effectively different from the original full string
        def simple_norm(s):
            return s.lower().replace(" ", "").strip()

        # An empty result means the source supplied no authors; never let
        # that wipe the authors already present in the bib entry.
        if new_author_str and simple_norm(new_author_str) != simple_norm(entry.author):
            old_auth = (entry.author[:50] + '...') if len(entry.author) > 50 else entry.author
            new_auth_disp = (new_author_str[:50] + '...') if len(new_author_str) > 50 else new_author_str
            changes.append(f"Author: {old_auth} -> {new_auth_disp}")
            entry.author = new_author_str

    # Optional fields (doi, journal, etc.)
    if allow_optional_updates and hasattr(data, 'doi') and data.doi and not entry.doi:
        changes.append(f"DOI: [Added] {data.doi}")
        entry.doi = data.doi

    return changes


def _candidate_exact_match(candidate) -> bool:
    """Return True only if the candidate matched title, author and year
    and has no author-initial conflict."""
    return bool(
        candidate
        and getattr(candidate, "is_match", False)
        and getattr(candidate, "title_match", False)
        and getattr(candidate, "author_match", False)
        and getattr(candidate, "year_match", False)
        and not getattr(candidate, "author_initial_conflict", False)
    )


def validate_entry(entry, workflow, fetchers, comparator):
    """Validate a single entry against configured data sources.

    Runs every enabled workflow step in priority order, compares whatever
    each source returns with the entry, then picks the highest-confidence
    result and applies the cross-source and evidence guards to it.

    Args:
        entry: Bib entry to validate.
        workflow: WorkflowConfig providing the enabled steps.
        fetchers: Dict of fetcher instances keyed by 'arxiv', 'crossref',
            'semantic', 'dblp', 'openalex' and 'scholar'.
        comparator: MetadataComparator used to score fetched metadata.

    Returns:
        (best_result, all_results). When no source yields a result, the
        comparator's "unable" result and an empty list.
    """
    results = []

    for step in workflow.get_enabled_steps():
        result = None
        data = None

        if step.name == "arxiv_id" and entry.has_arxiv:
            data = fetchers['arxiv'].fetch_by_id(entry.arxiv_id)
            if data:
                result = comparator.compare(entry, data, "arxiv")

        elif step.name == "crossref_doi" and entry.doi:
            data = fetchers['crossref'].search_by_doi(entry.doi)
            if data:
                # DOI cross-validation: check if the DOI actually resolves to this paper
                doi_fixes = BibSanitizer().check_doi_title_match(entry, data)
                if doi_fixes:
                    # DOI points to a different work — skip this result
                    # The fixes have already cleared the bad DOI from the entry
                    result = None
                else:
                    result = comparator.compare(entry, data, "crossref")

        elif step.name == "semantic_scholar" and entry.title:
            data = fetchers['semantic'].fetch_by_doi(entry.doi) if entry.doi else None
            if not data:
                data = fetchers['semantic'].search_by_title(entry.title)
            if data:
                result = comparator.compare(entry, data, "semantic_scholar")

        elif step.name == "dblp" and entry.title:
            data = fetchers['dblp'].search_by_title(entry.title)
            if data:
                result = comparator.compare(entry, data, "dblp")

        elif step.name == "openalex" and entry.title:
            data = fetchers['openalex'].fetch_by_doi(entry.doi) if entry.doi else None
            if not data:
                data = fetchers['openalex'].search_by_title(entry.title)
            if data:
                result = comparator.compare(entry, data, "openalex")

        elif step.name == "arxiv_title" and entry.title:
            metas = fetchers['arxiv'].search_by_title(entry.title)
            if metas:
                norm1 = TextNormalizer.normalize_for_comparison(entry.title)
                best, best_sim = None, 0
                for m in metas:
                    sim = TextNormalizer.similarity_ratio(
                        norm1, TextNormalizer.normalize_for_comparison(m.title)
                    )
                    if sim > best_sim:
                        best, best_sim = m, sim
                if best and best_sim > 0.5:
                    # Record the chosen hit so its URL is attached as evidence below.
                    data = best
                    result = comparator.compare(entry, best, "arxiv")

        elif step.name == "crossref_title" and entry.title:
            data = fetchers['crossref'].search_by_title(entry.title)
            if data:
                result = comparator.compare(entry, data, "crossref")

        elif step.name == "google_scholar" and entry.title:
            data = fetchers['scholar'].search_by_title(entry.title)
            if data:
                result = comparator.compare(entry, data, "scholar")

        if result:
            result.evidence_step = step.name
            result.evidence_url = getattr(data, "url", "") if data else ""
            results.append(result)

    if results:
        best = max(results, key=lambda r: r.confidence)
        _apply_cross_source_conflict_guard(best, results)
        _apply_evidence_guard(best, results)
        return best, results

    # No results
    return comparator.create_unable_result(entry, "Not found in any data source"), []


def _apply_cross_source_conflict_guard(best, results) -> None:
    """Reject candidates when exact-title sources disagree on core metadata.

    If another result with title similarity >= 0.95 reports a different
    year than ``best``, an issue is recorded on ``best``, it is marked as
    not matching, and its confidence is capped at 0.8. Mutates ``best``.
    """
    if not best or not getattr(best, "fetched_title", ""):
        return

    best_year = str(getattr(best, "fetched_year", "") or "").strip()

    conflicts = []
    for result in results:
        if result is best:
            continue
        if getattr(result, "title_similarity", 0.0) < 0.95:
            continue

        other_year = str(getattr(result, "fetched_year", "") or "").strip()
        if best_year and other_year and best_year != other_year:
            conflicts.append(f"{result.source}={other_year}")

    if not conflicts:
        return

    # dict.fromkeys() de-duplicates while keeping first-seen order.
    issue = (
        f"Cross-source year conflict: best {best.source}={best.fetched_year}, "
        f"also found {'; '.join(dict.fromkeys(conflicts))}"
    )
    if issue not in best.issues:
        best.issues.append(issue)
    best.is_match = False
    best.confidence = min(best.confidence, 0.8)


def _apply_evidence_guard(best, results) -> None:
    """Require primary evidence or at least two agreeing exact sources.

    A match found via the arxiv_id, arxiv_title or crossref_doi step is
    accepted as is. Otherwise another matching result with title
    similarity >= 0.95 and the same year from a different source is
    needed; without it ``best`` is downgraded to a non-match with
    confidence capped at 0.8. Mutates ``best``.
    """
    if not best or not getattr(best, "is_match", False):
        return

    evidence_step = getattr(best, "evidence_step", "")
    if evidence_step in {"arxiv_id", "arxiv_title", "crossref_doi"}:
        return

    best_year = str(getattr(best, "fetched_year", "") or "").strip()
    agreeing_sources = {getattr(best, "source", "")}
    for result in results:
        if result is best or not getattr(result, "is_match", False):
            continue
        if getattr(result, "title_similarity", 0.0) < 0.95:
            continue
        other_year = str(getattr(result, "fetched_year", "") or "").strip()
        if best_year and other_year == best_year:
            agreeing_sources.add(getattr(result, "source", ""))

    if len(agreeing_sources) >= 2:
        return

    issue = (
        "Insufficient evidence: exact match found only in "
        f"{best.source}; needs arXiv/DOI evidence or another agreeing source"
    )
    if issue not in best.issues:
        best.issues.append(issue)
    best.is_match = False
    best.confidence = min(best.confidence, 0.8)


if __name__ == "__main__":
    main()