| """ |
| Non-interactive RefCheck workflow for Hugging Face Spaces. |
| """ |
| from __future__ import annotations |
|
|
| import tempfile |
| from dataclasses import dataclass, field |
| from functools import lru_cache |
| from pathlib import Path |
| from typing import Any |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
| from main import ( |
| apply_fix, |
| apply_local_fix, |
| get_default_workflow, |
| validate_entry, |
| ) |
| from src.comparator import EntryReport, MetadataComparator |
| from src.fetcher import ( |
| ArxivFetcher, |
| CrossRefFetcher, |
| DBLPFetcher, |
| OpenAlexFetcher, |
| ScholarFetcher, |
| SemanticScholarFetcher, |
| ) |
| from src.local_db import LocalConferenceDB |
| from src.parser import BibEntry, BibParser |
| from src.sanitizer import BibSanitizer, SanitizeFix |
|
|
|
|
@dataclass
class RefCheckOptions:
    """Settings that steer a single unattended RefCheck run.

    Attributes:
        remove_unverified: When true, entries whose action is "remove" are
            dropped from the output file; when false they are kept and
            listed for manual review instead.
        enable_google_scholar: Value copied onto the ``enabled`` flag of the
            workflow step named ``google_scholar``.
        max_workers: Upper bound on the number of worker threads used while
            validating entries.
    """

    remove_unverified: bool = True
    enable_google_scholar: bool = False
    max_workers: int = 4
|
|
|
|
@dataclass
class RefCheckResult:
    """Everything a Space run hands back: counters, per-entry details, and file paths."""

    # --- entry counters -------------------------------------------------
    total_input: int = 0  # entries parsed from the uploaded file
    total_output: int = 0  # entries written to the fixed file
    verified: int = 0  # re-checked entries whose comparison is a match
    issues: int = 0  # re-checked entries whose comparison reports issues
    not_found: int = 0  # re-checked entries that neither match nor report issues

    # --- per-entry details ----------------------------------------------
    # citation key -> human-readable descriptions of applied changes
    fixed_details: dict[str, list[str]] = field(default_factory=dict)
    # (citation key, title, reason) for every dropped entry
    removed_details: list[tuple[str, str, str]] = field(default_factory=list)
    # payloads with "key", "title", "reason" and "candidates" for manual review
    review_details: list[dict[str, Any]] = field(default_factory=list)
    # rendered in the report as title -> citation keys
    duplicate_details: dict[str, list[str]] = field(default_factory=dict)
    # raw sanitizer output, keyed like ``fixed_details``
    sanitize_fixes: dict[str, list[SanitizeFix]] = field(default_factory=dict)

    # --- local conference database --------------------------------------
    local_matches: int = 0
    local_db_loaded: bool = False

    # --- generated artifacts --------------------------------------------
    fixed_bib_path: str = ""
    report_path: str = ""
    report_markdown: str = ""
|
|
|
|
# Confidence above which a fetched record is applied without manual review.
_AUTO_FIX_CONFIDENCE = 0.85
# Reason recorded for entries that no source could match.
_NO_MATCH_REASON = "No matching metadata found in any source"


def _decide_action(entry: BibEntry, best_result: Any, candidates: list[Any]) -> str:
    """Return "fix", "keep", "review" or "remove" for one analyzed entry."""
    if not best_result:
        # Nothing came back (or validation failed): leave the entry untouched.
        return "keep"
    if getattr(entry, "_force_api_lookup", False) and best_result.fetched_data:
        return "fix"
    if best_result.confidence > _AUTO_FIX_CONFIDENCE and best_result.fetched_data:
        return "fix"
    if best_result.is_match:
        return "keep"
    if candidates:
        return "review"
    return "remove"


def run_refcheck_file(file_path: str | Path, options: RefCheckOptions | None = None) -> RefCheckResult:
    """Validate and fix an uploaded BibTeX file without interactive prompts.

    The run sanitizes the parsed entries, applies local-database fixes,
    validates the remaining entries against the configured fetchers, applies
    or records the resulting action per entry, writes the fixed ``.bib`` file,
    re-validates that file and finally renders a Markdown report.

    Args:
        file_path: Path of the BibTeX file to process.
        options: Run settings; defaults to ``RefCheckOptions()`` when omitted.

    Returns:
        A ``RefCheckResult`` holding counters, per-entry details and the paths
        of the generated ``.bib`` and report files.
    """
    options = options or RefCheckOptions()
    source_path = Path(file_path)
    parser = BibParser()
    entries = parser.parse_file(str(source_path))
    result = RefCheckResult(total_input=len(entries))

    if not entries:
        result.report_markdown = "## RefCheck Report\n\nNo BibTeX entries were found."
        result.report_path = _write_report(result.report_markdown)
        result.fixed_bib_path = _write_bib(parser, [], source_path.stem)
        return result

    sanitizer = BibSanitizer()
    result.sanitize_fixes = sanitizer.sanitize_all(entries)
    _record_sanitize_fixes(result.fixed_details, result.sanitize_fixes)
    result.duplicate_details = sanitizer.find_duplicates(entries)

    result.local_db_loaded, api_entries, result.local_matches = _apply_local_db(entries, result.fixed_details)

    fetchers = _build_fetchers()
    workflow = get_default_workflow()
    for step in workflow.steps:
        if step.name == "google_scholar":
            step.enabled = options.enable_google_scholar

    comparator = MetadataComparator()
    analysis = _analyze_entries(api_entries, workflow, fetchers, comparator, options.max_workers)

    # Keyed by object identity rather than citation key: two entries that
    # share a key must not overwrite each other's decision, otherwise the
    # fetched metadata of one would be applied to the other. The entries stay
    # alive in ``entries`` for the whole run, so ``id()`` values are stable.
    actions: dict[int, tuple[str, Any, list[Any]]] = {
        id(entry): (_decide_action(entry, best_result, candidates), best_result, candidates)
        for entry, best_result, candidates in analysis
    }

    updated_entries: list[BibEntry] = []
    for entry in entries:
        # Entries without an analysis record are kept as they are.
        action, best_result, candidates = actions.get(id(entry), ("keep", None, []))

        if action == "fix":
            changes = apply_fix(entry, best_result.fetched_data, all_candidates=candidates)
            if changes:
                result.fixed_details.setdefault(entry.key, []).extend(changes)
            updated_entries.append(entry)
        elif action == "review":
            result.review_details.append(_review_payload(entry, best_result, candidates))
            updated_entries.append(entry)
        elif action == "remove":
            if options.remove_unverified:
                # Dropped from the output; only remembered for the report.
                result.removed_details.append((entry.key, entry.title, _NO_MATCH_REASON))
            else:
                result.review_details.append(
                    {
                        "key": entry.key,
                        "title": entry.title,
                        "reason": _NO_MATCH_REASON,
                        "candidates": [],
                    }
                )
                updated_entries.append(entry)
        else:
            updated_entries.append(entry)

    result.total_output = len(updated_entries)
    fixed_path = _write_bib(parser, updated_entries, source_path.stem)
    result.fixed_bib_path = fixed_path

    # Second pass: re-parse the written file and validate it again so the
    # summary reflects what the user actually downloads.
    verified_entries = parser.parse_file(fixed_path)
    verification_reports = _verify_entries(
        verified_entries,
        workflow,
        fetchers,
        comparator,
        options.max_workers,
    )
    # NOTE(review): reports without any comparison are not counted in any of
    # the three buckets below - confirm that this is intended.
    result.verified = sum(1 for r in verification_reports if r.comparison and r.comparison.is_match)
    result.issues = sum(1 for r in verification_reports if r.comparison and r.comparison.has_issues)
    result.not_found = sum(
        1
        for r in verification_reports
        if r.comparison and not r.comparison.is_match and not r.comparison.has_issues
    )

    result.report_markdown = _build_report(result, verification_reports)
    result.report_path = _write_report(result.report_markdown)
    return result
|
|
|
|
def _build_fetchers() -> dict[str, Any]:
    """Instantiate every metadata fetcher and return them keyed by short source name."""
    fetcher_classes = (
        ("arxiv", ArxivFetcher),
        ("crossref", CrossRefFetcher),
        ("scholar", ScholarFetcher),
        ("semantic", SemanticScholarFetcher),
        ("openalex", OpenAlexFetcher),
        ("dblp", DBLPFetcher),
    )
    # Dict comprehensions preserve insertion order, so fetchers are created
    # and stored in the order listed above.
    return {source_name: fetcher_cls() for source_name, fetcher_cls in fetcher_classes}
|
|
|
|
def _analyze_entries(
    entries: list[BibEntry],
    workflow: Any,
    fetchers: dict[str, Any],
    comparator: MetadataComparator,
    max_workers: int,
) -> list[tuple[BibEntry, Any, list[Any]]]:
    """Validate ``entries`` concurrently and return the results in input order.

    Args:
        entries: Entries to validate.
        workflow: Workflow object forwarded to ``validate_entry``.
        fetchers: Fetcher instances forwarded to ``validate_entry``.
        comparator: Comparator forwarded to ``validate_entry``.
        max_workers: Requested thread count; clamped to ``[1, len(entries)]``.

    Returns:
        One ``(entry, best_result, candidates)`` tuple per input entry, in the
        same order as ``entries``. An entry whose validation raised is
        reported as ``(entry, None, [])``.
    """
    if not entries:
        return []

    import logging  # local import: the only new dependency of this block

    logger = logging.getLogger(__name__)
    worker_count = min(max(1, max_workers), len(entries))
    analysis: list[tuple[BibEntry, Any, list[Any]]] = []
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        # Submit everything first so the work still runs in parallel ...
        futures = [
            executor.submit(validate_entry, entry, workflow, fetchers, comparator)
            for entry in entries
        ]
        # ... then collect in submission order so callers (and the rendered
        # report) see a deterministic ordering instead of completion order.
        for entry, future in zip(entries, futures):
            try:
                best_result, candidates = future.result()
            except Exception:
                # Best effort: one failing entry must not abort the run, but
                # the failure should at least be visible in the logs.
                logger.warning(
                    "Validation failed for entry %r",
                    getattr(entry, "key", entry),
                    exc_info=True,
                )
                best_result, candidates = None, []
            analysis.append((entry, best_result, candidates))
    return analysis
|
|
|
|
def _verify_entries(
    entries: list[BibEntry],
    workflow: Any,
    fetchers: dict[str, Any],
    comparator: MetadataComparator,
    max_workers: int,
) -> list[EntryReport]:
    """Re-run the analysis on ``entries`` and wrap each outcome in an ``EntryReport``.

    Only the best comparison result is retained; the candidate list returned
    by the analysis is discarded.
    """
    outcomes = _analyze_entries(entries, workflow, fetchers, comparator, max_workers)
    return [
        EntryReport(entry=checked_entry, comparison=comparison)
        for checked_entry, comparison, _candidates in outcomes
    ]
|
|
|
|
def _record_sanitize_fixes(
    fixed_details: dict[str, list[str]],
    sanitize_fixes: dict[str, list[SanitizeFix]],
) -> None:
    """Append the description of every sanitizer fix to ``fixed_details`` in place.

    A key present in ``sanitize_fixes`` always ends up in ``fixed_details``,
    even when its list of fixes is empty.
    """
    for entry_key in sanitize_fixes:
        descriptions = [fix.description for fix in sanitize_fixes[entry_key]]
        fixed_details.setdefault(entry_key, []).extend(descriptions)
|
|
|
|
def _apply_local_db(
    entries: list[BibEntry],
    fixed_details: dict[str, list[str]],
) -> tuple[bool, list[BibEntry], int]:
    """Fix entries from the local conference database where a title lookup succeeds.

    Returns:
        ``(loaded, remaining, matches)`` where ``loaded`` tells whether the
        database is available, ``remaining`` lists the entries without a local
        hit (the input list itself when the database is not loaded) and
        ``matches`` counts the entries that had a local hit. Changes made to
        matched entries are appended to ``fixed_details`` under the entry key.
    """
    database = _load_local_db()
    if not database.is_loaded:
        return False, entries, 0

    unmatched: list[BibEntry] = []
    hits = 0
    for entry in entries:
        official_record = database.lookup(entry.title)
        if official_record:
            applied = apply_local_fix(entry, official_record)
            hits += 1
            # A hit counts as a match even when nothing had to be changed.
            if applied:
                fixed_details.setdefault(entry.key, []).extend(applied)
        else:
            unmatched.append(entry)

    return True, unmatched, hits
|
|
|
|
@lru_cache(maxsize=1)
def _load_local_db() -> LocalConferenceDB:
    """Build the local conference database, load it, and memoize the instance.

    Because of the cache, every later call returns the same object as the
    first call, whatever the outcome of that first ``load()`` was.
    """
    database = LocalConferenceDB()
    database.load()
    return database
|
|
|
|
def _review_payload(entry: BibEntry, best_result: Any, candidates: list[Any]) -> dict[str, Any]:
    """Assemble the manual-review record for ``entry``.

    The record holds the entry key and title, a reason (the joined issues of
    ``best_result`` or "Ambiguous match" when there are none) and a summary of
    at most the first five candidates.
    """
    payload: dict[str, Any] = {"key": entry.key, "title": entry.title}

    if best_result and best_result.issues:
        payload["reason"] = "; ".join(best_result.issues)
    else:
        payload["reason"] = "Ambiguous match"

    summaries = []
    for candidate in candidates[:5]:
        summary = {"source": candidate.source, "confidence": candidate.confidence}
        fetched = candidate.fetched_data
        # Fields the fetched record lacks fall back to an empty string.
        for field_name in ("title", "year", "doi"):
            summary[field_name] = getattr(fetched, field_name, "")
        summaries.append(summary)
    payload["candidates"] = summaries
    return payload
|
|
|
|
def _write_bib(parser: BibParser, entries: list[BibEntry], original_stem: str) -> str:
    """Save ``entries`` via ``parser`` into a new temporary directory and return the file path.

    The file is named ``<original_stem>_refcheck_fixed.bib``; an empty stem is
    replaced by ``references``.
    """
    stem = original_stem if original_stem else "references"
    target_dir = tempfile.mkdtemp(prefix="refcheck_")
    target = str(Path(target_dir) / f"{stem}_refcheck_fixed.bib")
    parser.save_entries(target, entries)
    return target
|
|
|
|
def _write_report(markdown: str) -> str:
    """Write ``markdown`` as UTF-8 to ``refcheck_report.md`` in a new temp directory.

    Returns the path of the written file as a string.
    """
    report_dir = tempfile.mkdtemp(prefix="refcheck_report_")
    report_file = Path(report_dir, "refcheck_report.md")
    with report_file.open("w", encoding="utf-8") as handle:
        handle.write(markdown)
    return str(report_file)
|
|
|
|
def _build_report(result: RefCheckResult, reports: list[EntryReport]) -> str:
    """Render the Markdown report for a finished run.

    Args:
        result: Run summary; its counters and the ``removed_details``,
            ``fixed_details``, ``duplicate_details`` and ``review_details``
            collections are read.
        reports: Verification reports; those that carry a comparison which is
            not a match are listed under "Verification Issues".

    Returns:
        The report text, stripped of surrounding whitespace and terminated by
        exactly one newline. Sections without content are omitted.
    """
    lines = [
        "## RefCheck Report",
        "",
        "### Summary",
        "",
        f"- Input entries: {result.total_input}",
        f"- Output entries: {result.total_output}",
        f"- Verified after fix: {result.verified}",
        f"- Remaining issues: {result.issues}",
        f"- Not found after fix: {result.not_found}",
        f"- Local DB loaded: {'yes' if result.local_db_loaded else 'no'}",
        f"- Local DB matches: {result.local_matches}",
        "",
    ]

    if result.removed_details:
        lines.extend(["### Removed", ""])
        lines.extend(
            f"- `{key}`: {title} ({reason})"
            for key, title, reason in result.removed_details
        )
        lines.append("")

    if result.fixed_details:
        lines.extend(["### Fixed", ""])
        for key, changes in sorted(result.fixed_details.items()):
            lines.append(f"- `{key}`")
            lines.extend(f" - {change}" for change in changes)
        lines.append("")

    if result.duplicate_details:
        lines.extend(["### Duplicate Titles", ""])
        lines.extend(
            f"- `{', '.join(keys)}`: {title}"
            for title, keys in result.duplicate_details.items()
        )
        lines.append("")

    if result.review_details:
        lines.extend(["### Needs Review", ""])
        for item in result.review_details:
            lines.append(f"- `{item['key']}`: {item['title']}")
            lines.append(f" - Reason: {item['reason']}")
            for candidate in item["candidates"]:
                # Join only the non-empty fields. The previous code called
                # ``.strip()`` on the implicitly concatenated literal, which
                # applies to the whole string and therefore also removed the
                # leading list indentation of the line.
                details = " ".join(
                    str(part)
                    for part in (
                        candidate["source"],
                        f"(confidence {candidate['confidence']:.2f})",
                        candidate["title"],
                        candidate["year"],
                        candidate["doi"],
                    )
                    if part
                )
                lines.append(f" - Candidate: {details}")
        lines.append("")

    remaining = [
        report
        for report in reports
        if report.comparison and not report.comparison.is_match
    ]
    if remaining:
        lines.extend(["### Verification Issues", ""])
        for report in remaining:
            comparison = report.comparison
            issues = "; ".join(comparison.issues) if comparison.issues else "Not matched"
            lines.append(
                f"- `{report.entry.key}` via {comparison.source} "
                f"(confidence {comparison.confidence:.2f}): {issues}"
            )
        lines.append("")

    return "\n".join(lines).strip() + "\n"
|
|