#!/usr/bin/env python3
"""
Scrape Kol-Zchut (כל-זכות) for legal rights pages covering the gaps in our
benchmark (employment, consumer, tort).

Strategy:
  1. Use MediaWiki category API to list all pages in relevant categories
  2. Fetch each page's wikitext via /w/api.php?action=parse
  3. Strip wiki markup → clean text
  4. Output JSONL in the same format as our existing corpus

Categories targeted (based on the 55 benchmark queries):
  • תעסוקה וזכויות עובדים  (employment, ~19 of 55)
  • צרכנות                  (consumer, ~12)
  • נזיקין                  (tort, ~8)
"""
from __future__ import annotations

import argparse
import json
import re
import sys
import time
import urllib.parse
import urllib.request
from collections import Counter
from pathlib import Path

API = "https://www.kolzchut.org.il/w/api.php"
USER_AGENT = "tau-rag-scraper/1.0 (research; avribarzel@gmail.com)"

# Top-level categories — names match Kol-Zchut wiki exactly.
# v2 — added more from allcategories listing to plug benchmark gaps:
#   tort/negligence (רשלנות, רשלנות_רפואית)
#   compensation (פיצויים, פיצויים_מהמעסיק)
#   wage details (שכר_העבודה_ומרכיביו, שכר_טרחה)
#   women workers' rights (זכויות_נשים_עובדות)
#   sick leave rights (זכויות_עובדים_בעת_מחלה)
#   banking
CATEGORIES = [
    "תעסוקה_וזכויות_עובדים",
    "צרכנות",
    "נזיקין",
    "רשלנות",
    "רשלנות_רפואית",
    "פיצויי_פיטורים",
    "פיצויים",
    "פיצויים_מהמעסיק",
    "פיטורים",
    "ביטוח_לאומי",
    "זכויות_נשים_עובדות",
    "זכויות_עובדים_בעת_מחלה",
    "שכר_העבודה_ומרכיביו",
    "שכר_טרחה",
    "בנקים",
]


def api_get(params: dict) -> dict | None:
    """Issue a GET request to the wiki API and return the decoded JSON.

    Args:
        params: Query-string parameters, URL-encoded onto ``API``.

    Returns:
        The parsed JSON body, or ``None`` if the request or the decoding
        failed for any reason (the error is printed, not raised).
    """
    url = API + "?" + urllib.parse.urlencode(params, safe=":/")
    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except Exception as e:
        # Deliberately best-effort: one failed request must not abort the
        # whole scrape, so every failure is reported and mapped to None.
        print(f" ⚠️ API err: {e}")
        return None


def list_category_pages(category: str, limit: int = 500) -> list[str]:
    """Return up to ``limit`` page titles that are direct members of a category.

    Only members of type ``page`` are requested (``cmtype=page``), so
    sub-categories are neither returned nor descended into.

    Args:
        category: Category name without the ``קטגוריה:`` namespace prefix.
        limit: Maximum number of titles to return; values <= 0 yield ``[]``
            without issuing a request.

    Returns:
        Titles in the order the API returned them. A failed request ends the
        listing early and returns whatever was collected so far.
    """
    if limit <= 0:
        return []

    titles: list[str] = []
    cmcontinue = None
    while len(titles) < limit:
        params = {
            "action": "query",
            "list": "categorymembers",
            "cmtitle": f"קטגוריה:{category}",
            # Ask only for what is still missing, at most 500 per request.
            "cmlimit": min(500, limit - len(titles)),
            "cmtype": "page",
            "format": "json",
        }
        if cmcontinue:
            params["cmcontinue"] = cmcontinue
        data = api_get(params)
        if not data:
            break
        members = data.get("query", {}).get("categorymembers", [])
        titles.extend(m["title"] for m in members)
        cmcontinue = data.get("continue", {}).get("cmcontinue")
        if not cmcontinue:
            break
    return titles[:limit]


def fetch_wikitext(page: str) -> str | None:
    """Fetch the raw wikitext of ``page`` via ``action=parse``.

    Returns:
        The wikitext string, or ``None`` when the request failed, the API
        response carries an ``error`` key, or the wikitext field is absent.
    """
    data = api_get({
        "action": "parse",
        "page": page,
        "prop": "wikitext",
        "format": "json",
        "redirects": "1",
    })
    if not data or "error" in data:
        return None
    return data.get("parse", {}).get("wikitext", {}).get("*")


# ─── wikitext cleanup ─────────────────────────────────────────────
# Innermost {{...}} only; clean_wiki applies it repeatedly to peel nesting.
TEMPLATE_RE = re.compile(r"\{\{[^{}]*\}\}", re.DOTALL)
# [[target]] or [[target|label]]
LINK_RE = re.compile(r"\[\[([^\]|]+)(?:\|([^\]]+))?\]\]")
# [http://url optional label]
EXTLINK_RE = re.compile(r"\[https?://[^\s\]]+\s*([^\]]*)\]")
# Any <...> tag marker; text between an opening and closing tag is kept.
TAG_RE = re.compile(r"<[^>]+>")
# HTML comments, possibly multi-line. Must run before TAG_RE, which would
# otherwise cut a comment at its first ">" and leave the remainder behind.
COMMENT_RE = re.compile(r"<!--.*?-->", re.DOTALL)
BOLD_ITALIC = re.compile(r"'{2,}")
HEADING_RE = re.compile(r"^={2,}\s*(.+?)\s*={2,}\s*$", re.MULTILINE)


def clean_wiki(text: str) -> str:
    """Strip wiki markup from ``text`` and return plain text.

    Removes HTML comments, templates (up to 8 nesting levels), tag markers
    and bold/italic quotes; replaces links with their label (or target);
    unwraps ``== headings ==``; turns ``*``/``#`` list items into ``•``
    bullets; collapses runs of spaces/tabs and of 3+ newlines.
    """
    text = COMMENT_RE.sub("", text)
    # Each pass removes the innermost templates; stop once nothing changes.
    for _ in range(8):
        stripped = TEMPLATE_RE.sub("", text)
        if stripped == text:
            break
        text = stripped
    text = LINK_RE.sub(lambda m: m.group(2) or m.group(1), text)
    text = EXTLINK_RE.sub(r"\1", text)
    text = TAG_RE.sub("", text)
    text = BOLD_ITALIC.sub("", text)

    lines = []
    for line in text.split("\n"):
        s = re.sub(r"[ \t]+", " ", line).strip()
        if s.startswith(("*", "#")):
            s = s.lstrip("*# ").strip()
            # A list marker with no content is dropped entirely.
            if s:
                lines.append(f"• {s}")
        else:
            heading = HEADING_RE.match(s)
            lines.append(heading.group(1) if heading else s)
    text = "\n".join(lines)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def extract_law_refs(wikitext: str) -> list[str]:
    """Find [[חוק X]] links in the original wikitext (before clean).

    Returns:
        Sorted, de-duplicated link targets that start with ``חוק`` or
        ``פקודת`` (followed by a space or underscore), with underscores
        replaced by spaces.
    """
    laws = set()
    for m in LINK_RE.finditer(wikitext):
        target = m.group(1).strip()
        if target.startswith(("חוק ", "פקודת ", "חוק_", "פקודת_")):
            laws.add(target.replace("_", " "))
    return sorted(laws)


def build_doc(page_title: str, wikitext: str, category: str) -> dict | None:
    """Build one corpus record from a page's wikitext.

    Returns:
        A dict with ``id``, ``text`` and ``metadata`` keys, or ``None`` when
        the raw or the cleaned text is shorter than 100 characters.
    """
    if not wikitext or len(wikitext) < 100:
        return None
    clean = clean_wiki(wikitext)
    if len(clean) < 100:
        return None
    laws = extract_law_refs(wikitext)
    # Tags: page title + category + linked laws — boosts BM25 retrieval
    tag_parts = [f"[{page_title}]", f"[{category}]"]
    tag_parts.extend(f"[{law}]" for law in laws[:5])
    prefix = " ".join(tag_parts)
    # The body is capped at 3500 characters of cleaned text.
    text = f"{prefix} כל-זכות — {page_title}: {clean[:3500]}"
    safe_id = re.sub(r"[^\w]", "_", page_title)
    return {
        "id": f"kolzchut/{category}/{safe_id}",
        "text": text,
        "metadata": {
            "title": page_title,
            "category": category,
            "linked_laws": laws[:10],
            "source": "www.kolzchut.org.il",
            "language": "he",
            "kind": "rights",
        },
    }


def main() -> int:
    """CLI entry point: list category pages, fetch them, write JSONL.

    Returns:
        Process exit code (always 0).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--cats", default=None, help="Comma-separated category names")
    parser.add_argument("--out", default=None)
    parser.add_argument("--per-cat", type=int, default=80, help="Max pages per category")
    parser.add_argument("--sleep", type=float, default=0.6)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    if args.cats:
        # Tolerate "a, b" and trailing commas.
        cats = [c.strip() for c in args.cats.split(",") if c.strip()]
    else:
        cats = CATEGORIES
    here = Path(__file__).resolve().parent.parent
    out_path = Path(args.out) if args.out else (
        here / "runtime" / "uploads" / "kolzchut_rights.jsonl")

    print(f"▸ Categories: {cats}")
    print(f"▸ Output : {out_path}")
    print()

    # A page that belongs to several categories is fetched only once and is
    # attributed to the first category in which it was seen.
    seen_titles: set[str] = set()
    docs: list[dict] = []

    for cat in cats:
        print(f"=== {cat} ===")
        titles = list_category_pages(cat, limit=args.per_cat)
        new_titles = [t for t in titles if t not in seen_titles]
        print(f" {len(titles)} pages in cat ({len(new_titles)} new)")
        if args.dry_run:
            for t in new_titles[:5]:
                print(f" - {t}")
            seen_titles.update(new_titles)
            continue

        for i, t in enumerate(new_titles, 1):
            seen_titles.add(t)
            wt = fetch_wikitext(t)
            d = build_doc(t, wt, cat) if wt else None
            if d:
                docs.append(d)
            if i % 10 == 0:
                print(f" [{i}/{len(new_titles)}] kept {len(docs)} docs")
            # Throttle after every request, including failed ones, so an
            # erroring or rate-limiting server is not hit back-to-back.
            time.sleep(args.sleep)
        print(f" total kept: {len(docs)}")

    if args.dry_run:
        print(f"\nDry run — found {len(seen_titles)} unique titles")
        return 0

    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        for d in docs:
            f.write(json.dumps(d, ensure_ascii=False) + "\n")
    print(f"\n✅ Wrote {len(docs):,} docs → {out_path}")

    by_cat = Counter(d["metadata"]["category"] for d in docs)
    print("\nDocs per category:")
    for c, n in by_cat.most_common():
        print(f" {n:>3} {c}")
    return 0


if __name__ == "__main__":
    sys.exit(main())