| """ |
| Non-interactive RefCheck workflow for Hugging Face Spaces. |
| """ |
| from __future__ import annotations |
|
|
| import copy |
| import tempfile |
| from dataclasses import dataclass, field |
| from functools import lru_cache |
| from pathlib import Path |
| from typing import Any |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
| from main import ( |
| apply_fix, |
| get_default_workflow, |
| validate_entry, |
| ) |
| from src.comparator import EntryReport, MetadataComparator |
| from src.fetcher import ( |
| ArxivFetcher, |
| CrossRefFetcher, |
| DBLPFetcher, |
| OpenAlexFetcher, |
| ScholarFetcher, |
| SemanticScholarFetcher, |
| ) |
| from src.local_db import LocalConferenceDB |
| from src.parser import BibEntry, BibParser |
| from src.sanitizer import BibSanitizer, SanitizeFix |
|
|
|
|
@dataclass
class RefCheckOptions:
    """Settings that control one non-interactive RefCheck run."""

    # When True, entries classified as "remove" are dropped from the output;
    # when False they are kept and queued for manual review instead.
    remove_unverified: bool = True
    # Value copied onto the `enabled` flag of the workflow step named
    # "google_scholar".
    enable_google_scholar: bool = False
    # Requested number of worker threads for entry validation; the value is
    # clamped to at least 1 and at most the number of entries when used.
    max_workers: int = 4
|
|
|
|
@dataclass
class RefCheckResult:
    """Working state, summary counters and output artifacts of a Space run."""

    # Stem of the uploaded file name; reused when naming the fixed .bib file.
    source_stem: str = "references"

    # --- summary counters -------------------------------------------------
    total_input: int = 0  # entries parsed from the uploaded file
    total_output: int = 0  # entries currently in the working bibliography
    verified: int = 0  # re-verified entries whose comparison is a match
    issues: int = 0  # re-verified entries whose comparison has issues
    not_found: int = 0  # re-verified entries that neither match nor have issues

    # --- working bibliography and review queue ------------------------------
    entries: list[BibEntry] = field(default_factory=list)
    # One dict per unresolved entry with the keys "entry_key", "entry",
    # "best_result" and "candidates".
    review_items: list[dict[str, Any]] = field(default_factory=list)

    # --- details rendered into the report -----------------------------------
    # Entry key -> human-readable change descriptions.
    fixed_details: dict[str, list[str]] = field(default_factory=dict)
    # (entry key, title, reason) for every removed entry.
    removed_details: list[tuple[str, str, str]] = field(default_factory=list)
    # Report-friendly payloads derived from `review_items`.
    review_details: list[dict[str, Any]] = field(default_factory=list)
    # Title -> keys of the entries sharing it (as rendered by the report).
    duplicate_details: dict[str, list[str]] = field(default_factory=dict)
    # Entry key -> fixes reported by the sanitizer.
    sanitize_fixes: dict[str, list[SanitizeFix]] = field(default_factory=dict)

    # --- local conference database -------------------------------------------
    local_matches: int = 0
    local_db_loaded: bool = False

    # --- downloadable artifacts ----------------------------------------------
    fixed_bib_path: str = ""
    report_path: str = ""
    report_markdown: str = ""
|
|
|
|
def run_refcheck_file(file_path: str | Path, options: RefCheckOptions | None = None) -> RefCheckResult:
    """Run the whole RefCheck pipeline on a BibTeX file without prompting.

    Steps: parse the file, sanitize the entries, detect duplicates, consult
    the local conference DB, validate the entries against the configured
    fetchers, then fix / queue for review / remove each entry and hand the
    outcome to `finalize_result`.

    Args:
        file_path: Location of the uploaded BibTeX file.
        options: Run settings; defaults to `RefCheckOptions()`.

    Returns:
        The populated `RefCheckResult`. When the file contains no entries the
        result only carries a short report and an empty fixed .bib file.
    """
    opts = options or RefCheckOptions()
    src = Path(file_path)
    bib_parser = BibParser()
    parsed = bib_parser.parse_file(str(src))
    outcome = RefCheckResult(source_stem=src.stem or "references", total_input=len(parsed))

    # Nothing to check: still produce both artifacts so callers have files.
    if not parsed:
        outcome.report_markdown = "## RefCheck Report\n\nNo BibTeX entries were found."
        outcome.report_path = _write_report(outcome.report_markdown)
        outcome.fixed_bib_path = _write_bib(bib_parser, [], outcome.source_stem)
        return outcome

    cleaner = BibSanitizer()
    outcome.sanitize_fixes = cleaner.sanitize_all(parsed)
    _record_sanitize_fixes(outcome.fixed_details, outcome.sanitize_fixes)
    outcome.duplicate_details = cleaner.find_duplicates(parsed)

    local_outcome = _apply_local_db(parsed, outcome.fixed_details)
    outcome.local_db_loaded, api_entries, outcome.local_matches = local_outcome

    sources = _build_fetchers()
    pipeline = get_default_workflow()
    for stage in pipeline.steps:
        if stage.name == "google_scholar":
            stage.enabled = opts.enable_google_scholar

    matcher = MetadataComparator()
    analysis = _analyze_entries(api_entries, pipeline, sources, matcher, opts.max_workers)

    def classify(best: Any, found: list[Any]) -> tuple[str, Any, list[Any]]:
        """Map one validation outcome to (action, best result, candidates)."""
        if not best:
            return ("keep", None, [])
        if best.is_match and best.fetched_data:
            return ("fix", best, found)
        if found:
            return ("review", best, found)
        return ("remove", best, found)

    # Keyed by entry key; when keys repeat, the outcome processed last wins.
    decisions: dict[str, tuple[str, Any, list[Any]]] = {
        analysed.key: classify(best, found) for analysed, best, found in analysis
    }

    kept: list[BibEntry] = []
    for entry in parsed:
        verdict, best, found = decisions.get(entry.key, ("keep", None, []))

        # Only a "remove" verdict with removal enabled drops the entry.
        if verdict == "remove" and opts.remove_unverified:
            outcome.removed_details.append(
                (entry.key, entry.title, "No matching metadata found in any source")
            )
            continue

        if verdict == "fix":
            changes = apply_fix(entry, best.fetched_data, all_candidates=found)
            if changes:
                outcome.fixed_details.setdefault(entry.key, []).extend(changes)
        elif verdict in ("review", "remove"):
            # "remove" reaches this branch only when removal is disabled.
            outcome.review_items.append(_review_item(entry, best, found))
        kept.append(entry)

    outcome.entries = kept
    return finalize_result(outcome, opts)
|
|
|
|
def finalize_result(result: RefCheckResult, options: RefCheckOptions | None = None) -> RefCheckResult:
    """Write current entries, re-verify them, and refresh downloadable artifacts.

    The working bibliography in ``result.entries`` is saved to a fresh .bib
    file, that file is parsed again and every parsed entry is re-validated.
    The summary counters, review payloads, Markdown report and artifact paths
    on ``result`` are then overwritten.

    Args:
        result: Run state to finalize; it is mutated in place.
        options: Run settings; defaults to `RefCheckOptions()`.

    Returns:
        The same ``result`` object, updated.
    """
    options = options or RefCheckOptions()
    parser = BibParser()
    fetchers = _build_fetchers()
    workflow = get_default_workflow()
    for step in workflow.steps:
        if step.name == "google_scholar":
            step.enabled = options.enable_google_scholar

    comparator = MetadataComparator()
    result.review_details = [_review_payload_from_item(item) for item in result.review_items]
    result.total_output = len(result.entries)
    fixed_path = _write_bib(parser, result.entries, result.source_stem)
    result.fixed_bib_path = fixed_path

    # Verify what was actually written, not the in-memory objects.
    verified_entries = parser.parse_file(fixed_path)
    verification_reports = _verify_entries(
        verified_entries,
        workflow,
        fetchers,
        comparator,
        options.max_workers,
    )

    verified = issues = not_found = 0
    for report in verification_reports:
        comparison = report.comparison
        if not comparison:
            # No comparison at all (no source answered, or validation failed):
            # previously such entries were left out of every counter, so the
            # summary under-reported unresolved references.
            not_found += 1
            continue
        if comparison.is_match:
            verified += 1
        if comparison.has_issues:
            issues += 1
        if not comparison.is_match and not comparison.has_issues:
            not_found += 1
    result.verified = verified
    result.issues = issues
    result.not_found = not_found

    result.report_markdown = _build_report(result, verification_reports)
    result.report_path = _write_report(result.report_markdown)
    return result
|
|
|
|
def preview_review_action(
    result: RefCheckResult | None,
    review_index: int,
    action: str,
    candidate_index: int | None = None,
    options: RefCheckOptions | None = None,
) -> str:
    """Describe, as Markdown, what a manual review action would do.

    The session is left untouched: a candidate is applied to a deep copy of
    the entry, and that copy is re-validated to show the expected outcome.

    Args:
        result: Current run state (may be None).
        review_index: Position of the item in ``result.review_items``.
        action: One of "keep", "remove" or "candidate".
        candidate_index: Position in the item's candidate list; needed only
            for the "candidate" action.
        options: Run settings; defaults to `RefCheckOptions()`.

    Returns:
        A Markdown preview, or a plain one-line message when the request
        cannot be previewed (nothing to review, bad index, unknown action).
    """
    if not result or not result.review_items:
        return "No unresolved entries are available."
    if not 0 <= review_index < len(result.review_items):
        return "Select an unresolved entry first."

    run_options = options or RefCheckOptions()
    review = result.review_items[review_index]
    target = _find_entry(result.entries, review["entry_key"])
    if not target:
        return "The selected entry is no longer in the working bibliography."

    if action == "keep":
        return _entry_preview_markdown(target, "Keep original entry", ["No metadata changes will be applied."])
    if action == "remove":
        return _entry_preview_markdown(
            target, "Remove entry", ["This entry will be removed from the exported BibTeX."]
        )
    if action != "candidate":
        return "Select a candidate, keep, or remove action."

    options_list = review.get("candidates", [])
    if candidate_index is None or not 0 <= candidate_index < len(options_list):
        return "Select a candidate first."

    chosen = options_list[candidate_index]
    if not _candidate_exact_match(chosen):
        blocked_lines = [
            "This candidate is not an exact title/author/year match, so RefCheck will not auto-apply it.",
            f"Candidate source: {chosen.source}",
            f"Candidate confidence: {chosen.confidence:.2f}",
        ]
        blocked_lines.extend(_candidate_issue_lines(chosen))
        return _entry_preview_markdown(target, "Candidate blocked", blocked_lines)

    # Work on a copy so the preview never alters the working bibliography.
    scratch = copy.deepcopy(target)
    changes = apply_fix(scratch, chosen.fetched_data, allow_optional_updates=True)
    if not changes:
        changes = ["No field-level changes are needed for this candidate."]

    sources = _build_fetchers()
    pipeline = get_default_workflow()
    for stage in pipeline.steps:
        if stage.name == "google_scholar":
            stage.enabled = run_options.enable_google_scholar
    matcher = MetadataComparator()
    verdict, _ = validate_entry(scratch, pipeline, sources, matcher)

    outcome_lines = [
        f"Candidate source: {chosen.source}",
        f"Candidate confidence before apply: {chosen.confidence:.2f}",
    ]
    if verdict:
        verified_text = "yes" if verdict.is_match else "no"
        outcome_lines.append(f"Verification source after apply: {verdict.source}")
        outcome_lines.append(f"Verification confidence after apply: {verdict.confidence:.2f}")
        outcome_lines.append(f"Verified after apply: {verified_text}")
        if verdict.issues:
            outcome_lines.append(f"Remaining issues: {'; '.join(verdict.issues)}")

    return _entry_preview_markdown(scratch, "Candidate test", changes + outcome_lines)
|
|
|
|
def apply_review_action(
    result: RefCheckResult | None,
    review_index: int,
    action: str,
    candidate_index: int | None = None,
    options: RefCheckOptions | None = None,
) -> RefCheckResult:
    """Resolve one review item and regenerate the run artifacts.

    Args:
        result: Current run state; it is mutated in place.
        review_index: Position of the item in ``result.review_items``.
        action: "candidate" (apply the chosen candidate's metadata),
            "remove" (drop the entry) or "keep" (leave the entry as is).
        candidate_index: Position in the item's candidate list; needed only
            for the "candidate" action.
        options: Run settings; defaults to `RefCheckOptions()`.

    Returns:
        The result returned by `finalize_result` after the change.

    Raises:
        ValueError: If there is nothing to review, an index is out of range,
            the entry is gone, the action is unknown, or the chosen candidate
            is not an exact title/author/year match.
    """
    if not result or not result.review_items:
        raise ValueError("No unresolved entries are available.")
    if not 0 <= review_index < len(result.review_items):
        raise ValueError("Select an unresolved entry first.")

    run_options = options or RefCheckOptions()
    review = result.review_items[review_index]
    target = _find_entry(result.entries, review["entry_key"])
    if not target:
        raise ValueError("The selected entry is no longer in the working bibliography.")

    if action not in ("candidate", "remove", "keep"):
        raise ValueError("Select a candidate, keep, or remove action.")

    if action == "keep":
        note = "Marked as manually reviewed; kept original entry."
        result.fixed_details.setdefault(target.key, []).append(note)
    elif action == "remove":
        survivors = [existing for existing in result.entries if existing.key != target.key]
        result.entries = survivors
        result.removed_details.append((target.key, target.title, "Removed during manual review"))
    else:
        options_list = review.get("candidates", [])
        if candidate_index is None or not 0 <= candidate_index < len(options_list):
            raise ValueError("Select a candidate first.")
        chosen = options_list[candidate_index]
        if not _candidate_exact_match(chosen):
            raise ValueError(
                "Selected candidate is not an exact title/author/year match; RefCheck will not auto-overwrite core metadata."
            )
        changes = apply_fix(target, chosen.fetched_data, allow_optional_updates=True)
        changes.append(f"Resolved manually with candidate from {chosen.source}.")
        result.fixed_details.setdefault(target.key, []).extend(changes)

    # The item is resolved whichever action was taken.
    del result.review_items[review_index]
    return finalize_result(result, run_options)
|
|
|
|
| def _find_entry(entries: list[BibEntry], key: str) -> BibEntry | None: |
| for entry in entries: |
| if entry.key == key: |
| return entry |
| return None |
|
|
|
|
| def _candidate_exact_match(candidate: Any) -> bool: |
| return bool( |
| candidate |
| and getattr(candidate, "is_match", False) |
| and getattr(candidate, "title_match", False) |
| and getattr(candidate, "author_match", False) |
| and getattr(candidate, "year_match", False) |
| and not getattr(candidate, "author_initial_conflict", False) |
| ) |
|
|
|
|
| def _candidate_issue_lines(candidate: Any) -> list[str]: |
| lines = list(getattr(candidate, "issues", []) or []) |
| if not getattr(candidate, "title_match", False): |
| lines.append("Title is not an exact-enough match") |
| if not getattr(candidate, "author_match", False): |
| lines.append("Authors are not an exact-enough match") |
| if not getattr(candidate, "year_match", False): |
| bib_year = getattr(candidate, "bib_year", "") or "[missing]" |
| fetched_year = getattr(candidate, "fetched_year", "") or "[missing]" |
| lines.append(f"Year mismatch: bib={bib_year}, candidate={fetched_year}") |
| return [f"Blocking issue: {line}" for line in dict.fromkeys(lines)] |
|
|
|
|
| def _entry_preview_markdown(entry: BibEntry, title: str, lines: list[str]) -> str: |
| body = "\n".join(f"- {line}" for line in lines) |
| return ( |
| f"### {title}\n\n" |
| f"**Key:** `{entry.key}`\n\n" |
| f"**Title:** {entry.title or '[missing]'}\n\n" |
| f"**Authors:** {entry.author or '[missing]'}\n\n" |
| f"**Year:** {entry.year or '[missing]'}\n\n" |
| f"{body}" |
| ) |
|
|
|
|
def _build_fetchers() -> dict[str, Any]:
    """Create one fresh instance of every metadata fetcher, keyed by name."""
    registry = (
        ("arxiv", ArxivFetcher),
        ("crossref", CrossRefFetcher),
        ("scholar", ScholarFetcher),
        ("semantic", SemanticScholarFetcher),
        ("openalex", OpenAlexFetcher),
        ("dblp", DBLPFetcher),
    )
    return {name: fetcher_cls() for name, fetcher_cls in registry}
|
|
|
|
def _analyze_entries(
    entries: list[BibEntry],
    workflow: Any,
    fetchers: dict[str, Any],
    comparator: MetadataComparator,
    max_workers: int,
) -> list[tuple[BibEntry, Any, list[Any]]]:
    """Validate entries concurrently and return the outcomes in input order.

    Every entry is passed to `validate_entry` on a thread pool; the same
    ``workflow``, ``fetchers`` and ``comparator`` objects are handed to every
    worker.

    Args:
        entries: Entries to validate.
        workflow: Workflow forwarded to `validate_entry`.
        fetchers: Fetchers forwarded to `validate_entry`.
        comparator: Comparator forwarded to `validate_entry`.
        max_workers: Requested pool size; clamped to ``1..len(entries)``.

    Returns:
        One ``(entry, best_result, candidates)`` tuple per entry, in the same
        order as ``entries``. An entry whose validation raised is reported as
        ``(entry, None, [])`` so that one failure cannot abort the whole run.
    """
    import logging

    if not entries:
        return []

    logger = logging.getLogger(__name__)
    analysis: list[tuple[BibEntry, Any, list[Any]]] = []
    worker_count = min(max(1, max_workers), len(entries))
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = [
            executor.submit(validate_entry, entry, workflow, fetchers, comparator)
            for entry in entries
        ]
        # Collect in submission order rather than completion order: all
        # results are needed anyway, and a stable order keeps the generated
        # report identical between runs.
        for entry, future in zip(entries, futures):
            try:
                best_result, candidates = future.result()
            except Exception:
                # Best-effort by design, but leave a trace of the failure.
                logger.warning(
                    "Validation failed for entry %r; treating it as unresolved.",
                    getattr(entry, "key", entry),
                    exc_info=True,
                )
                best_result, candidates = None, []
            analysis.append((entry, best_result, candidates))
    return analysis
|
|
|
|
def _verify_entries(
    entries: list[BibEntry],
    workflow: Any,
    fetchers: dict[str, Any],
    comparator: MetadataComparator,
    max_workers: int,
) -> list[EntryReport]:
    """Validate *entries* and wrap each best result in an `EntryReport`.

    The candidate lists produced by `_analyze_entries` are discarded.
    """
    outcomes = _analyze_entries(entries, workflow, fetchers, comparator, max_workers)
    return [EntryReport(entry=checked, comparison=best) for checked, best, _ in outcomes]
|
|
|
|
| def _record_sanitize_fixes( |
| fixed_details: dict[str, list[str]], |
| sanitize_fixes: dict[str, list[SanitizeFix]], |
| ) -> None: |
| for key, fixes in sanitize_fixes.items(): |
| fixed_details.setdefault(key, []) |
| fixed_details[key].extend(fix.description for fix in fixes) |
|
|
|
|
def _apply_local_db(
    entries: list[BibEntry],
    fixed_details: dict[str, list[str]],
) -> tuple[bool, list[BibEntry], int]:
    """Count how many entries have a title known to the local conference DB.

    Returns:
        ``(db_loaded, entries, match_count)``. The entry list is returned
        unchanged; with an unloaded DB the result is ``(False, entries, 0)``.

    NOTE(review): matches are only counted here. The looked-up record is not
    applied to the entry and ``fixed_details`` is never written. Confirm
    whether applying local matches is still planned.
    """
    database = _load_local_db()
    if not database.is_loaded:
        return False, entries, 0

    hits = sum(1 for entry in entries if database.lookup(entry.title))
    return True, entries, hits
|
|
|
|
@lru_cache(maxsize=1)
def _load_local_db() -> LocalConferenceDB:
    """Build and load the local conference DB; cached after the first call."""
    database = LocalConferenceDB()
    database.load()
    return database
|
|
|
|
| def _review_item(entry: BibEntry, best_result: Any, candidates: list[Any]) -> dict[str, Any]: |
| sorted_candidates = sorted(candidates, key=lambda item: item.confidence, reverse=True) |
| return { |
| "entry_key": entry.key, |
| "entry": entry, |
| "best_result": best_result, |
| "candidates": sorted_candidates, |
| } |
|
|
|
|
def _review_payload_from_item(item: dict[str, Any]) -> dict[str, Any]:
    """Turn a stored review item into its report payload.

    "entry" is required; "best_result" and "candidates" fall back to None
    and an empty list.
    """
    entry = item["entry"]
    best = item.get("best_result")
    found = item.get("candidates", [])
    return _review_payload(entry, best, found)
|
|
|
|
| def _review_payload(entry: BibEntry, best_result: Any, candidates: list[Any]) -> dict[str, Any]: |
| return { |
| "key": entry.key, |
| "title": entry.title, |
| "reason": "; ".join(best_result.issues) if best_result and best_result.issues else "Ambiguous match", |
| "candidates": [ |
| { |
| "source": candidate.source, |
| "confidence": candidate.confidence, |
| "title": getattr(candidate.fetched_data, "title", ""), |
| "year": getattr(candidate.fetched_data, "year", ""), |
| "doi": getattr(candidate.fetched_data, "doi", ""), |
| } |
| for candidate in candidates[:5] |
| ], |
| } |
|
|
|
|
| def _write_bib(parser: BibParser, entries: list[BibEntry], original_stem: str) -> str: |
| out_dir = Path(tempfile.mkdtemp(prefix="refcheck_")) |
| out_path = out_dir / f"{original_stem or 'references'}_refcheck_fixed.bib" |
| parser.save_entries(str(out_path), entries) |
| return str(out_path) |
|
|
|
|
| def _write_report(markdown: str) -> str: |
| out_dir = Path(tempfile.mkdtemp(prefix="refcheck_report_")) |
| out_path = out_dir / "refcheck_report.md" |
| out_path.write_text(markdown, encoding="utf-8") |
| return str(out_path) |
|
|
|
|
def _build_report(result: RefCheckResult, reports: list[EntryReport]) -> str:
    """Render the Markdown report for a finished run.

    Sections, in order: Summary, Submission Safety Gate, then (each only when
    non-empty) Removed, Fixed, Duplicate Titles, Needs Review and
    Verification Issues.

    Args:
        result: Run state providing the counters and detail collections.
        reports: Re-verification reports; those with a comparison that is not
            a match are listed under "Verification Issues".

    Returns:
        The report text, stripped and terminated by a single newline.
    """
    lines = [
        "## RefCheck Report",
        "",
        "### Summary",
        "",
        f"- Input entries: {result.total_input}",
        f"- Output entries: {result.total_output}",
        f"- Verified after fix: {result.verified}",
        f"- Remaining issues: {result.issues}",
        f"- Not found after fix: {result.not_found}",
        f"- Local DB loaded: {'yes' if result.local_db_loaded else 'no'}",
        f"- Local DB matches: {result.local_matches}",
        "",
    ]

    gate_status, gate_reasons = _submission_safety_gate(result)
    lines.extend(["### Submission Safety Gate", ""])
    lines.append(f"- Status: **{gate_status}**")
    lines.extend(f"- {reason}" for reason in gate_reasons)
    lines.append("")

    if result.removed_details:
        lines.extend(["### Removed", ""])
        lines.extend(f"- `{key}`: {title} ({reason})" for key, title, reason in result.removed_details)
        lines.append("")

    # Sub-bullets below are indented by two spaces: that is the minimum
    # Markdown needs to nest them under their parent "- " item.
    if result.fixed_details:
        lines.extend(["### Fixed", ""])
        for key, changes in sorted(result.fixed_details.items()):
            lines.append(f"- `{key}`")
            lines.extend(f"  - {change}" for change in changes)
        lines.append("")

    if result.duplicate_details:
        lines.extend(["### Duplicate Titles", ""])
        for title, keys in result.duplicate_details.items():
            lines.append(f"- `{', '.join(keys)}`: {title}")
        lines.append("")

    if result.review_details:
        lines.extend(["### Needs Review", ""])
        for item in result.review_details:
            lines.append(f"- `{item['key']}`: {item['title']}")
            lines.append(f"  - Reason: {item['reason']}")
            for candidate in item["candidates"]:
                # Join only the non-empty fields. The previous `.strip()` on
                # the concatenated literal also removed the bullet's leading
                # indent, and empty fields left stray double spaces.
                parts = [f"{candidate['source']} (confidence {candidate['confidence']:.2f})"]
                parts.extend(
                    str(candidate[name]) for name in ("title", "year", "doi") if candidate[name]
                )
                lines.append("  - Candidate: " + " ".join(parts))
        lines.append("")

    remaining = [
        report
        for report in reports
        if report.comparison and not report.comparison.is_match
    ]
    if remaining:
        lines.extend(["### Verification Issues", ""])
        for report in remaining:
            comparison = report.comparison
            issues = "; ".join(comparison.issues) if comparison.issues else "Not matched"
            lines.append(
                f"- `{report.entry.key}` via {comparison.source} "
                f"(confidence {comparison.confidence:.2f}): {issues}"
            )
        lines.append("")

    return "\n".join(lines).strip() + "\n"
|
|
|
|
| def _submission_safety_gate(result: RefCheckResult) -> tuple[str, list[str]]: |
| reasons = [] |
| if result.review_details: |
| reasons.append(f"FAIL: {len(result.review_details)} reference(s) still need manual review.") |
| if result.issues: |
| reasons.append(f"FAIL: {result.issues} reference(s) still have strict verification issues.") |
| if result.not_found: |
| reasons.append(f"FAIL: {result.not_found} reference(s) could not be found in configured sources.") |
| if result.removed_details: |
| reasons.append( |
| f"FAIL: {len(result.removed_details)} reference(s) were removed; confirm the paper text no longer cites them." |
| ) |
| if result.total_output and result.verified != result.total_output: |
| reasons.append(f"FAIL: only {result.verified}/{result.total_output} output reference(s) are strictly verified.") |
| if result.duplicate_details: |
| reasons.append(f"WARN: {len(result.duplicate_details)} duplicate title group(s) should be checked.") |
|
|
| failures = [reason for reason in reasons if reason.startswith("FAIL")] |
| if failures: |
| return "FAIL - do not submit yet", reasons |
| return "PASS - all output references are strictly verified", reasons or ["PASS: no unresolved reference risks detected."] |
|
|