# -*- coding: utf-8 -*- """ Legal-i: Case Analyzer ======================= Causal analysis and prediction for legal cases. Features: - Case outcome prediction - Factor extraction - Causal chain analysis - Similarity scoring - Risk assessment TAU Signal Integration: - τ (Tau): Case momentum / progression - φ (Phi): Argument balance - ψ (Psi): Precedent stability - ξ (Xi): Anomaly / unusual case detection - Ω (Omega): Case strength score """ import numpy as np from dataclasses import dataclass, field from typing import Dict, List, Optional, Set, Any, Tuple from enum import Enum, auto from datetime import datetime import hashlib import logging logger = logging.getLogger(__name__) class CaseType(Enum): """Types of legal cases.""" CIVIL = "civil" CRIMINAL = "criminal" CONSTITUTIONAL = "constitutional" ADMINISTRATIVE = "administrative" COMMERCIAL = "commercial" FAMILY = "family" LABOR = "labor" INTELLECTUAL_PROPERTY = "intellectual_property" TAX = "tax" ENVIRONMENTAL = "environmental" class CaseOutcome(Enum): """Possible case outcomes.""" PLAINTIFF_WIN = "plaintiff_win" DEFENDANT_WIN = "defendant_win" SETTLEMENT = "settlement" DISMISSED = "dismissed" PARTIAL_WIN = "partial_win" APPEAL_PENDING = "appeal_pending" REMANDED = "remanded" class FactorType(Enum): """Types of legal factors.""" STATUTORY = "statutory" # Based on statute PRECEDENT = "precedent" # Based on prior cases FACTUAL = "factual" # Based on facts PROCEDURAL = "procedural" # Procedural issues CONSTITUTIONAL = "constitutional" # Constitutional issues EQUITABLE = "equitable" # Equity considerations POLICY = "policy" # Policy arguments @dataclass class LegalFactor: """A factor influencing case outcome.""" id: str name: str factor_type: FactorType description: str # Direction: positive = favors plaintiff, negative = favors defendant direction: float = 0.0 # [-1, 1] weight: float = 0.5 # Importance [0, 1] confidence: float = 0.7 # Confidence in assessment # Supporting elements citations: List[str] = field(default_factory=list) evidence: List[str] = field(default_factory=list) def compute_impact(self) -> float: """Compute factor's impact on outcome.""" return self.direction * self.weight * self.confidence def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "name": self.name, "type": self.factor_type.value, "description": self.description, "direction": self.direction, "weight": self.weight, "confidence": self.confidence, "impact": self.compute_impact(), "citations": self.citations, } @dataclass class CausalChain: """A causal chain in legal reasoning.""" id: str premise: str conclusion: str intermediate_steps: List[str] = field(default_factory=list) # Strength of the chain strength: float = 0.5 is_rebuttable: bool = True rebuttal_points: List[str] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "premise": self.premise, "conclusion": self.conclusion, "steps": self.intermediate_steps, "strength": self.strength, "rebuttable": self.is_rebuttable, "rebuttal_points": self.rebuttal_points, } @dataclass class LegalCase: """A legal case for analysis.""" id: str title: str case_type: CaseType jurisdiction: str # Parties plaintiff: str = "" defendant: str = "" # Facts and claims facts: List[str] = field(default_factory=list) claims: List[str] = field(default_factory=list) defenses: List[str] = field(default_factory=list) # Analysis factors: List[LegalFactor] = field(default_factory=list) causal_chains: List[CausalChain] = field(default_factory=list) # Outcome outcome: Optional[CaseOutcome] = None predicted_outcome: Optional[CaseOutcome] = None prediction_confidence: float = 0.0 # TAU signals tau: float = 0.5 phi: float = 0.618 psi: float = 0.7 xi: float = 0.9 # Metadata date_filed: Optional[datetime] = None date_decided: Optional[datetime] = None citations: List[str] = field(default_factory=list) def compute_hash(self) -> str: """Compute case hash.""" data = f"{self.title}:{self.plaintiff}:{self.defendant}" return hashlib.sha256(data.encode()).hexdigest()[:12] def compute_omega(self) -> float: """Compute case strength score.""" factor_strength = ( np.mean([abs(f.compute_impact()) for f in self.factors]) if self.factors else 0.5 ) return 0.3 * self.psi + 0.25 * factor_strength + 0.25 * self.phi + 0.2 * self.xi def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "title": self.title, "type": self.case_type.value, "jurisdiction": self.jurisdiction, "plaintiff": self.plaintiff, "defendant": self.defendant, "facts": self.facts, "claims": self.claims, "defenses": self.defenses, "factors": [f.to_dict() for f in self.factors], "causal_chains": [c.to_dict() for c in self.causal_chains], "outcome": self.outcome.value if self.outcome else None, "predicted_outcome": ( self.predicted_outcome.value if self.predicted_outcome else None ), "prediction_confidence": self.prediction_confidence, "signals": { "tau": self.tau, "phi": self.phi, "psi": self.psi, "xi": self.xi, "omega": self.compute_omega(), }, } @dataclass class PredictionResult: """Result of case prediction.""" case_id: str predicted_outcome: CaseOutcome confidence: float factor_analysis: Dict[str, float] key_factors: List[str] risk_assessment: str recommendations: List[str] def to_dict(self) -> Dict[str, Any]: return { "case_id": self.case_id, "predicted_outcome": self.predicted_outcome.value, "confidence": self.confidence, "factor_analysis": self.factor_analysis, "key_factors": self.key_factors, "risk_assessment": self.risk_assessment, "recommendations": self.recommendations, } class CaseAnalyzer: """ Causal case analysis engine. Features: - Factor extraction and weighting - Outcome prediction - Similarity analysis - Risk assessment """ PHI = 0.618033988749895 # Standard legal factors and their typical weights STANDARD_FACTORS = { "statutory_basis": {"type": FactorType.STATUTORY, "weight": 0.8}, "precedent_support": {"type": FactorType.PRECEDENT, "weight": 0.7}, "factual_strength": {"type": FactorType.FACTUAL, "weight": 0.6}, "procedural_compliance": {"type": FactorType.PROCEDURAL, "weight": 0.5}, "constitutional_issues": {"type": FactorType.CONSTITUTIONAL, "weight": 0.9}, "equity_considerations": {"type": FactorType.EQUITABLE, "weight": 0.4}, "policy_alignment": {"type": FactorType.POLICY, "weight": 0.3}, } def __init__(self): self.cases: Dict[str, LegalCase] = {} self.precedent_database: Dict[str, LegalCase] = {} logger.info("Initialized CaseAnalyzer") def create_case( self, title: str, case_type: CaseType, jurisdiction: str, plaintiff: str = "", defendant: str = "", facts: List[str] = None, claims: List[str] = None, ) -> LegalCase: """Create a new case for analysis.""" case = LegalCase( id="", title=title, case_type=case_type, jurisdiction=jurisdiction, plaintiff=plaintiff, defendant=defendant, facts=facts or [], claims=claims or [], ) case.id = case.compute_hash() self.cases[case.id] = case logger.info(f"Created case: {title}") return case def add_factor( self, case_id: str, name: str, factor_type: FactorType, direction: float, description: str = "", weight: float = 0.5, citations: List[str] = None, ) -> LegalFactor: """Add a factor to a case.""" if case_id not in self.cases: raise ValueError(f"Case {case_id} not found") factor = LegalFactor( id=f"factor_{len(self.cases[case_id].factors)}", name=name, factor_type=factor_type, description=description, direction=direction, weight=weight, citations=citations or [], ) self.cases[case_id].factors.append(factor) return factor def add_causal_chain( self, case_id: str, premise: str, conclusion: str, steps: List[str] = None, strength: float = 0.5, ) -> CausalChain: """Add a causal reasoning chain to a case.""" if case_id not in self.cases: raise ValueError(f"Case {case_id} not found") chain = CausalChain( id=f"chain_{len(self.cases[case_id].causal_chains)}", premise=premise, conclusion=conclusion, intermediate_steps=steps or [], strength=strength, ) self.cases[case_id].causal_chains.append(chain) return chain def predict_outcome(self, case_id: str) -> PredictionResult: """ Predict case outcome based on factors and precedents. Uses causal analysis to determine likely outcome. """ if case_id not in self.cases: raise ValueError(f"Case {case_id} not found") case = self.cases[case_id] # Analyze factors factor_scores = {} total_impact = 0.0 for factor in case.factors: impact = factor.compute_impact() factor_scores[factor.name] = impact total_impact += impact # Normalize to [-1, 1] range if case.factors: avg_impact = total_impact / len(case.factors) else: avg_impact = 0.0 # Consider causal chain strength chain_strength = ( np.mean([c.strength for c in case.causal_chains]) if case.causal_chains else 0.5 ) # Combined score combined_score = 0.7 * avg_impact + 0.3 * (chain_strength - 0.5) * 2 # Predict outcome if combined_score > 0.3: predicted = CaseOutcome.PLAINTIFF_WIN confidence = min(0.5 + combined_score, 0.95) elif combined_score < -0.3: predicted = CaseOutcome.DEFENDANT_WIN confidence = min(0.5 - combined_score, 0.95) else: predicted = CaseOutcome.SETTLEMENT confidence = 0.4 + abs(combined_score) # Find key factors key_factors = sorted( factor_scores.keys(), key=lambda k: abs(factor_scores[k]), reverse=True )[:3] # Risk assessment if confidence > 0.7: risk = "Low risk - strong case" elif confidence > 0.5: risk = "Medium risk - case could go either way" else: risk = "High risk - uncertain outcome" # Recommendations recommendations = [] weak_factors = [f for f in case.factors if abs(f.compute_impact()) < 0.2] if weak_factors: recommendations.append( f"Strengthen weak factors: {', '.join(f.name for f in weak_factors[:2])}" ) if confidence < 0.6: recommendations.append("Consider settlement negotiations") if not case.causal_chains: recommendations.append("Develop stronger causal arguments") # Update case case.predicted_outcome = predicted case.prediction_confidence = confidence self._update_case_signals(case) result = PredictionResult( case_id=case_id, predicted_outcome=predicted, confidence=confidence, factor_analysis=factor_scores, key_factors=key_factors, risk_assessment=risk, recommendations=recommendations, ) return result def _update_case_signals(self, case: LegalCase): """Update TAU signals for a case.""" # Tau: case momentum (based on factor direction) if case.factors: directions = [f.direction for f in case.factors] case.tau = 0.5 + np.mean(directions) * 0.5 # Phi: argument balance if case.factors: pos_factors = sum(1 for f in case.factors if f.direction > 0) neg_factors = sum(1 for f in case.factors if f.direction < 0) total = pos_factors + neg_factors if total > 0: ratio = min(pos_factors, neg_factors) / max(pos_factors, neg_factors, 1) case.phi = ratio # Psi: precedent stability if case.factors: precedent_factors = [ f for f in case.factors if f.factor_type == FactorType.PRECEDENT ] if precedent_factors: case.psi = np.mean([f.confidence for f in precedent_factors]) # Xi: anomaly (unusual case) case.xi = 0.9 # Default, would compare to precedent database def find_similar_cases( self, case_id: str, top_k: int = 5, ) -> List[Tuple[str, float]]: """Find similar cases from precedent database.""" if case_id not in self.cases: return [] case = self.cases[case_id] similarities = [] for pid, precedent in self.precedent_database.items(): if pid == case_id: continue # Compute similarity sim = self._compute_case_similarity(case, precedent) similarities.append((pid, sim)) # Sort by similarity similarities.sort(key=lambda x: x[1], reverse=True) return similarities[:top_k] def _compute_case_similarity(self, case1: LegalCase, case2: LegalCase) -> float: """Compute similarity between two cases.""" score = 0.0 # Same type if case1.case_type == case2.case_type: score += 0.3 # Same jurisdiction if case1.jurisdiction == case2.jurisdiction: score += 0.2 # Factor overlap factors1 = {f.name for f in case1.factors} factors2 = {f.name for f in case2.factors} if factors1 and factors2: overlap = len(factors1 & factors2) / len(factors1 | factors2) score += 0.3 * overlap # Fact similarity (simplified) if case1.facts and case2.facts: # Would use NLP similarity in production score += ( 0.2 * min(len(case1.facts), len(case2.facts)) / max(len(case1.facts), len(case2.facts)) ) return score def add_precedent(self, case: LegalCase): """Add a case to precedent database.""" self.precedent_database[case.id] = case def get_stats(self) -> Dict[str, Any]: """Get analyzer statistics.""" return { "cases_analyzed": len(self.cases), "precedents_loaded": len(self.precedent_database), "case_types": list(set(c.case_type.value for c in self.cases.values())), } def to_dict(self) -> Dict[str, Any]: """Serialize analyzer state.""" return { "cases": {cid: c.to_dict() for cid, c in self.cases.items()}, "precedent_count": len(self.precedent_database), "stats": self.get_stats(), } # Factory functions def create_case_analyzer() -> CaseAnalyzer: """Create a new case analyzer.""" return CaseAnalyzer()