| """ |
| Metadata comparison between bib entries and fetched metadata. |
| """ |
| from datetime import datetime |
| from dataclasses import dataclass |
| from typing import Optional, List, Union, Any, Tuple |
|
|
| from .parser import BibEntry |
| from .utils import TextNormalizer |
|
|
# Calendar year captured once when the module is imported; resolve_year()
# uses it as the upper bound so a future-dated year is never returned.
CURRENT_YEAR = datetime.now().year


# Trust ranking of metadata sources for the publication year: a lower number
# wins. "arxiv_journal_ref" is a synthetic source used for the conference year
# attached to an arXiv record. Plain "arxiv" (submission year) ranks last;
# sources missing from this table get priority 50 inside resolve_year().
YEAR_SOURCE_PRIORITY = {
    "crossref": 0,
    "dblp": 1,
    "openalex": 2,
    "semantic_scholar": 3,
    "arxiv_journal_ref": 4,
    "scholar": 5,
    "arxiv": 99,
}


def resolve_year(candidates: list, bib_year: str = "") -> Tuple[Optional[str], Optional[str]]:
    """
    Pick the best year across all candidate results using source priority.
    Conference/journal year always beats arXiv submission year.
    Never returns a future year.

    Args:
        candidates: list of ComparisonResult-like objects exposing ``source``
            and ``fetched_data``; falsy items and items without fetched data
            are ignored. ``None`` is treated as an empty list.
        bib_year: the current bib entry year. Accepted for interface
            compatibility; it is not consulted when choosing the year.
    Returns:
        (best_year, best_source) or (None, None) when no usable,
        non-future year is available.
    """
    def _year_text(value) -> str:
        # Keep only values int() can parse. str.isdigit() would also accept
        # characters such as superscript digits, which make int() raise.
        text = str(value or "").strip()
        return text if text.isdecimal() else ""

    journal_ref_priority = YEAR_SOURCE_PRIORITY.get("arxiv_journal_ref", 4)

    pool = []
    for cand in candidates or []:
        if not cand or not cand.fetched_data:
            continue
        source = cand.source
        data = cand.fetched_data

        # An arXiv record may carry the venue year separately. It is taken
        # into account even when the submission year itself is missing or
        # unusable, so the better-ranked venue year is not lost.
        conf_year = _year_text(getattr(data, "conference_year", ""))
        if source == "arxiv" and conf_year:
            pool.append((journal_ref_priority, conf_year, "arxiv_journal_ref"))

        fetched_year = _year_text(getattr(data, "year", ""))
        if fetched_year:
            pool.append((YEAR_SOURCE_PRIORITY.get(source, 50), fetched_year, source))

    # Tuples sort by priority first, then year text, then source name, which
    # keeps the choice deterministic when priorities tie.
    pool.sort()

    # Walk from most to least trusted and take the first non-future year.
    for _, year, source in pool:
        if int(year) <= CURRENT_YEAR:
            return year, source

    return None, None
|
|
|
|
@dataclass
class ComparisonResult:
    """Outcome of checking one bib entry against one fetched metadata record.

    Field order is part of the constructor signature and must not change.
    """
    # Key of the bib entry this result belongs to.
    entry_key: str

    # Title comparison: verdict, similarity score, and both titles.
    title_match: bool
    title_similarity: float
    bib_title: str
    fetched_title: str

    # Author comparison: verdict, similarity score, and both author lists.
    author_match: bool
    author_similarity: float
    bib_authors: list[str]
    fetched_authors: list[str]

    # Year comparison: verdict and both year values.
    year_match: bool
    bib_year: str
    fetched_year: str

    # Overall verdict, confidence score, human-readable problems found, and
    # the name of the metadata source the record came from.
    is_match: bool
    confidence: float
    issues: list[str]
    source: str

    # The fetched metadata object itself, when one is available.
    fetched_data: Any = None

    # Set when first-name initials of same-surname authors disagree.
    author_initial_conflict: bool = False

    @property
    def has_issues(self) -> bool:
        """Whether at least one issue was recorded."""
        return bool(self.issues)
|
|
@dataclass
class EntryReport:
    """Everything reported for one bib entry: the entry, its comparison, and evaluations."""
    entry: BibEntry
    comparison: Optional[ComparisonResult]
    # None is only a sentinel; __post_init__ swaps it for a fresh list so
    # instances never share one default list object.
    evaluations: list = None

    def __post_init__(self):
        """Give each instance its own empty list when no evaluations were supplied."""
        self.evaluations = [] if self.evaluations is None else self.evaluations
|
|
|
|
|
|
class MetadataComparator:
    """Compares bibliography entries with fetched metadata.

    Title, authors and year are compared independently; the individual
    verdicts are combined into an overall match flag and a confidence score.
    """

    # Minimum title similarity (0..1) for two titles to count as the same.
    TITLE_THRESHOLD = 0.8
    # Minimum averaged author similarity (0..1) for author lists to match.
    AUTHOR_THRESHOLD = 0.6

    def __init__(self):
        # The normalizer is referenced as a class, not instantiated; all of
        # its helpers are invoked as self.normalizer.<helper>(...).
        self.normalizer = TextNormalizer

    @staticmethod
    def _clean_year(value: Any) -> str:
        """Return *value* as stripped text; None and other falsy values become ''."""
        # Without the `or ""`, a missing year (None) would turn into the
        # literal text "None" and be reported as a year mismatch.
        return str(value or "").strip()

    def compare(self, bib_entry: BibEntry, fetched_data: Any, source_name: str) -> ComparisonResult:
        """
        Generic comparison method for any data source.

        Args:
            bib_entry: entry providing ``key``, ``title``, ``author`` and ``year``.
            fetched_data: object with a ``title`` attribute and, optionally,
                ``year``, ``authors`` (list or comma-separated string) and
                ``conference_year``. Missing or None year/authors are
                treated as absent.
            source_name: name of the metadata source, used in issue messages
                and stored on the result.
        Returns:
            A ComparisonResult holding the per-field verdicts, the collected
            issues and a confidence score.
        """
        issues = []

        # ---- Title ----
        bib_title_norm = self.normalizer.normalize_for_comparison(bib_entry.title)
        fetched_title_norm = self.normalizer.normalize_for_comparison(fetched_data.title)

        title_similarity = self.normalizer.similarity_ratio(bib_title_norm, fetched_title_norm)
        if len(bib_title_norm) < 100:
            # For titles shorter than 100 normalized characters a second
            # metric is computed and the more favourable score is kept.
            lev_sim = self.normalizer.levenshtein_similarity(bib_title_norm, fetched_title_norm)
            title_similarity = max(title_similarity, lev_sim)

        title_match = title_similarity >= self.TITLE_THRESHOLD
        if not title_match:
            issues.append(f"Title mismatch (similarity: {title_similarity:.2%})")

        # ---- Authors ----
        bib_authors = self.normalizer.normalize_author_list(bib_entry.author)

        # The un-normalized names are parsed once and reused below.
        raw_author_list = self.normalizer.parse_author_list(bib_entry.author)
        for raw_auth in raw_author_list:
            stripped_auth = raw_auth.strip()
            if self.normalizer.has_dblp_disambiguation_id(stripped_auth):
                issues.append(f"DBLP disambiguation ID in author: '{stripped_auth}'")

        # `authors` may be absent or None; both mean "no fetched authors".
        fetched_authors_raw = getattr(fetched_data, 'authors', None) or []
        if isinstance(fetched_authors_raw, str):
            # Comma-separated string: split it and drop blank segments so a
            # trailing comma does not create a phantom author.
            fetched_authors_raw = [
                name
                for name in (part.strip() for part in fetched_authors_raw.split(','))
                if name
            ]

        fetched_authors = [
            self.normalizer.normalize_author_name(str(a))
            for a in fetched_authors_raw
        ]

        author_similarity = self._compare_author_lists(bib_authors, fetched_authors)
        author_match = author_similarity >= self.AUTHOR_THRESHOLD

        # A bib list containing "others" / "et al" is allowed to be shorter
        # or longer than the fetched list; any other length difference
        # overrides an otherwise passing similarity.
        allows_truncated_authors = any(
            token in str(raw_author).lower()
            for raw_author in raw_author_list
            for token in ("others", "et al")
        )
        if (
            author_match
            and bib_authors
            and fetched_authors
            and len(bib_authors) != len(fetched_authors)
            and not allows_truncated_authors
        ):
            author_match = False
            issues.append(f"Author count mismatch: bib={len(bib_authors)}, fetched={len(fetched_authors)}")

        if not author_match:
            issues.append(f"Author mismatch (similarity: {author_similarity:.2%})")

        # ---- Year ----
        bib_year = self._clean_year(bib_entry.year)
        fetched_year = self._clean_year(getattr(fetched_data, 'year', ''))
        conference_year = self._clean_year(getattr(fetched_data, 'conference_year', ''))
        if source_name.startswith("arxiv") and conference_year and conference_year.isdigit():
            # For arXiv sources a numeric conference year replaces the
            # record's own year.
            fetched_year = conference_year
        year_match = bool(bib_year and fetched_year and bib_year == fetched_year)

        if not bib_year:
            issues.append("Missing year in BibTeX entry")
        elif not fetched_year:
            issues.append(f"Missing year from {source_name} metadata")
        elif not year_match:
            issues.append(f"Year mismatch: bib={bib_year}, {source_name}={fetched_year}")

        # ---- Overall verdict ----
        is_match = title_match and author_match and year_match

        # Weighted blend: title 50%, authors 30%, year 20% (a non-matching
        # year still contributes half of its weight).
        confidence = (
            title_similarity * 0.5 +
            author_similarity * 0.3 +
            (1.0 if year_match else 0.5) * 0.2
        )
        if not year_match:
            # Cap confidence whenever the year could not be confirmed.
            confidence = min(confidence, 0.8)

        author_initial_conflict = self._check_author_initial_conflict(
            bib_authors, fetched_authors,
            raw_author_list,
            fetched_authors_raw
        )
        if author_initial_conflict:
            issues.append("Author initial conflict detected (e.g., first-name initials differ)")
            # Conflicting initials cap confidence even lower.
            confidence = min(confidence, 0.7)

        return ComparisonResult(
            entry_key=bib_entry.key,
            title_match=title_match,
            title_similarity=title_similarity,
            bib_title=bib_entry.title,
            fetched_title=fetched_data.title,
            author_match=author_match,
            author_similarity=author_similarity,
            bib_authors=bib_authors,
            fetched_authors=fetched_authors,
            year_match=year_match,
            bib_year=bib_year,
            fetched_year=fetched_year,
            is_match=is_match,
            confidence=confidence,
            issues=issues,
            source=source_name,
            fetched_data=fetched_data,
            author_initial_conflict=author_initial_conflict
        )

    def create_unable_result(self, bib_entry: BibEntry, reason: str = "Unable to fetch metadata") -> ComparisonResult:
        """Create result when metadata couldn't be fetched.

        Every match flag is False, similarities and confidence are 0.0, the
        source is "unable" and *reason* is the single recorded issue.
        """
        return ComparisonResult(
            entry_key=bib_entry.key,
            title_match=False, title_similarity=0.0,
            bib_title=bib_entry.title, fetched_title="",
            author_match=False, author_similarity=0.0,
            bib_authors=self.normalizer.normalize_author_list(bib_entry.author), fetched_authors=[],
            # Normalized the same way compare() does, so bib_year is always text.
            year_match=False, bib_year=self._clean_year(bib_entry.year), fetched_year="",
            is_match=False, confidence=0.0,
            issues=[reason], source="unable",
            fetched_data=None
        )

    def _compare_author_lists(self, list1: list[str], list2: list[str]) -> float:
        """Return the average best-match score of each name in list1 against list2.

        Two empty lists score 1.0; exactly one empty list scores 0.0. The
        average is taken over list1 only, so the measure is not symmetric.
        """
        if not list1 and not list2:
            return 1.0
        if not list1 or not list2:
            return 0.0

        total_similarity = 0.0
        for author1 in list1:
            best_match = 0.0
            for author2 in list2:
                if self._names_match(author1, author2):
                    # A structural name match counts as a perfect score.
                    best_match = 1.0
                    break
                sim = self.normalizer.similarity_ratio(author1, author2)
                best_match = max(best_match, sim)
            total_similarity += best_match

        return total_similarity / len(list1)

    def _names_match(self, name1: str, name2: str) -> bool:
        """Check if two names match (handles abbreviated names).

        The last words must be equal. When both names have more than one
        word, the first words must be equal too, unless either is a single
        letter, in which case only the first letters are compared.
        """
        words1 = name1.lower().replace('.', '').split()
        words2 = name2.lower().replace('.', '').split()
        if not words1 or not words2:
            return False

        # Last word is treated as the surname and has to agree exactly.
        if words1[-1] != words2[-1]:
            return False

        if len(words1) > 1 and len(words2) > 1:
            first1 = words1[0]
            first2 = words2[0]
            if len(first1) == 1 or len(first2) == 1:
                # One side is an initial: compare first letters only.
                if first1[0] != first2[0]:
                    return False
            elif first1 != first2:
                return False

        return True

    def _check_author_initial_conflict(
        self,
        bib_authors_norm: list[str],
        fetched_authors_norm: list[str],
        bib_authors_raw: list[str],
        fetched_authors_raw: list,
    ) -> bool:
        """
        Detect when first-name initials clearly conflict between
        bib entry and fetched data.

        e.g., "Y. Zhou" (bib) vs "Henry Zhou" (fetched) → True (Y ≠ H)
        This prevents blindly overwriting authors with wrong names.

        Authors are compared position by position up to the shorter list.
        Pairs where either name has fewer than two words, or whose last
        words differ, are skipped. The two ``*_raw`` arguments are accepted
        but not used.
        """
        for bib_name, fetched_name in zip(bib_authors_norm, fetched_authors_norm):
            bib_parts = bib_name.split()
            fetched_parts = fetched_name.split()

            if len(bib_parts) < 2 or len(fetched_parts) < 2:
                continue

            # Only same-surname pairs can yield a meaningful initial conflict.
            if bib_parts[-1] != fetched_parts[-1]:
                continue

            # str.split() never yields empty words, so indexing [0] is safe.
            if bib_parts[0][0] != fetched_parts[0][0]:
                return True

        return False
|
|