# RefCheck / src /space_service.py
# voidful's picture
# Add submission safety gate to reports
# a82f053 verified
"""
Non-interactive RefCheck workflow for Hugging Face Spaces.
"""
from __future__ import annotations
import copy
import tempfile
from dataclasses import dataclass, field
from functools import lru_cache
from pathlib import Path
from typing import Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from main import (
apply_fix,
get_default_workflow,
validate_entry,
)
from src.comparator import EntryReport, MetadataComparator
from src.fetcher import (
ArxivFetcher,
CrossRefFetcher,
DBLPFetcher,
OpenAlexFetcher,
ScholarFetcher,
SemanticScholarFetcher,
)
from src.local_db import LocalConferenceDB
from src.parser import BibEntry, BibParser
from src.sanitizer import BibSanitizer, SanitizeFix
@dataclass
class RefCheckOptions:
    """Settings that control one non-interactive RefCheck run."""

    # When True, entries for which no matching metadata was found are dropped
    # from the output; when False they are kept and queued for manual review.
    remove_unverified: bool = True
    # Enables or disables the workflow step named "google_scholar".
    enable_google_scholar: bool = False
    # Upper bound on validation threads (clamped to at least 1 and to the
    # number of entries by the analysis helper).
    max_workers: int = 4
@dataclass
class RefCheckResult:
    """Counters, working entries, and artifact paths produced by a Space run."""

    # Stem of the uploaded file name; reused to name the fixed .bib artifact.
    source_stem: str = "references"

    # Entry counts: parsed from the upload / present in the current output.
    total_input: int = 0
    total_output: int = 0

    # Tallies from the post-fix verification pass.
    verified: int = 0
    issues: int = 0
    not_found: int = 0

    # Working bibliography that gets written to the fixed .bib file.
    entries: list[BibEntry] = field(default_factory=list)
    # Unresolved entries as raw dicts with the keys "entry_key", "entry",
    # "best_result" and "candidates".
    review_items: list[dict[str, Any]] = field(default_factory=list)
    # Entry key -> human-readable descriptions of changes made to that entry.
    fixed_details: dict[str, list[str]] = field(default_factory=dict)
    # (entry key, title, reason) for every entry dropped from the output.
    removed_details: list[tuple[str, str, str]] = field(default_factory=list)
    # Report-ready summaries derived from ``review_items``.
    review_details: list[dict[str, Any]] = field(default_factory=dict if False else list)
    # Duplicate groups from the sanitizer; the report renders each item as
    # title -> list of entry keys.
    duplicate_details: dict[str, list[str]] = field(default_factory=dict)
    # Raw sanitizer output: entry key -> fixes applied to that entry.
    sanitize_fixes: dict[str, list[SanitizeFix]] = field(default_factory=dict)

    # Local conference database statistics.
    local_matches: int = 0
    local_db_loaded: bool = False

    # Downloadable artifacts: paths on disk plus the report text itself.
    fixed_bib_path: str = ""
    report_path: str = ""
    report_markdown: str = ""
def run_refcheck_file(file_path: str | Path, options: RefCheckOptions | None = None) -> RefCheckResult:
    """Run the whole non-interactive pipeline on one uploaded BibTeX file.

    Stages, in order: parse, sanitize, duplicate detection, local database
    lookup, validation of every entry, a per-entry decision
    (fix / review / remove / keep), and finally ``finalize_result``.

    Args:
        file_path: Location of the BibTeX file to process.
        options: Run settings; a default ``RefCheckOptions()`` is used when
            omitted.

    Returns:
        The populated ``RefCheckResult``. For a file without entries the
        result only carries a short report and an empty fixed ``.bib`` file.
    """
    settings = options or RefCheckOptions()
    bib_path = Path(file_path)
    parser = BibParser()
    entries = parser.parse_file(str(bib_path))
    result = RefCheckResult(
        source_stem=bib_path.stem or "references",
        total_input=len(entries),
    )

    if not entries:
        # Nothing to validate: still produce both artifacts, then stop early.
        result.report_markdown = "## RefCheck Report\n\nNo BibTeX entries were found."
        result.report_path = _write_report(result.report_markdown)
        result.fixed_bib_path = _write_bib(parser, [], result.source_stem)
        return result

    sanitizer = BibSanitizer()
    result.sanitize_fixes = sanitizer.sanitize_all(entries)
    _record_sanitize_fixes(result.fixed_details, result.sanitize_fixes)
    result.duplicate_details = sanitizer.find_duplicates(entries)

    db_loaded, api_entries, db_hits = _apply_local_db(entries, result.fixed_details)
    result.local_db_loaded = db_loaded
    result.local_matches = db_hits

    fetchers = _build_fetchers()
    workflow = get_default_workflow()
    for step in workflow.steps:
        if step.name == "google_scholar":
            step.enabled = settings.enable_google_scholar
    comparator = MetadataComparator()

    # Decide what to do with each analysed entry, keyed by its BibTeX key.
    decisions: dict[str, tuple[str, Any, list[Any]]] = {}
    for entry, best, candidates in _analyze_entries(
        api_entries, workflow, fetchers, comparator, settings.max_workers
    ):
        if not best:
            decision = ("keep", None, [])
        elif best.is_match and best.fetched_data:
            decision = ("fix", best, candidates)
        else:
            decision = ("review" if candidates else "remove", best, candidates)
        decisions[entry.key] = decision

    # Apply the decisions in the original entry order.
    kept: list[BibEntry] = []
    for entry in entries:
        action, best, candidates = decisions.get(entry.key, ("keep", None, []))

        if action == "remove" and settings.remove_unverified:
            result.removed_details.append(
                (entry.key, entry.title, "No matching metadata found in any source")
            )
            continue

        if action == "fix":
            changes = apply_fix(entry, best.fetched_data, all_candidates=candidates)
            if changes:
                result.fixed_details.setdefault(entry.key, []).extend(changes)
        elif action in ("review", "remove"):
            # "remove" only reaches this point when removal is disabled, so
            # the entry is kept and handed to manual review instead.
            result.review_items.append(_review_item(entry, best, candidates))

        kept.append(entry)

    result.entries = kept
    return finalize_result(result, settings)
def finalize_result(result: RefCheckResult, options: RefCheckOptions | None = None) -> RefCheckResult:
    """Write the current entries, validate them again, and rebuild the artifacts.

    The working entries are saved to a new ``.bib`` file, parsed back from
    that file, and run through the validation workflow once more; the
    counters, Markdown report and artifact paths on ``result`` are then
    refreshed in place.

    Args:
        result: Run state to update.
        options: Run settings; a default ``RefCheckOptions()`` is used when
            omitted.

    Returns:
        The same ``result`` object, updated.
    """
    settings = options or RefCheckOptions()
    parser = BibParser()
    fetchers = _build_fetchers()
    workflow = get_default_workflow()
    for step in workflow.steps:
        if step.name == "google_scholar":
            step.enabled = settings.enable_google_scholar
    comparator = MetadataComparator()

    result.review_details = list(map(_review_payload_from_item, result.review_items))
    result.total_output = len(result.entries)
    result.fixed_bib_path = _write_bib(parser, result.entries, result.source_stem)

    # Verify what was actually written, not the in-memory objects.
    reparsed = parser.parse_file(result.fixed_bib_path)
    reports = _verify_entries(reparsed, workflow, fetchers, comparator, settings.max_workers)

    # Reports without a comparison are not counted in any of the three tallies.
    verified = flagged = unmatched = 0
    for report in reports:
        comparison = report.comparison
        if not comparison:
            continue
        if comparison.is_match:
            verified += 1
        if comparison.has_issues:
            flagged += 1
        if not comparison.is_match and not comparison.has_issues:
            unmatched += 1
    result.verified = verified
    result.issues = flagged
    result.not_found = unmatched

    result.report_markdown = _build_report(result, reports)
    result.report_path = _write_report(result.report_markdown)
    return result
def preview_review_action(
    result: RefCheckResult | None,
    review_index: int,
    action: str,
    candidate_index: int | None = None,
    options: RefCheckOptions | None = None,
) -> str:
    """Describe what a manual review action would do, without applying it.

    For ``"candidate"`` the fix is applied to a deep copy of the entry and
    that copy is validated again, so the working bibliography is untouched.

    Args:
        result: Current run state; may be ``None`` before any run.
        review_index: Position of the unresolved entry in ``review_items``.
        action: One of ``"keep"``, ``"remove"`` or ``"candidate"``.
        candidate_index: Position of the chosen candidate (needed for
            ``"candidate"`` only).
        options: Run settings; a default ``RefCheckOptions()`` is used when
            omitted.

    Returns:
        Markdown preview text, or a short plain message when the selection
        is incomplete or invalid.
    """
    if not result or not result.review_items:
        return "No unresolved entries are available."
    if not 0 <= review_index < len(result.review_items):
        return "Select an unresolved entry first."

    settings = options or RefCheckOptions()
    item = result.review_items[review_index]
    entry = _find_entry(result.entries, item["entry_key"])
    if not entry:
        return "The selected entry is no longer in the working bibliography."

    if action == "keep":
        note = "No metadata changes will be applied."
        return _entry_preview_markdown(entry, "Keep original entry", [note])
    if action == "remove":
        note = "This entry will be removed from the exported BibTeX."
        return _entry_preview_markdown(entry, "Remove entry", [note])
    if action != "candidate":
        return "Select a candidate, keep, or remove action."

    candidates = item.get("candidates", [])
    if candidate_index is None or not 0 <= candidate_index < len(candidates):
        return "Select a candidate first."

    chosen = candidates[candidate_index]
    if not _candidate_exact_match(chosen):
        blocked_lines = [
            "This candidate is not an exact title/author/year match, so RefCheck will not auto-apply it.",
            f"Candidate source: {chosen.source}",
            f"Candidate confidence: {chosen.confidence:.2f}",
        ]
        blocked_lines.extend(_candidate_issue_lines(chosen))
        return _entry_preview_markdown(entry, "Candidate blocked", blocked_lines)

    # Try the fix on a copy so the session state stays unchanged.
    trial_entry = copy.deepcopy(entry)
    changes = apply_fix(trial_entry, chosen.fetched_data, allow_optional_updates=True)
    if not changes:
        changes = ["No field-level changes are needed for this candidate."]

    fetchers = _build_fetchers()
    workflow = get_default_workflow()
    for step in workflow.steps:
        if step.name == "google_scholar":
            step.enabled = settings.enable_google_scholar
    comparator = MetadataComparator()
    best_result, _ = validate_entry(trial_entry, workflow, fetchers, comparator)

    test_lines = [
        f"Candidate source: {chosen.source}",
        f"Candidate confidence before apply: {chosen.confidence:.2f}",
    ]
    if best_result:
        test_lines.append(f"Verification source after apply: {best_result.source}")
        test_lines.append(f"Verification confidence after apply: {best_result.confidence:.2f}")
        test_lines.append(f"Verified after apply: {'yes' if best_result.is_match else 'no'}")
        if best_result.issues:
            test_lines.append(f"Remaining issues: {'; '.join(best_result.issues)}")

    return _entry_preview_markdown(trial_entry, "Candidate test", changes + test_lines)
def apply_review_action(
    result: RefCheckResult | None,
    review_index: int,
    action: str,
    candidate_index: int | None = None,
    options: RefCheckOptions | None = None,
) -> RefCheckResult:
    """Resolve one unresolved entry and refresh the run artifacts.

    Args:
        result: Current run state.
        review_index: Position of the unresolved entry in ``review_items``.
        action: ``"candidate"`` (apply the chosen candidate's metadata),
            ``"remove"`` (drop the entry) or ``"keep"`` (leave it as is).
        candidate_index: Position of the chosen candidate (needed for
            ``"candidate"`` only).
        options: Run settings; a default ``RefCheckOptions()`` is used when
            omitted.

    Returns:
        The result returned by ``finalize_result`` after the change.

    Raises:
        ValueError: If there is nothing to review, a selection is out of
            range, the entry has left the working bibliography, the
            candidate is not an exact match, or ``action`` is unknown.
    """
    if not result or not result.review_items:
        raise ValueError("No unresolved entries are available.")
    if not 0 <= review_index < len(result.review_items):
        raise ValueError("Select an unresolved entry first.")

    settings = options or RefCheckOptions()
    item = result.review_items[review_index]
    entry = _find_entry(result.entries, item["entry_key"])
    if not entry:
        raise ValueError("The selected entry is no longer in the working bibliography.")

    if action == "candidate":
        candidates = item.get("candidates", [])
        if candidate_index is None or not 0 <= candidate_index < len(candidates):
            raise ValueError("Select a candidate first.")
        chosen = candidates[candidate_index]
        if not _candidate_exact_match(chosen):
            raise ValueError(
                "Selected candidate is not an exact title/author/year match; RefCheck will not auto-overwrite core metadata."
            )
        changes = apply_fix(entry, chosen.fetched_data, allow_optional_updates=True)
        changes.append(f"Resolved manually with candidate from {chosen.source}.")
        result.fixed_details.setdefault(entry.key, []).extend(changes)
    elif action == "remove":
        # Drops every working entry that shares this key.
        result.entries = [kept for kept in result.entries if kept.key != entry.key]
        result.removed_details.append((entry.key, entry.title, "Removed during manual review"))
    elif action == "keep":
        notes = result.fixed_details.setdefault(entry.key, [])
        notes.append("Marked as manually reviewed; kept original entry.")
    else:
        raise ValueError("Select a candidate, keep, or remove action.")

    # The item is resolved only after the action itself succeeded.
    result.review_items.pop(review_index)
    return finalize_result(result, settings)
def _find_entry(entries: list[BibEntry], key: str) -> BibEntry | None:
for entry in entries:
if entry.key == key:
return entry
return None
def _candidate_exact_match(candidate: Any) -> bool:
return bool(
candidate
and getattr(candidate, "is_match", False)
and getattr(candidate, "title_match", False)
and getattr(candidate, "author_match", False)
and getattr(candidate, "year_match", False)
and not getattr(candidate, "author_initial_conflict", False)
)
def _candidate_issue_lines(candidate: Any) -> list[str]:
lines = list(getattr(candidate, "issues", []) or [])
if not getattr(candidate, "title_match", False):
lines.append("Title is not an exact-enough match")
if not getattr(candidate, "author_match", False):
lines.append("Authors are not an exact-enough match")
if not getattr(candidate, "year_match", False):
bib_year = getattr(candidate, "bib_year", "") or "[missing]"
fetched_year = getattr(candidate, "fetched_year", "") or "[missing]"
lines.append(f"Year mismatch: bib={bib_year}, candidate={fetched_year}")
return [f"Blocking issue: {line}" for line in dict.fromkeys(lines)]
def _entry_preview_markdown(entry: BibEntry, title: str, lines: list[str]) -> str:
body = "\n".join(f"- {line}" for line in lines)
return (
f"### {title}\n\n"
f"**Key:** `{entry.key}`\n\n"
f"**Title:** {entry.title or '[missing]'}\n\n"
f"**Authors:** {entry.author or '[missing]'}\n\n"
f"**Year:** {entry.year or '[missing]'}\n\n"
f"{body}"
)
def _build_fetchers() -> dict[str, Any]:
    """Create one new fetcher instance per metadata source.

    Returns:
        Mapping from short source name to a freshly constructed fetcher.
    """
    fetcher_classes = (
        ("arxiv", ArxivFetcher),
        ("crossref", CrossRefFetcher),
        ("scholar", ScholarFetcher),
        ("semantic", SemanticScholarFetcher),
        ("openalex", OpenAlexFetcher),
        ("dblp", DBLPFetcher),
    )
    return {name: fetcher_cls() for name, fetcher_cls in fetcher_classes}
def _analyze_entries(
entries: list[BibEntry],
workflow: Any,
fetchers: dict[str, Any],
comparator: MetadataComparator,
max_workers: int,
) -> list[tuple[BibEntry, Any, list[Any]]]:
if not entries:
return []
analysis: list[tuple[BibEntry, Any, list[Any]]] = []
worker_count = min(max(1, max_workers), len(entries))
with ThreadPoolExecutor(max_workers=worker_count) as executor:
futures = {
executor.submit(validate_entry, entry, workflow, fetchers, comparator): entry
for entry in entries
}
for future in as_completed(futures):
entry = futures[future]
try:
best_result, candidates = future.result()
except Exception:
best_result, candidates = None, []
analysis.append((entry, best_result, candidates))
return analysis
def _verify_entries(
    entries: list[BibEntry],
    workflow: Any,
    fetchers: dict[str, Any],
    comparator: MetadataComparator,
    max_workers: int,
) -> list[EntryReport]:
    """Validate *entries* and wrap each best result in an ``EntryReport``.

    Candidate lists from the analysis are discarded; only the entry and
    its best comparison are kept. Reports follow the order in which
    ``_analyze_entries`` returns its results.
    """
    analysed = _analyze_entries(entries, workflow, fetchers, comparator, max_workers)
    return [
        EntryReport(entry=entry, comparison=best_result)
        for entry, best_result, _candidates in analysed
    ]
def _record_sanitize_fixes(
fixed_details: dict[str, list[str]],
sanitize_fixes: dict[str, list[SanitizeFix]],
) -> None:
for key, fixes in sanitize_fixes.items():
fixed_details.setdefault(key, [])
fixed_details[key].extend(fix.description for fix in fixes)
def _apply_local_db(
    entries: list[BibEntry],
    fixed_details: dict[str, list[str]],
) -> tuple[bool, list[BibEntry], int]:
    """Count how many entries have a title known to the local conference DB.

    Entries are only looked up; they are returned unchanged and
    *fixed_details* is currently not written to.

    Returns:
        ``(db_loaded, entries, match_count)``. When the database is not
        loaded this is ``(False, entries, 0)``.
    """
    local_db = _load_local_db()
    if not local_db.is_loaded:
        return False, entries, 0

    hits = sum(1 for entry in entries if local_db.lookup(entry.title))
    return True, entries, hits
@lru_cache(maxsize=1)
def _load_local_db() -> LocalConferenceDB:
    """Return the local conference database, loading it on first use.

    The instance is cached, so later calls get the same object without
    calling ``load()`` again.
    """
    database = LocalConferenceDB()
    database.load()
    return database
def _review_item(entry: BibEntry, best_result: Any, candidates: list[Any]) -> dict[str, Any]:
sorted_candidates = sorted(candidates, key=lambda item: item.confidence, reverse=True)
return {
"entry_key": entry.key,
"entry": entry,
"best_result": best_result,
"candidates": sorted_candidates,
}
def _review_payload_from_item(item: dict[str, Any]) -> dict[str, Any]:
    """Convert a stored review item into its report-ready payload.

    ``"entry"`` is required; ``"best_result"`` defaults to ``None`` and
    ``"candidates"`` to an empty list when absent.
    """
    entry = item["entry"]
    best_result = item.get("best_result")
    candidates = item.get("candidates", [])
    return _review_payload(entry, best_result, candidates)
def _review_payload(entry: BibEntry, best_result: Any, candidates: list[Any]) -> dict[str, Any]:
return {
"key": entry.key,
"title": entry.title,
"reason": "; ".join(best_result.issues) if best_result and best_result.issues else "Ambiguous match",
"candidates": [
{
"source": candidate.source,
"confidence": candidate.confidence,
"title": getattr(candidate.fetched_data, "title", ""),
"year": getattr(candidate.fetched_data, "year", ""),
"doi": getattr(candidate.fetched_data, "doi", ""),
}
for candidate in candidates[:5]
],
}
def _write_bib(parser: BibParser, entries: list[BibEntry], original_stem: str) -> str:
out_dir = Path(tempfile.mkdtemp(prefix="refcheck_"))
out_path = out_dir / f"{original_stem or 'references'}_refcheck_fixed.bib"
parser.save_entries(str(out_path), entries)
return str(out_path)
def _write_report(markdown: str) -> str:
out_dir = Path(tempfile.mkdtemp(prefix="refcheck_report_"))
out_path = out_dir / "refcheck_report.md"
out_path.write_text(markdown, encoding="utf-8")
return str(out_path)
def _build_report(result: RefCheckResult, reports: list[EntryReport]) -> str:
    """Render the Markdown report for a finished run.

    Sections, in order: Summary, Submission Safety Gate, then (each only
    when it has content) Removed, Fixed, Duplicate Titles, Needs Review
    and Verification Issues.

    Args:
        result: Run state whose counters and detail collections are shown.
        reports: Post-fix verification reports; those that have a
            comparison which is not a match are listed under
            "Verification Issues".

    Returns:
        The report text with surrounding whitespace removed and exactly
        one trailing newline.
    """
    lines = [
        "## RefCheck Report",
        "",
        "### Summary",
        "",
        f"- Input entries: {result.total_input}",
        f"- Output entries: {result.total_output}",
        f"- Verified after fix: {result.verified}",
        f"- Remaining issues: {result.issues}",
        f"- Not found after fix: {result.not_found}",
        f"- Local DB loaded: {'yes' if result.local_db_loaded else 'no'}",
        f"- Local DB matches: {result.local_matches}",
        "",
    ]

    gate_status, gate_reasons = _submission_safety_gate(result)
    lines.extend(["### Submission Safety Gate", ""])
    lines.append(f"- Status: **{gate_status}**")
    for reason in gate_reasons:
        lines.append(f"- {reason}")
    lines.append("")

    if result.removed_details:
        lines.extend(["### Removed", ""])
        for key, title, reason in result.removed_details:
            lines.append(f"- `{key}`: {title} ({reason})")
        lines.append("")

    if result.fixed_details:
        lines.extend(["### Fixed", ""])
        # Sorted by entry key so the section order is stable.
        for key, changes in sorted(result.fixed_details.items()):
            lines.append(f"- `{key}`")
            for change in changes:
                lines.append(f" - {change}")
        lines.append("")

    if result.duplicate_details:
        lines.extend(["### Duplicate Titles", ""])
        for title, keys in result.duplicate_details.items():
            lines.append(f"- `{', '.join(keys)}`: {title}")
        lines.append("")

    if result.review_details:
        lines.extend(["### Needs Review", ""])
        for item in result.review_details:
            lines.append(f"- `{item['key']}`: {item['title']}")
            lines.append(f" - Reason: {item['reason']}")
            for candidate in item["candidates"]:
                candidate_line = (
                    " - Candidate: "
                    f"{candidate['source']} "
                    f"(confidence {candidate['confidence']:.2f}) "
                    f"{candidate['title']} "
                    f"{candidate['year']} "
                    f"{candidate['doi']}"
                )
                # Trim only the right-hand side: an empty DOI/year leaves
                # trailing spaces, but the leading indent must stay so the
                # line is indented like the "Reason" line above it. A
                # ``.strip()`` on the concatenated literal removed both.
                lines.append(candidate_line.rstrip())
        lines.append("")

    remaining = [
        report
        for report in reports
        if report.comparison and not report.comparison.is_match
    ]
    if remaining:
        lines.extend(["### Verification Issues", ""])
        for report in remaining:
            comparison = report.comparison
            issues = "; ".join(comparison.issues) if comparison.issues else "Not matched"
            lines.append(
                f"- `{report.entry.key}` via {comparison.source} "
                f"(confidence {comparison.confidence:.2f}): {issues}"
            )
        lines.append("")

    return "\n".join(lines).strip() + "\n"
def _submission_safety_gate(result: RefCheckResult) -> tuple[str, list[str]]:
reasons = []
if result.review_details:
reasons.append(f"FAIL: {len(result.review_details)} reference(s) still need manual review.")
if result.issues:
reasons.append(f"FAIL: {result.issues} reference(s) still have strict verification issues.")
if result.not_found:
reasons.append(f"FAIL: {result.not_found} reference(s) could not be found in configured sources.")
if result.removed_details:
reasons.append(
f"FAIL: {len(result.removed_details)} reference(s) were removed; confirm the paper text no longer cites them."
)
if result.total_output and result.verified != result.total_output:
reasons.append(f"FAIL: only {result.verified}/{result.total_output} output reference(s) are strictly verified.")
if result.duplicate_details:
reasons.append(f"WARN: {len(result.duplicate_details)} duplicate title group(s) should be checked.")
failures = [reason for reason in reasons if reason.startswith("FAIL")]
if failures:
return "FAIL - do not submit yet", reasons
return "PASS - all output references are strictly verified", reasons or ["PASS: no unresolved reference risks detected."]