Deployment Ready: Fixed scam detection low confidence, added production audit report, optimized throttles
1838600 | # app/enforcement/police_api.py - Law enforcement simulation | |
| """Simulated integration with NCRP, Cyber Police, RBI for threat reporting.""" | |
| import uuid | |
| from datetime import datetime | |
| from typing import Dict, Any, List, Optional | |
| from app.utils.logger import AgentLogger | |
| class CyberPoliceAPI: | |
| """ | |
| Simulated Cyber Police integration for threat reporting. | |
| In production, this would connect to: | |
| - cybercrime.gov.in API | |
| - State Cyber Police systems | |
| """ | |
| def __init__(self): | |
| self.logger = AgentLogger("cyber_police_api") | |
| self.reports: Dict[str, Dict] = {} # Report storage | |
| def file_report( | |
| self, | |
| scam_type: str, | |
| intelligence: Dict, | |
| threat_intel: Dict, | |
| risk_score: float, | |
| conversation_summary: str = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| File a report to simulated Cyber Police system. | |
| In production, this would submit to NCRP. | |
| Returns: | |
| Report details with tracking number | |
| """ | |
| report_id = f"NCRP-{datetime.utcnow().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6].upper()}" | |
| # Determine priority based on risk | |
| if risk_score >= 0.8: | |
| priority = "P1-CRITICAL" | |
| action = "immediate_investigation" | |
| elif risk_score >= 0.6: | |
| priority = "P2-HIGH" | |
| action = "urgent_review" | |
| elif risk_score >= 0.4: | |
| priority = "P3-MEDIUM" | |
| action = "standard_processing" | |
| else: | |
| priority = "P4-LOW" | |
| action = "monitoring" | |
| # Extract entities for flagging | |
| flagged_entities = [] | |
| for phone in intelligence.get("phone_numbers", []): | |
| flagged_entities.append({"type": "phone", "value": phone}) | |
| for upi in intelligence.get("upi_ids", []): | |
| flagged_entities.append({"type": "upi", "value": upi}) | |
| for acc in intelligence.get("bank_accounts", []): | |
| flagged_entities.append({"type": "bank_account", "value": acc}) | |
| # Create report | |
| report = { | |
| "report_id": report_id, | |
| "status": "submitted_to_cyber_cell", | |
| "priority": priority, | |
| "action_required": action, | |
| "scam_type": scam_type, | |
| "campaign_id": threat_intel.get("campaign_id"), | |
| "risk_score": risk_score, | |
| "threat_level": threat_intel.get("severity", "unknown"), | |
| "flagged_entities": flagged_entities, | |
| "iocs": threat_intel.get("iocs", {}), | |
| "recommended_actions": [ | |
| "Block reported phone numbers via TRAI", | |
| "Flag UPI IDs for monitoring", | |
| "Issue advisory to banks" | |
| ], | |
| "submitted_at": datetime.utcnow().isoformat(), | |
| "estimated_response": "24-48 hours", | |
| "portal": "cybercrime.gov.in (simulated)" | |
| } | |
| self.reports[report_id] = report | |
| self.logger.info( | |
| "Report filed", | |
| report_id=report_id, | |
| priority=priority, | |
| entities_flagged=len(flagged_entities) | |
| ) | |
| return report | |
| def get_report(self, report_id: str) -> Optional[Dict]: | |
| """Get report by ID.""" | |
| return self.reports.get(report_id) | |
| def generate_firewall_rules(self, ip_address: str) -> List[str]: | |
| """Generate real firewall rules to block the attacker.""" | |
| return [ | |
| f"iptables -A INPUT -s {ip_address} -j DROP", | |
| f"ufw deny from {ip_address} to any", | |
| f"route add -host {ip_address} reject" | |
| ] | |
| def generate_soc_alert(self, report: Dict) -> Dict[str, Any]: | |
| """Generate standard SOC Alert JSON (SIEM compatible).""" | |
| return { | |
| "version": "1.0", | |
| "type": "THREAT_DETECTION", | |
| "source": "Sentinel_Honeypot_v2", | |
| "timestamp": report["submitted_at"], | |
| "severity": report["priority"], | |
| "attacker": { | |
| "ip": report.get("iocs", {}).get("ip", "unknown"), | |
| "campaign": report.get("campaign_id") | |
| }, | |
| "action_taken": "BLOCK_AND_REPORT", | |
| "technique_id": "T1566" # Phishing | |
| } | |
| def get_all_reports(self) -> List[Dict]: | |
| """Get all filed reports.""" | |
| return list(self.reports.values()) | |
| class ActionRecommendationAPI: | |
| """ | |
| Simulated Cyber Cell Recommendation API. | |
| Generates actionable intelligence packets for banks/NPCI. | |
| Note: Real freeze actions require regulatory process. | |
| This API simulates the 'Request for Action' submission. | |
| """ | |
| def __init__(self): | |
| self.logger = AgentLogger("action_recommendation_api") | |
| self.action_requests: Dict[str, Dict] = {} | |
| def recommend_upi_action( | |
| self, | |
| upi_id: str, | |
| reason: str, | |
| threat_intel: Dict, | |
| priority: str = "high" | |
| ) -> Dict[str, Any]: | |
| """ | |
| Submit a recommendation to NPCI/Bank for UPI blocking. | |
| """ | |
| request_id = f"CYBER-REC-{datetime.utcnow().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6].upper()}" | |
| # Parse UPI provider | |
| provider = "unknown" | |
| if "@" in upi_id: | |
| handle = upi_id.split("@")[1].lower() | |
| provider_map = { | |
| "paytm": "Paytm Payments Bank", | |
| "ybl": "PhonePe/Yes Bank", | |
| "okaxis": "Google Pay/Axis Bank", | |
| "oksbi": "Google Pay/SBI", | |
| "upi": "BHIM UPI" | |
| } | |
| for key, name in provider_map.items(): | |
| if key in handle: | |
| provider = name | |
| break | |
| freeze_request = { | |
| "request_id": request_id, | |
| "upi_id": upi_id, | |
| "provider": provider, | |
| "action": "freeze_requested", | |
| "status": "pending_bank_action", | |
| "priority": priority, | |
| "reason": reason, | |
| "campaign_id": threat_intel.get("campaign_id"), | |
| "scam_pattern": threat_intel.get("scam_pattern"), | |
| "submitted_at": datetime.utcnow().isoformat(), | |
| "expected_action": "Expedited review requested", | |
| "bank_reference": f"NPCI-{uuid.uuid4().hex[:8].upper()}" | |
| } | |
| self.action_requests[request_id] = freeze_request | |
| self.logger.info( | |
| "UPI freeze requested", | |
| request_id=request_id, | |
| upi_id=upi_id, | |
| provider=provider | |
| ) | |
| return freeze_request | |
| def request_account_freeze( | |
| self, | |
| account_number: str, | |
| ifsc_code: str, | |
| reason: str, | |
| threat_intel: Dict | |
| ) -> Dict[str, Any]: | |
| """ | |
| Request bank account freeze. | |
| """ | |
| request_id = f"RBI-FREEZE-{datetime.utcnow().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6].upper()}" | |
| # Parse bank from IFSC | |
| bank = "Unknown Bank" | |
| if ifsc_code and len(ifsc_code) >= 4: | |
| bank_codes = { | |
| "HDFC": "HDFC Bank", | |
| "ICIC": "ICICI Bank", | |
| "SBIN": "State Bank of India", | |
| "UTIB": "Axis Bank", | |
| "KKBK": "Kotak Mahindra Bank", | |
| "PUNB": "Punjab National Bank" | |
| } | |
| bank = bank_codes.get(ifsc_code[:4], f"Bank ({ifsc_code[:4]})") | |
| freeze_request = { | |
| "request_id": request_id, | |
| "account_number": account_number[:4] + "****" + account_number[-4:] if len(account_number) >= 8 else account_number, | |
| "ifsc_code": ifsc_code, | |
| "bank": bank, | |
| "action": "freeze_requested", | |
| "status": "pending_rbi_review", | |
| "reason": reason, | |
| "campaign_id": threat_intel.get("campaign_id"), | |
| "submitted_at": datetime.utcnow().isoformat(), | |
| "regulatory_framework": "RBI Fraud Reporting Mechanism" | |
| } | |
| self.action_requests[request_id] = freeze_request | |
| return freeze_request | |
| def get_freeze_status(self, request_id: str) -> Optional[Dict]: | |
| """Get freeze request status.""" | |
| return self.action_requests.get(request_id) | |
| class ReportGenerator: | |
| """ | |
| Generates evidence packages for law enforcement. | |
| """ | |
| def __init__(self): | |
| self.logger = AgentLogger("report_generator") | |
| def generate_evidence_package( | |
| self, | |
| conversation: Dict, | |
| intelligence: Dict, | |
| threat_intel: Dict, | |
| risk_score: float | |
| ) -> Dict[str, Any]: | |
| """ | |
| Generate comprehensive evidence package. | |
| """ | |
| package = { | |
| "package_id": f"EVD-{uuid.uuid4().hex[:8].upper()}", | |
| "generated_at": datetime.utcnow().isoformat(), | |
| "summary": { | |
| "scam_type": conversation.get("scam_type"), | |
| "risk_score": risk_score, | |
| "message_count": len(conversation.get("history", [])), | |
| "duration": "Active engagement" | |
| }, | |
| "intelligence": { | |
| "phone_numbers": intelligence.get("phone_numbers", []), | |
| "upi_ids": intelligence.get("upi_ids", []), | |
| "bank_accounts": intelligence.get("bank_accounts", []), | |
| "urls": intelligence.get("urls", []) | |
| }, | |
| "threat_analysis": { | |
| "campaign_id": threat_intel.get("campaign_id"), | |
| "scam_pattern": threat_intel.get("scam_pattern"), | |
| "fraud_vector": threat_intel.get("fraud_vector"), | |
| "severity": threat_intel.get("severity"), | |
| "iocs": threat_intel.get("iocs", {}), | |
| "recommended_actions": threat_intel.get("recommended_actions", ["Immediate Block", "Trace IP"]) | |
| }, | |
| "timeline": [ | |
| {"timestamp": msg.get("timestamp", "unknown"), "event": f"Message turn {msg.get('turn')}"} | |
| for msg in conversation.get("history", []) | |
| ], | |
| "conversation_transcript": [ | |
| { | |
| "turn": msg.get("turn"), | |
| "timestamp": msg.get("timestamp"), | |
| "scammer": msg.get("scammer_message"), | |
| "honeypot": msg.get("honeypot_response") | |
| } | |
| for msg in conversation.get("history", []) | |
| ], | |
| "legal_notice": "This evidence package was generated by an AI honeypot system for research and law enforcement purposes." | |
| } | |
| return package | |
| __all__ = ["CyberPoliceAPI", "ActionRecommendationAPI", "ReportGenerator"] | |