| """ |
| Utilities for BibGuard: Normalization and Progress Display. |
| """ |
| import re |
| import unicodedata |
| import time |
| from contextlib import contextmanager |
| from dataclasses import dataclass |
| from typing import Optional, List |
| from unidecode import unidecode |
| from rich.console import Console |
| from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, TimeElapsedColumn |
|
|
|
|
class TextNormalizer:
    """Normalize bibliographic text (titles, author names) for comparison.

    Every method is a classmethod; the class holds no instance state and only
    groups the lookup tables below with the routines that consume them.
    """

    # Trailing 4-digit code that DBLP appends to homonymous author names,
    # e.g. "Wei Li 0119".
    DBLP_DISAMBIG_PATTERN = re.compile(r'\s+\d{4}\s*$')

    # (regex, replacement) pairs that unwrap formatting commands and keep
    # only their (last) argument.
    LATEX_COMMANDS = [
        (r'\\textbf\{([^}]*)\}', r'\1'),
        (r'\\textit\{([^}]*)\}', r'\1'),
        (r'\\emph\{([^}]*)\}', r'\1'),
        (r'\\textrm\{([^}]*)\}', r'\1'),
        (r'\\texttt\{([^}]*)\}', r'\1'),
        (r'\\textsf\{([^}]*)\}', r'\1'),
        (r'\\textsc\{([^}]*)\}', r'\1'),
        (r'\\text\{([^}]*)\}', r'\1'),
        (r'\\mathrm\{([^}]*)\}', r'\1'),
        (r'\\mathbf\{([^}]*)\}', r'\1'),
        (r'\\mathit\{([^}]*)\}', r'\1'),
        (r'\\url\{([^}]*)\}', r'\1'),
        (r'\\href\{[^}]*\}\{([^}]*)\}', r'\1'),
    ]

    # Literal LaTeX sequence -> plain character. Replacements are applied in
    # insertion order, so a longer sequence must come before any sequence that
    # is a prefix of it ("``" before "`", "---" before "--").
    LATEX_CHARS = {
        r'\&': '&',
        r'\%': '%',
        r'\$': '$',
        r'\#': '#',
        r'\_': '_',
        r'\{': '{',
        r'\}': '}',
        r'\~': '~',
        r'\^': '^',
        r'``': '"',
        r"''": '"',
        r'`': "'",
        r"'": "'",  # identity mapping, kept so the table contents stay stable
        r'---': '—',
        r'--': '–',
    }

    # (regex, replacement) pairs that drop an accent command and keep the
    # bare letter. The caret is escaped (\^): unescaped it would be a
    # start-of-string anchor and the pattern could never match.
    LATEX_ACCENTS = [
        (r"\\'([aeiouAEIOU])", r'\1'),
        (r'\\`([aeiouAEIOU])', r'\1'),
        (r'\\\^([aeiouAEIOU])', r'\1'),
        (r'\\"([aeiouAEIOU])', r'\1'),
        (r'\\~([nNaAoO])', r'\1'),
        (r'\\c\{([cC])\}', r'\1'),
        (r"\\'\{([aeiouAEIOU])\}", r'\1'),
        (r'\\`\{([aeiouAEIOU])\}', r'\1'),
        (r'\\\^\{([aeiouAEIOU])\}', r'\1'),
        (r'\\"\{([aeiouAEIOU])\}', r'\1'),
        (r'\\~\{([nNaAoO])\}', r'\1'),
    ]

    @classmethod
    def normalize_latex(cls, text: str) -> str:
        """Strip LaTeX markup from *text*.

        Formatting commands are unwrapped, accent commands are reduced to the
        bare letter, escaped characters are unescaped, and any remaining
        braces are removed. Returns "" for empty or None input.
        """
        if not text:
            return ""
        result = text
        # Commands first, then accents, then literal character sequences:
        # accents must run before LATEX_CHARS turns "\~" / "\^" into plain
        # "~" / "^".
        for pattern, replacement in cls.LATEX_COMMANDS:
            result = re.sub(pattern, replacement, result)
        for pattern, replacement in cls.LATEX_ACCENTS:
            result = re.sub(pattern, replacement, result)
        for latex_char, normal_char in cls.LATEX_CHARS.items():
            result = result.replace(latex_char, normal_char)
        return re.sub(r'[{}]', '', result)

    @classmethod
    def normalize_unicode(cls, text: str) -> str:
        """Transliterate *text* to ASCII (NFKD decomposition, then unidecode).

        Returns "" for empty or None input.
        """
        if not text:
            return ""
        return unidecode(unicodedata.normalize('NFKD', text))

    @classmethod
    def normalize_for_comparison(cls, text: str) -> str:
        """Run the full pipeline used before comparing two strings.

        LaTeX removal, ASCII transliteration, lower-casing, punctuation
        removal, then whitespace collapsing. Returns "" for empty input.
        """
        if not text:
            return ""
        text = cls.normalize_latex(text)
        text = cls.normalize_unicode(text)
        text = text.lower()
        # Drop punctuation before collapsing whitespace; the other order
        # leaves double or trailing spaces where punctuation stood alone.
        text = re.sub(r'[^\w\s]', '', text)
        return re.sub(r'\s+', ' ', text).strip()

    @classmethod
    def strip_dblp_disambiguation_id(cls, name: str) -> str:
        """Strip DBLP disambiguation suffix (4-digit number) from author name.

        DBLP appends codes like '0001', '0019' to disambiguate homonymous
        authors.
        e.g. 'Tian Tan 0019' -> 'Tian Tan'
             'Wei Li 0119' -> 'Wei Li'

        Empty or None input is returned unchanged.
        """
        if not name:
            return name
        return cls.DBLP_DISAMBIG_PATTERN.sub('', name).strip()

    @classmethod
    def has_dblp_disambiguation_id(cls, name: str) -> bool:
        """Return True if *name* ends with a DBLP disambiguation ID."""
        if not name:
            return False
        return cls.DBLP_DISAMBIG_PATTERN.search(name) is not None

    @classmethod
    def normalize_author_name(cls, name: str) -> str:
        """Normalize one author name to lower-case "first last" form.

        Removes LaTeX, transliterates to ASCII, strips a DBLP suffix,
        rewrites "Last, First" as "First Last", lower-cases, and removes
        punctuation. Returns "" for empty input.
        """
        if not name:
            return ""
        name = cls.normalize_latex(name)
        name = cls.normalize_unicode(name)
        name = cls.strip_dblp_disambiguation_id(name)
        name = re.sub(r'\s+', ' ', name).strip()
        # "Last, First" -> "First Last"; only the first comma is significant.
        last, comma, first = name.partition(',')
        if comma:
            name = f"{first.strip()} {last.strip()}"
        name = re.sub(r'[^\w\s]', '', name.lower())
        # Punctuation removal can leave doubled or edge spaces behind.
        return re.sub(r'\s+', ' ', name).strip()

    @classmethod
    def parse_author_list(cls, authors: str) -> list[str]:
        """Split an " and "-separated author string into raw names.

        The separator is matched case-insensitively. The names are returned
        as found, without stripping. Returns [] for empty input.
        """
        if not authors:
            return []
        return re.split(r'\s+and\s+', authors, flags=re.IGNORECASE)

    @classmethod
    def normalize_author_list(cls, authors: str) -> list[str]:
        """Parse *authors* and normalize each name, dropping empty results."""
        if not authors:
            return []
        normalized = (
            cls.normalize_author_name(author.strip())
            for author in cls.parse_author_list(authors)
        )
        return [name for name in normalized if name]

    @classmethod
    def similarity_ratio(cls, text1: str, text2: str) -> float:
        """Return the Jaccard similarity of the whitespace-split word sets.

        Returns 0.0 if either string is empty, and 1.0 if both are non-empty
        but contain no words (whitespace only).
        """
        if not text1 or not text2:
            return 0.0
        words1, words2 = set(text1.split()), set(text2.split())
        if not words1 and not words2:
            return 1.0
        if not words1 or not words2:
            return 0.0
        return len(words1 & words2) / len(words1 | words2)

    @classmethod
    def levenshtein_similarity(cls, s1: str, s2: str) -> float:
        """Return 1 - (edit distance / longer length), a value in [0.0, 1.0].

        Two empty strings score 1.0; exactly one empty string scores 0.0.
        """
        if not s1 and not s2:
            return 1.0
        if not s1 or not s2:
            return 0.0
        # The distance is symmetric, so iterate over the longer string and
        # keep only two rows sized by the shorter one.
        if len(s1) < len(s2):
            s1, s2 = s2, s1
        previous = list(range(len(s2) + 1))
        for i, char1 in enumerate(s1, start=1):
            current = [i]
            for j, char2 in enumerate(s2, start=1):
                if char1 == char2:
                    current.append(previous[j - 1])
                else:
                    current.append(
                        1 + min(previous[j], current[j - 1], previous[j - 1])
                    )
            previous = current
        return 1.0 - previous[-1] / len(s1)
|
|
|
|
@dataclass
class ProgressStats:
    """Plain counters describing how far a run has progressed.

    All counters are integers that start at zero.
    """

    # Size of the work set.
    total: int = 0
    # Amount of work advanced so far.
    processed: int = 0
    # Outcome tallies.
    success: int = 0
    warnings: int = 0
    errors: int = 0
|
|
|
|
class ProgressDisplay:
    """Rich terminal progress display with running outcome counters."""

    def __init__(self):
        self.console = Console()
        self.stats = ProgressStats()
        # Both are set only while inside progress_context().
        self._progress: Optional[Progress] = None
        self._task = None

    @contextmanager
    def progress_context(self, total: int, description: str = "Processing"):
        """Show a progress bar for the duration of the ``with`` block.

        Args:
            total: Number of steps the bar represents; also stored in
                ``self.stats.total``.
            description: Initial label shown next to the bar.

        Yields:
            This ``ProgressDisplay`` instance, so the caller can call
            ``update()`` and the ``mark_*`` helpers on it.
        """
        self.stats.total = total
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(bar_width=40),
            TaskProgressColumn(),
            TimeElapsedColumn(),
            console=self.console,
            transient=False,
        ) as progress:
            self._progress = progress
            self._task = progress.add_task(description, total=total)
            try:
                yield self
            finally:
                # Detach even if the body raised, so later update() calls
                # become no-ops instead of touching a stopped bar.
                self._progress = None
                self._task = None

    def update(self, entry_key: str = "", task: str = "", advance: int = 0):
        """Advance the bar and optionally change its label.

        Does nothing when called outside ``progress_context()``.

        Args:
            entry_key: Identifier shown in cyan before the task text.
            task: Text describing the current step.
            advance: Number of steps to add to the bar and to
                ``self.stats.processed``.
        """
        if not self._progress or self._task is None:
            return
        fields = {}
        # Send a description only when there is something to show; otherwise
        # a bare update(advance=1) would blank out the current label.
        if entry_key:
            fields["description"] = f"[cyan]{entry_key}[/cyan] - {task}"
        elif task:
            fields["description"] = task
        self._progress.update(self._task, advance=advance, **fields)
        self.stats.processed += advance

    def mark_success(self):
        """Count one successful item."""
        self.stats.success += 1

    def mark_warning(self):
        """Count one item that produced a warning."""
        self.stats.warnings += 1

    def mark_error(self):
        """Count one failed item."""
        self.stats.errors += 1

    def print_error(self, message: str):
        """Print *message* to the console, prefixed with a red cross."""
        self.console.print(f"  [red]✗[/red] {message}")
|
|