""" Non-interactive RefCheck workflow for Hugging Face Spaces. """ from __future__ import annotations import copy import tempfile from dataclasses import dataclass, field from functools import lru_cache from pathlib import Path from typing import Any from concurrent.futures import ThreadPoolExecutor, as_completed from main import ( apply_fix, get_default_workflow, validate_entry, ) from src.comparator import EntryReport, MetadataComparator from src.fetcher import ( ArxivFetcher, CrossRefFetcher, DBLPFetcher, OpenAlexFetcher, ScholarFetcher, SemanticScholarFetcher, ) from src.local_db import LocalConferenceDB from src.parser import BibEntry, BibParser from src.sanitizer import BibSanitizer, SanitizeFix @dataclass class RefCheckOptions: """Options for a non-interactive RefCheck run.""" remove_unverified: bool = True enable_google_scholar: bool = False max_workers: int = 4 @dataclass class RefCheckResult: """Artifacts and summary produced by a Space run.""" source_stem: str = "references" total_input: int = 0 total_output: int = 0 verified: int = 0 issues: int = 0 not_found: int = 0 entries: list[BibEntry] = field(default_factory=list) review_items: list[dict[str, Any]] = field(default_factory=list) fixed_details: dict[str, list[str]] = field(default_factory=dict) removed_details: list[tuple[str, str, str]] = field(default_factory=list) review_details: list[dict[str, Any]] = field(default_factory=list) duplicate_details: dict[str, list[str]] = field(default_factory=dict) sanitize_fixes: dict[str, list[SanitizeFix]] = field(default_factory=dict) local_matches: int = 0 local_db_loaded: bool = False fixed_bib_path: str = "" report_path: str = "" report_markdown: str = "" def run_refcheck_file(file_path: str | Path, options: RefCheckOptions | None = None) -> RefCheckResult: """Validate and fix an uploaded BibTeX file without interactive prompts.""" options = options or RefCheckOptions() source_path = Path(file_path) parser = BibParser() entries = parser.parse_file(str(source_path)) result = RefCheckResult(source_stem=source_path.stem or "references", total_input=len(entries)) if not entries: result.report_markdown = "## RefCheck Report\n\nNo BibTeX entries were found." result.report_path = _write_report(result.report_markdown) result.fixed_bib_path = _write_bib(parser, [], result.source_stem) return result sanitizer = BibSanitizer() result.sanitize_fixes = sanitizer.sanitize_all(entries) _record_sanitize_fixes(result.fixed_details, result.sanitize_fixes) result.duplicate_details = sanitizer.find_duplicates(entries) result.local_db_loaded, api_entries, result.local_matches = _apply_local_db(entries, result.fixed_details) fetchers = _build_fetchers() workflow = get_default_workflow() for step in workflow.steps: if step.name == "google_scholar": step.enabled = options.enable_google_scholar comparator = MetadataComparator() analysis = _analyze_entries(api_entries, workflow, fetchers, comparator, options.max_workers) actions: dict[str, tuple[str, Any, list[Any]]] = {} for entry, best_result, candidates in analysis: if not best_result: actions[entry.key] = ("keep", None, []) elif best_result.is_match and best_result.fetched_data: actions[entry.key] = ("fix", best_result, candidates) elif candidates: actions[entry.key] = ("review", best_result, candidates) else: actions[entry.key] = ("remove", best_result, candidates) updated_entries: list[BibEntry] = [] for entry in entries: action, best_result, candidates = actions.get(entry.key, ("keep", None, [])) if action == "fix": changes = apply_fix(entry, best_result.fetched_data, all_candidates=candidates) if changes: result.fixed_details.setdefault(entry.key, []).extend(changes) updated_entries.append(entry) elif action == "review": result.review_items.append(_review_item(entry, best_result, candidates)) updated_entries.append(entry) elif action == "remove": if options.remove_unverified: result.removed_details.append((entry.key, entry.title, "No matching metadata found in any source")) else: result.review_items.append(_review_item(entry, best_result, candidates)) updated_entries.append(entry) else: updated_entries.append(entry) result.entries = updated_entries return finalize_result(result, options) def finalize_result(result: RefCheckResult, options: RefCheckOptions | None = None) -> RefCheckResult: """Write current entries, re-verify them, and refresh downloadable artifacts.""" options = options or RefCheckOptions() parser = BibParser() fetchers = _build_fetchers() workflow = get_default_workflow() for step in workflow.steps: if step.name == "google_scholar": step.enabled = options.enable_google_scholar comparator = MetadataComparator() result.review_details = [_review_payload_from_item(item) for item in result.review_items] result.total_output = len(result.entries) fixed_path = _write_bib(parser, result.entries, result.source_stem) result.fixed_bib_path = fixed_path verified_entries = parser.parse_file(fixed_path) verification_reports = _verify_entries( verified_entries, workflow, fetchers, comparator, options.max_workers, ) result.verified = sum(1 for r in verification_reports if r.comparison and r.comparison.is_match) result.issues = sum(1 for r in verification_reports if r.comparison and r.comparison.has_issues) result.not_found = sum( 1 for r in verification_reports if r.comparison and not r.comparison.is_match and not r.comparison.has_issues ) result.report_markdown = _build_report(result, verification_reports) result.report_path = _write_report(result.report_markdown) return result def preview_review_action( result: RefCheckResult | None, review_index: int, action: str, candidate_index: int | None = None, options: RefCheckOptions | None = None, ) -> str: """Preview and test a manual review action without mutating the session.""" if not result or not result.review_items: return "No unresolved entries are available." if review_index < 0 or review_index >= len(result.review_items): return "Select an unresolved entry first." options = options or RefCheckOptions() item = result.review_items[review_index] entry = _find_entry(result.entries, item["entry_key"]) if not entry: return "The selected entry is no longer in the working bibliography." if action == "keep": return _entry_preview_markdown(entry, "Keep original entry", ["No metadata changes will be applied."]) if action == "remove": return _entry_preview_markdown(entry, "Remove entry", ["This entry will be removed from the exported BibTeX."]) if action != "candidate": return "Select a candidate, keep, or remove action." candidates = item.get("candidates", []) if candidate_index is None or candidate_index < 0 or candidate_index >= len(candidates): return "Select a candidate first." candidate = candidates[candidate_index] if not _candidate_exact_match(candidate): return _entry_preview_markdown( entry, "Candidate blocked", [ "This candidate is not an exact title/author/year match, so RefCheck will not auto-apply it.", f"Candidate source: {candidate.source}", f"Candidate confidence: {candidate.confidence:.2f}", *_candidate_issue_lines(candidate), ], ) temp_entry = copy.deepcopy(entry) changes = apply_fix(temp_entry, candidate.fetched_data, allow_optional_updates=True) if not changes: changes = ["No field-level changes are needed for this candidate."] fetchers = _build_fetchers() workflow = get_default_workflow() for step in workflow.steps: if step.name == "google_scholar": step.enabled = options.enable_google_scholar comparator = MetadataComparator() best_result, _ = validate_entry(temp_entry, workflow, fetchers, comparator) test_lines = [ f"Candidate source: {candidate.source}", f"Candidate confidence before apply: {candidate.confidence:.2f}", ] if best_result: test_lines.extend( [ f"Verification source after apply: {best_result.source}", f"Verification confidence after apply: {best_result.confidence:.2f}", f"Verified after apply: {'yes' if best_result.is_match else 'no'}", ] ) if best_result.issues: test_lines.append(f"Remaining issues: {'; '.join(best_result.issues)}") return _entry_preview_markdown(temp_entry, "Candidate test", changes + test_lines) def apply_review_action( result: RefCheckResult | None, review_index: int, action: str, candidate_index: int | None = None, options: RefCheckOptions | None = None, ) -> RefCheckResult: """Apply a manual review action to the working bibliography.""" if not result or not result.review_items: raise ValueError("No unresolved entries are available.") if review_index < 0 or review_index >= len(result.review_items): raise ValueError("Select an unresolved entry first.") options = options or RefCheckOptions() item = result.review_items[review_index] entry = _find_entry(result.entries, item["entry_key"]) if not entry: raise ValueError("The selected entry is no longer in the working bibliography.") if action == "candidate": candidates = item.get("candidates", []) if candidate_index is None or candidate_index < 0 or candidate_index >= len(candidates): raise ValueError("Select a candidate first.") candidate = candidates[candidate_index] if not _candidate_exact_match(candidate): raise ValueError( "Selected candidate is not an exact title/author/year match; RefCheck will not auto-overwrite core metadata." ) changes = apply_fix(entry, candidate.fetched_data, allow_optional_updates=True) changes.append(f"Resolved manually with candidate from {candidate.source}.") result.fixed_details.setdefault(entry.key, []).extend(changes) elif action == "remove": result.entries = [existing for existing in result.entries if existing.key != entry.key] result.removed_details.append((entry.key, entry.title, "Removed during manual review")) elif action == "keep": result.fixed_details.setdefault(entry.key, []).append("Marked as manually reviewed; kept original entry.") else: raise ValueError("Select a candidate, keep, or remove action.") del result.review_items[review_index] return finalize_result(result, options) def _find_entry(entries: list[BibEntry], key: str) -> BibEntry | None: for entry in entries: if entry.key == key: return entry return None def _candidate_exact_match(candidate: Any) -> bool: return bool( candidate and getattr(candidate, "is_match", False) and getattr(candidate, "title_match", False) and getattr(candidate, "author_match", False) and getattr(candidate, "year_match", False) and not getattr(candidate, "author_initial_conflict", False) ) def _candidate_issue_lines(candidate: Any) -> list[str]: lines = list(getattr(candidate, "issues", []) or []) if not getattr(candidate, "title_match", False): lines.append("Title is not an exact-enough match") if not getattr(candidate, "author_match", False): lines.append("Authors are not an exact-enough match") if not getattr(candidate, "year_match", False): bib_year = getattr(candidate, "bib_year", "") or "[missing]" fetched_year = getattr(candidate, "fetched_year", "") or "[missing]" lines.append(f"Year mismatch: bib={bib_year}, candidate={fetched_year}") return [f"Blocking issue: {line}" for line in dict.fromkeys(lines)] def _entry_preview_markdown(entry: BibEntry, title: str, lines: list[str]) -> str: body = "\n".join(f"- {line}" for line in lines) return ( f"### {title}\n\n" f"**Key:** `{entry.key}`\n\n" f"**Title:** {entry.title or '[missing]'}\n\n" f"**Authors:** {entry.author or '[missing]'}\n\n" f"**Year:** {entry.year or '[missing]'}\n\n" f"{body}" ) def _build_fetchers() -> dict[str, Any]: return { "arxiv": ArxivFetcher(), "crossref": CrossRefFetcher(), "scholar": ScholarFetcher(), "semantic": SemanticScholarFetcher(), "openalex": OpenAlexFetcher(), "dblp": DBLPFetcher(), } def _analyze_entries( entries: list[BibEntry], workflow: Any, fetchers: dict[str, Any], comparator: MetadataComparator, max_workers: int, ) -> list[tuple[BibEntry, Any, list[Any]]]: if not entries: return [] analysis: list[tuple[BibEntry, Any, list[Any]]] = [] worker_count = min(max(1, max_workers), len(entries)) with ThreadPoolExecutor(max_workers=worker_count) as executor: futures = { executor.submit(validate_entry, entry, workflow, fetchers, comparator): entry for entry in entries } for future in as_completed(futures): entry = futures[future] try: best_result, candidates = future.result() except Exception: best_result, candidates = None, [] analysis.append((entry, best_result, candidates)) return analysis def _verify_entries( entries: list[BibEntry], workflow: Any, fetchers: dict[str, Any], comparator: MetadataComparator, max_workers: int, ) -> list[EntryReport]: reports: list[EntryReport] = [] for entry, best_result, _ in _analyze_entries(entries, workflow, fetchers, comparator, max_workers): reports.append(EntryReport(entry=entry, comparison=best_result)) return reports def _record_sanitize_fixes( fixed_details: dict[str, list[str]], sanitize_fixes: dict[str, list[SanitizeFix]], ) -> None: for key, fixes in sanitize_fixes.items(): fixed_details.setdefault(key, []) fixed_details[key].extend(fix.description for fix in fixes) def _apply_local_db( entries: list[BibEntry], fixed_details: dict[str, list[str]], ) -> tuple[bool, list[BibEntry], int]: local_db = _load_local_db() if not local_db.is_loaded: return False, entries, 0 match_count = 0 for entry in entries: official = local_db.lookup(entry.title) if official: match_count += 1 return True, entries, match_count @lru_cache(maxsize=1) def _load_local_db() -> LocalConferenceDB: local_db = LocalConferenceDB() local_db.load() return local_db def _review_item(entry: BibEntry, best_result: Any, candidates: list[Any]) -> dict[str, Any]: sorted_candidates = sorted(candidates, key=lambda item: item.confidence, reverse=True) return { "entry_key": entry.key, "entry": entry, "best_result": best_result, "candidates": sorted_candidates, } def _review_payload_from_item(item: dict[str, Any]) -> dict[str, Any]: return _review_payload( item["entry"], item.get("best_result"), item.get("candidates", []), ) def _review_payload(entry: BibEntry, best_result: Any, candidates: list[Any]) -> dict[str, Any]: return { "key": entry.key, "title": entry.title, "reason": "; ".join(best_result.issues) if best_result and best_result.issues else "Ambiguous match", "candidates": [ { "source": candidate.source, "confidence": candidate.confidence, "title": getattr(candidate.fetched_data, "title", ""), "year": getattr(candidate.fetched_data, "year", ""), "doi": getattr(candidate.fetched_data, "doi", ""), } for candidate in candidates[:5] ], } def _write_bib(parser: BibParser, entries: list[BibEntry], original_stem: str) -> str: out_dir = Path(tempfile.mkdtemp(prefix="refcheck_")) out_path = out_dir / f"{original_stem or 'references'}_refcheck_fixed.bib" parser.save_entries(str(out_path), entries) return str(out_path) def _write_report(markdown: str) -> str: out_dir = Path(tempfile.mkdtemp(prefix="refcheck_report_")) out_path = out_dir / "refcheck_report.md" out_path.write_text(markdown, encoding="utf-8") return str(out_path) def _build_report(result: RefCheckResult, reports: list[EntryReport]) -> str: lines = [ "## RefCheck Report", "", "### Summary", "", f"- Input entries: {result.total_input}", f"- Output entries: {result.total_output}", f"- Verified after fix: {result.verified}", f"- Remaining issues: {result.issues}", f"- Not found after fix: {result.not_found}", f"- Local DB loaded: {'yes' if result.local_db_loaded else 'no'}", f"- Local DB matches: {result.local_matches}", "", ] gate_status, gate_reasons = _submission_safety_gate(result) lines.extend(["### Submission Safety Gate", ""]) lines.append(f"- Status: **{gate_status}**") for reason in gate_reasons: lines.append(f"- {reason}") lines.append("") if result.removed_details: lines.extend(["### Removed", ""]) for key, title, reason in result.removed_details: lines.append(f"- `{key}`: {title} ({reason})") lines.append("") if result.fixed_details: lines.extend(["### Fixed", ""]) for key, changes in sorted(result.fixed_details.items()): lines.append(f"- `{key}`") for change in changes: lines.append(f" - {change}") lines.append("") if result.duplicate_details: lines.extend(["### Duplicate Titles", ""]) for title, keys in result.duplicate_details.items(): lines.append(f"- `{', '.join(keys)}`: {title}") lines.append("") if result.review_details: lines.extend(["### Needs Review", ""]) for item in result.review_details: lines.append(f"- `{item['key']}`: {item['title']}") lines.append(f" - Reason: {item['reason']}") for candidate in item["candidates"]: lines.append( " - Candidate: " f"{candidate['source']} " f"(confidence {candidate['confidence']:.2f}) " f"{candidate['title']} " f"{candidate['year']} " f"{candidate['doi']}".strip() ) lines.append("") remaining = [ report for report in reports if report.comparison and not report.comparison.is_match ] if remaining: lines.extend(["### Verification Issues", ""]) for report in remaining: comparison = report.comparison issues = "; ".join(comparison.issues) if comparison.issues else "Not matched" lines.append( f"- `{report.entry.key}` via {comparison.source} " f"(confidence {comparison.confidence:.2f}): {issues}" ) lines.append("") return "\n".join(lines).strip() + "\n" def _submission_safety_gate(result: RefCheckResult) -> tuple[str, list[str]]: reasons = [] if result.review_details: reasons.append(f"FAIL: {len(result.review_details)} reference(s) still need manual review.") if result.issues: reasons.append(f"FAIL: {result.issues} reference(s) still have strict verification issues.") if result.not_found: reasons.append(f"FAIL: {result.not_found} reference(s) could not be found in configured sources.") if result.removed_details: reasons.append( f"FAIL: {len(result.removed_details)} reference(s) were removed; confirm the paper text no longer cites them." ) if result.total_output and result.verified != result.total_output: reasons.append(f"FAIL: only {result.verified}/{result.total_output} output reference(s) are strictly verified.") if result.duplicate_details: reasons.append(f"WARN: {len(result.duplicate_details)} duplicate title group(s) should be checked.") failures = [reason for reason in reasons if reason.startswith("FAIL")] if failures: return "FAIL - do not submit yet", reasons return "PASS - all output references are strictly verified", reasons or ["PASS: no unresolved reference risks detected."]