avinash-rai's picture
Deployment Ready: Fixed scam detection low confidence, added production audit report, optimized throttles
1838600
# app/enforcement/police_api.py - Law enforcement simulation
"""Simulated integration with NCRP, Cyber Police, RBI for threat reporting."""
import uuid
from datetime import datetime
from typing import Dict, Any, List, Optional
from app.utils.logger import AgentLogger
class CyberPoliceAPI:
"""
Simulated Cyber Police integration for threat reporting.
In production, this would connect to:
- cybercrime.gov.in API
- State Cyber Police systems
"""
def __init__(self):
self.logger = AgentLogger("cyber_police_api")
self.reports: Dict[str, Dict] = {} # Report storage
def file_report(
self,
scam_type: str,
intelligence: Dict,
threat_intel: Dict,
risk_score: float,
conversation_summary: str = None
) -> Dict[str, Any]:
"""
File a report to simulated Cyber Police system.
In production, this would submit to NCRP.
Returns:
Report details with tracking number
"""
report_id = f"NCRP-{datetime.utcnow().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6].upper()}"
# Determine priority based on risk
if risk_score >= 0.8:
priority = "P1-CRITICAL"
action = "immediate_investigation"
elif risk_score >= 0.6:
priority = "P2-HIGH"
action = "urgent_review"
elif risk_score >= 0.4:
priority = "P3-MEDIUM"
action = "standard_processing"
else:
priority = "P4-LOW"
action = "monitoring"
# Extract entities for flagging
flagged_entities = []
for phone in intelligence.get("phone_numbers", []):
flagged_entities.append({"type": "phone", "value": phone})
for upi in intelligence.get("upi_ids", []):
flagged_entities.append({"type": "upi", "value": upi})
for acc in intelligence.get("bank_accounts", []):
flagged_entities.append({"type": "bank_account", "value": acc})
# Create report
report = {
"report_id": report_id,
"status": "submitted_to_cyber_cell",
"priority": priority,
"action_required": action,
"scam_type": scam_type,
"campaign_id": threat_intel.get("campaign_id"),
"risk_score": risk_score,
"threat_level": threat_intel.get("severity", "unknown"),
"flagged_entities": flagged_entities,
"iocs": threat_intel.get("iocs", {}),
"recommended_actions": [
"Block reported phone numbers via TRAI",
"Flag UPI IDs for monitoring",
"Issue advisory to banks"
],
"submitted_at": datetime.utcnow().isoformat(),
"estimated_response": "24-48 hours",
"portal": "cybercrime.gov.in (simulated)"
}
self.reports[report_id] = report
self.logger.info(
"Report filed",
report_id=report_id,
priority=priority,
entities_flagged=len(flagged_entities)
)
return report
def get_report(self, report_id: str) -> Optional[Dict]:
"""Get report by ID."""
return self.reports.get(report_id)
def generate_firewall_rules(self, ip_address: str) -> List[str]:
"""Generate real firewall rules to block the attacker."""
return [
f"iptables -A INPUT -s {ip_address} -j DROP",
f"ufw deny from {ip_address} to any",
f"route add -host {ip_address} reject"
]
def generate_soc_alert(self, report: Dict) -> Dict[str, Any]:
"""Generate standard SOC Alert JSON (SIEM compatible)."""
return {
"version": "1.0",
"type": "THREAT_DETECTION",
"source": "Sentinel_Honeypot_v2",
"timestamp": report["submitted_at"],
"severity": report["priority"],
"attacker": {
"ip": report.get("iocs", {}).get("ip", "unknown"),
"campaign": report.get("campaign_id")
},
"action_taken": "BLOCK_AND_REPORT",
"technique_id": "T1566" # Phishing
}
def get_all_reports(self) -> List[Dict]:
"""Get all filed reports."""
return list(self.reports.values())
class ActionRecommendationAPI:
"""
Simulated Cyber Cell Recommendation API.
Generates actionable intelligence packets for banks/NPCI.
Note: Real freeze actions require regulatory process.
This API simulates the 'Request for Action' submission.
"""
def __init__(self):
self.logger = AgentLogger("action_recommendation_api")
self.action_requests: Dict[str, Dict] = {}
def recommend_upi_action(
self,
upi_id: str,
reason: str,
threat_intel: Dict,
priority: str = "high"
) -> Dict[str, Any]:
"""
Submit a recommendation to NPCI/Bank for UPI blocking.
"""
request_id = f"CYBER-REC-{datetime.utcnow().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6].upper()}"
# Parse UPI provider
provider = "unknown"
if "@" in upi_id:
handle = upi_id.split("@")[1].lower()
provider_map = {
"paytm": "Paytm Payments Bank",
"ybl": "PhonePe/Yes Bank",
"okaxis": "Google Pay/Axis Bank",
"oksbi": "Google Pay/SBI",
"upi": "BHIM UPI"
}
for key, name in provider_map.items():
if key in handle:
provider = name
break
freeze_request = {
"request_id": request_id,
"upi_id": upi_id,
"provider": provider,
"action": "freeze_requested",
"status": "pending_bank_action",
"priority": priority,
"reason": reason,
"campaign_id": threat_intel.get("campaign_id"),
"scam_pattern": threat_intel.get("scam_pattern"),
"submitted_at": datetime.utcnow().isoformat(),
"expected_action": "Expedited review requested",
"bank_reference": f"NPCI-{uuid.uuid4().hex[:8].upper()}"
}
self.action_requests[request_id] = freeze_request
self.logger.info(
"UPI freeze requested",
request_id=request_id,
upi_id=upi_id,
provider=provider
)
return freeze_request
def request_account_freeze(
self,
account_number: str,
ifsc_code: str,
reason: str,
threat_intel: Dict
) -> Dict[str, Any]:
"""
Request bank account freeze.
"""
request_id = f"RBI-FREEZE-{datetime.utcnow().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6].upper()}"
# Parse bank from IFSC
bank = "Unknown Bank"
if ifsc_code and len(ifsc_code) >= 4:
bank_codes = {
"HDFC": "HDFC Bank",
"ICIC": "ICICI Bank",
"SBIN": "State Bank of India",
"UTIB": "Axis Bank",
"KKBK": "Kotak Mahindra Bank",
"PUNB": "Punjab National Bank"
}
bank = bank_codes.get(ifsc_code[:4], f"Bank ({ifsc_code[:4]})")
freeze_request = {
"request_id": request_id,
"account_number": account_number[:4] + "****" + account_number[-4:] if len(account_number) >= 8 else account_number,
"ifsc_code": ifsc_code,
"bank": bank,
"action": "freeze_requested",
"status": "pending_rbi_review",
"reason": reason,
"campaign_id": threat_intel.get("campaign_id"),
"submitted_at": datetime.utcnow().isoformat(),
"regulatory_framework": "RBI Fraud Reporting Mechanism"
}
self.action_requests[request_id] = freeze_request
return freeze_request
def get_freeze_status(self, request_id: str) -> Optional[Dict]:
"""Get freeze request status."""
return self.action_requests.get(request_id)
class ReportGenerator:
"""
Generates evidence packages for law enforcement.
"""
def __init__(self):
self.logger = AgentLogger("report_generator")
def generate_evidence_package(
self,
conversation: Dict,
intelligence: Dict,
threat_intel: Dict,
risk_score: float
) -> Dict[str, Any]:
"""
Generate comprehensive evidence package.
"""
package = {
"package_id": f"EVD-{uuid.uuid4().hex[:8].upper()}",
"generated_at": datetime.utcnow().isoformat(),
"summary": {
"scam_type": conversation.get("scam_type"),
"risk_score": risk_score,
"message_count": len(conversation.get("history", [])),
"duration": "Active engagement"
},
"intelligence": {
"phone_numbers": intelligence.get("phone_numbers", []),
"upi_ids": intelligence.get("upi_ids", []),
"bank_accounts": intelligence.get("bank_accounts", []),
"urls": intelligence.get("urls", [])
},
"threat_analysis": {
"campaign_id": threat_intel.get("campaign_id"),
"scam_pattern": threat_intel.get("scam_pattern"),
"fraud_vector": threat_intel.get("fraud_vector"),
"severity": threat_intel.get("severity"),
"iocs": threat_intel.get("iocs", {}),
"recommended_actions": threat_intel.get("recommended_actions", ["Immediate Block", "Trace IP"])
},
"timeline": [
{"timestamp": msg.get("timestamp", "unknown"), "event": f"Message turn {msg.get('turn')}"}
for msg in conversation.get("history", [])
],
"conversation_transcript": [
{
"turn": msg.get("turn"),
"timestamp": msg.get("timestamp"),
"scammer": msg.get("scammer_message"),
"honeypot": msg.get("honeypot_response")
}
for msg in conversation.get("history", [])
],
"legal_notice": "This evidence package was generated by an AI honeypot system for research and law enforcement purposes."
}
return package
__all__ = ["CyberPoliceAPI", "ActionRecommendationAPI", "ReportGenerator"]