"""Discover new Pashto-related resource candidates from public endpoints. This script does not auto-merge into the main catalog. It writes candidates to `resources/catalog/pending_candidates.json` for maintainer review. Usage: python scripts/sync_resources.py python scripts/sync_resources.py --limit 20 --output resources/catalog/pending_candidates.json """ from __future__ import annotations import argparse import json import re import urllib.parse import urllib.request import xml.etree.ElementTree as ET from datetime import datetime, timezone from pathlib import Path from typing import Any USER_AGENT = "pashto-resource-sync/1.0" def _slug(value: str) -> str: value = value.lower() value = re.sub(r"[^a-z0-9]+", "-", value) value = re.sub(r"-+", "-", value).strip("-") return value[:80] if value else "resource" def _fetch_json(url: str, timeout: float = 20.0) -> Any: req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) with urllib.request.urlopen(req, timeout=timeout) as response: return json.loads(response.read().decode("utf-8")) def _fetch_text(url: str, timeout: float = 20.0) -> str: req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) with urllib.request.urlopen(req, timeout=timeout) as response: return response.read().decode("utf-8", errors="replace") def _candidate( *, rid: str, title: str, url: str, category: str, source: str, summary: str, evidence_text: str, evidence_url: str, markers: list[str], tags: list[str], ) -> dict[str, Any]: return { "id": rid, "title": title.strip(), "url": url.strip(), "category": category, "source": source, "status": "candidate", "summary": summary.strip(), "primary_use": "Needs maintainer review before promotion to verified catalog.", "tasks": [], "pashto_evidence": { "evidence_text": evidence_text.strip(), "evidence_url": evidence_url.strip(), "markers": markers, }, "tags": tags, } def fetch_huggingface(kind: str, limit: int) -> list[dict[str, Any]]: if kind not in {"datasets", "models"}: return [] query = urllib.parse.urlencode({"search": "pashto", "limit": str(limit)}) url = f"https://huggingface.co/api/{kind}?{query}" payload = _fetch_json(url) category = "dataset" if kind == "datasets" else "model" out: list[dict[str, Any]] = [] for item in payload: repo_id = item.get("id") or item.get("modelId") if not repo_id: continue repo_url = f"https://huggingface.co/{'datasets/' if kind == 'datasets' else ''}{repo_id}" rid = f"candidate-hf-{kind[:-1]}-{_slug(repo_id)}" out.append( _candidate( rid=rid, title=repo_id, url=repo_url, category=category, source="huggingface", summary=f"Candidate {category} returned from Hugging Face search for Pashto.", evidence_text="Matched by Pashto keyword in Hugging Face search results.", evidence_url=repo_url, markers=["pashto"], tags=["pashto", "candidate", category], ) ) return out def fetch_arxiv(limit: int) -> list[dict[str, Any]]: query = urllib.parse.urlencode( {"search_query": "all:pashto", "start": "0", "max_results": str(limit)} ) url = f"http://export.arxiv.org/api/query?{query}" xml_text = _fetch_text(url) root = ET.fromstring(xml_text) ns = {"atom": "http://www.w3.org/2005/Atom"} out: list[dict[str, Any]] = [] for entry in root.findall("atom:entry", ns): title = (entry.findtext("atom:title", default="", namespaces=ns) or "").strip() link = (entry.findtext("atom:id", default="", namespaces=ns) or "").strip() summary = (entry.findtext("atom:summary", default="", namespaces=ns) or "").strip() if not title or not link: continue rid = f"candidate-arxiv-{_slug(title)}" out.append( _candidate( rid=rid, title=title, url=link, category="paper", source="arxiv", summary=summary[:240] if summary else "Candidate paper returned from arXiv query for Pashto.", evidence_text="Matched by arXiv query: all:pashto.", evidence_url=link, markers=["pashto"], tags=["pashto", "candidate", "paper"], ) ) return out def fetch_semantic_scholar(limit: int) -> list[dict[str, Any]]: fields = "title,url,abstract,year,externalIds" query = urllib.parse.urlencode( {"query": "pashto", "limit": str(limit), "fields": fields} ) url = f"https://api.semanticscholar.org/graph/v1/paper/search?{query}" payload = _fetch_json(url) out: list[dict[str, Any]] = [] for item in payload.get("data", []): title = (item.get("title") or "").strip() if not title: continue paper_url = (item.get("url") or "").strip() if not paper_url: ext = item.get("externalIds") or {} arxiv_id = ext.get("ArXiv") if arxiv_id: paper_url = f"https://arxiv.org/abs/{arxiv_id}" if not paper_url: continue summary = (item.get("abstract") or "").strip() rid = f"candidate-s2-{_slug(title)}" out.append( _candidate( rid=rid, title=title, url=paper_url, category="paper", source="other", summary=summary[:240] if summary else "Candidate paper returned from Semantic Scholar search for Pashto.", evidence_text="Matched by Semantic Scholar query: pashto.", evidence_url=paper_url, markers=["pashto"], tags=["pashto", "candidate", "paper"], ) ) return out def _dedupe_candidates( candidates: list[dict[str, Any]], existing_ids: set[str], existing_urls: set[str], ) -> list[dict[str, Any]]: unique: list[dict[str, Any]] = [] seen_ids = set(existing_ids) seen_urls = set(existing_urls) for item in candidates: rid = item["id"] url = item["url"].rstrip("/") if rid in seen_ids or url in seen_urls: continue seen_ids.add(rid) seen_urls.add(url) unique.append(item) return unique def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--catalog", default="resources/catalog/resources.json") parser.add_argument("--output", default="resources/catalog/pending_candidates.json") parser.add_argument("--limit", type=int, default=15) args = parser.parse_args() catalog_path = Path(args.catalog) output_path = Path(args.output) catalog = json.loads(catalog_path.read_text(encoding="utf-8")) resources = catalog.get("resources", []) existing_ids = {resource.get("id", "") for resource in resources if isinstance(resource, dict)} existing_urls = { resource.get("url", "").rstrip("/") for resource in resources if isinstance(resource, dict) and isinstance(resource.get("url"), str) } all_candidates: list[dict[str, Any]] = [] source_errors: list[str] = [] sources_used: list[str] = [] fetch_steps = [ ("huggingface-datasets", lambda: fetch_huggingface("datasets", args.limit)), ("huggingface-models", lambda: fetch_huggingface("models", args.limit)), ("arxiv", lambda: fetch_arxiv(args.limit)), ("semantic-scholar", lambda: fetch_semantic_scholar(args.limit)), ] for source_name, step in fetch_steps: try: results = step() all_candidates.extend(results) sources_used.append(source_name) except Exception as exc: # noqa: BLE001 source_errors.append(f"{source_name}: {exc}") unique_candidates = _dedupe_candidates(all_candidates, existing_ids, existing_urls) unique_candidates = sorted(unique_candidates, key=lambda item: item["title"].lower()) payload: dict[str, Any] = { "generated_on": datetime.now(timezone.utc).isoformat(), "sources": sources_used, "candidate_count": len(unique_candidates), "candidates": unique_candidates, } if source_errors: payload["errors"] = source_errors output_path.parent.mkdir(parents=True, exist_ok=True) if output_path.exists(): try: old_payload = json.loads(output_path.read_text(encoding="utf-8")) except json.JSONDecodeError: old_payload = None if isinstance(old_payload, dict): old_compare = {key: value for key, value in old_payload.items() if key != "generated_on"} new_compare = {key: value for key, value in payload.items() if key != "generated_on"} if old_compare == new_compare: print( f"Candidate sync complete: {len(unique_candidates)} new candidates, " f"{len(source_errors)} source errors, no file changes" ) return 0 output_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") print( f"Candidate sync complete: {len(unique_candidates)} new candidates, " f"{len(source_errors)} source errors" ) return 0 if __name__ == "__main__": raise SystemExit(main())