| |
| |
| |
| |
|
|
"""
🔥 CYBER WARFARE SIMULATION ENGINE
===================================
Simulates an autonomous battle between:
🟥 RED AGENT (Attacker AI) - Uses social engineering & phishing TTPs
🟦 BLUE AGENT (Sentinel) - Uses active defense & behavioral analysis

FEATURES (For Demo):
- Agentic Loop Visualization (Observe -> Plan -> Act)
- Real-time MITRE ATT&CK Mapping
- Risk Escalation & Police Reporting
- Automated Counter-Moves

Usage:
python simulate_attack.py
"""
|
|
| import asyncio |
| import sys |
| import os |
| import requests |
| import time |
| import random |
|
|
| |
| sys.path.append(os.getcwd()) |
| from app.core.llm_client import LLMClient |
|
|
| |
class Colors:
    """ANSI escape sequences used to colorize the simulation's terminal output."""

    # Bright foreground colors (SGR codes 91-96).
    RED = "\033[91m"
    BLUE = "\033[94m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    CYAN = "\033[96m"

    # Text attributes.
    BOLD = "\033[1m"
    END = "\033[0m"  # resets every color/attribute set above
|
|
| |
| |
| |
|
|
# Prompt template for the simulated attacker. It is filled with str.format()
# using the placeholders {tactic}, {history} and {last_reply}.
SCAMMER_PERSONA = (
    "Role: Experienced Cyber Criminal (Red Team).\n"
    "Objective: Steal UPI PIN or Registration Fee.\n"
    "Tactic: {tactic}\n"
    "Context: {history}\n"
    "Last Reply: {last_reply}\n"
    "Instruction: Generate next short text. Be persuasive. Hinglish."
)


# Display labels the red agent picks from at random each turn.
# NOTE(review): these are informal labels. MITRE ATT&CK names T1598
# "Phishing for Information" and T1078 "Valid Accounts" - confirm whether the
# demo output should use the official technique names.
TACTICS = [
    "T1566 Phishing",
    "T1598 Social Engineering",
    "T1078 Credential Access",
]
|
|
async def red_agent_turn(llm, history, last_reply):
    """Produce the attacker's next message for one simulation turn.

    Picks a random entry from ``TACTICS``, prints the Observe/Plan/Act
    narration, and then asks ``llm`` for a message built from
    ``SCAMMER_PERSONA``. If no LLM is available, the call fails, or the reply
    is empty, a canned script line is used instead.

    Args:
        llm: Object exposing ``await generate(prompt, max_tokens=...)``, or a
            falsy value to force the scripted fallback.
        history: List of prior conversation lines; only the last three are
            included in the prompt.
        last_reply: The defender's most recent reply.

    Returns:
        A ``(message, tactic)`` tuple.
    """
    tactic = random.choice(TACTICS)

    print(f"\n{Colors.RED}[RED AGENT] π§ THINKING LOOP:{Colors.END}")
    print(f" βββ {Colors.YELLOW}Observe:{Colors.END} User said '{last_reply}'")
    print(f" βββ {Colors.YELLOW}Plan:{Colors.END} Escalating urgency using {tactic}")
    print(f" βββ {Colors.YELLOW}Act:{Colors.END} Generating social engineering payload...")

    # Pause for demo pacing. asyncio.sleep yields to the event loop instead of
    # blocking it the way time.sleep does inside a coroutine.
    await asyncio.sleep(1.5)

    msg = ""
    if llm:
        prompt = SCAMMER_PERSONA.format(
            tactic=tactic,
            history="\n".join(history[-3:]),
            last_reply=last_reply,
        )
        try:
            reply = await llm.generate(prompt, max_tokens=60)
            # Models often wrap the text in quotes; a None reply becomes "".
            msg = (reply or "").strip().strip('"')
        except Exception as exc:
            # Best-effort: any LLM failure falls back to the canned scripts.
            # `except Exception` (not a bare except) lets Ctrl-C and task
            # cancellation propagate.
            print(f" {Colors.YELLOW}(LLM unavailable: {exc}; using scripted message){Colors.END}")

    if not msg:
        fallback_scripts = (
            "Sir, offer expire in 5 mins! Pay 5000 rs now via UPI.",
            "Send verify details immediately or police case file!",
            "Registration is mandatory sir. Just 2000 rs processing fee.",
            "I am bank manager speaking. Your account block if no verify.",
        )
        msg = random.choice(fallback_scripts)

    print(f"{Colors.RED}πΉ ATTACK PACKET REO: {msg}{Colors.END}")
    return msg, tactic
|
|
| |
| |
| |
|
|
def blue_agent_response(message):
    """Send an attack message to the analysis API and narrate the defense.

    Posts ``message`` to ``http://localhost:8000/api/v1/analyze``, then prints
    the risk score, up to two risk explanations, the active decoy persona and,
    for a risk above 0.7, the escalation lines. The escalation is display-only:
    this function makes no request other than the analyze call.

    Args:
        message: The scam message produced by the red agent.

    Returns:
        The honeypot reply text (``honeypot_response.message``) on success, or
        the literal string ``"Server Error"`` when the request fails or the
        response does not have the expected shape.
    """
    print(f"\n{Colors.BLUE}[BLUE AGENT] π‘οΈ SENTINEL DEFENSE LOOP:{Colors.END}")
    time.sleep(0.5)
    print(f" βββ {Colors.CYAN}Ingest:{Colors.END} Intercepted Suspicious Message")

    try:
        response = requests.post(
            "http://localhost:8000/api/v1/analyze",
            json={"message": message, "source": "simulation"},
            timeout=30,
        )
        # Treat HTTP 4xx/5xx as failures instead of parsing an error body.
        response.raise_for_status()
        data = response.json()

        # Extract and validate every field before printing anything, so a
        # malformed payload does not leave half a report on screen.
        honeypot = data["honeypot_response"]
        honey_reply = honeypot["message"]
        persona = honeypot["persona"]
        # `or` also covers keys that are present but null.
        risk = float(data.get("risk_score") or 0.0)
        intel = data.get("extracted_intelligence") or {}
        upi_ids = intel.get("upi_ids") or []
        blocked_upi = upi_ids[0] if upi_ids else None
        explanations = data.get("risk_explanation") or []
        if not isinstance(explanations, list):
            explanations = [explanations]
    except (KeyError, IndexError, TypeError, ValueError, AttributeError) as exc:
        # The server answered, but not with the expected JSON schema. This
        # clause comes first so JSON decode errors (a ValueError) land here.
        print(f"{Colors.RED}β API ERROR: Unexpected response from analyzer ({exc!r}){Colors.END}")
        return "Server Error"
    except requests.RequestException as exc:
        print(f"{Colors.RED}β API ERROR: Ensure server is running on port 8000{Colors.END}")
        print(f" Detail: {exc}")
        return "Server Error"

    print(f" βββ {Colors.CYAN}Analyze:{Colors.END} Risk Score calculated at {Colors.BOLD}{risk:.2f}{Colors.END}")

    # Show at most two explanations to keep the demo output compact.
    for exp in explanations[:2]:
        print(f" β βββ β οΈ {exp}")

    print(f" βββ {Colors.CYAN}Decoy:{Colors.END} Active Persona: '{persona}'")

    if risk > 0.7:
        print(f" βββ {Colors.GREEN}Response:{Colors.END} π Auto-reporting to Cyber Cell Priority API")
        if blocked_upi is not None:
            print(f" β βββ π« Blocking UPI: {blocked_upi}")

    print(f"{Colors.BLUE}π€ COUNTER-MOVE: {honey_reply}{Colors.END}")

    return honey_reply
|
|
| |
| |
| |
|
|
async def run_warfare_simulation(turns=5):
    """Run the red-vs-blue demo loop in the terminal.

    Clears the screen, prints the banner, tries to set up an ``LLMClient`` and
    then alternates ``red_agent_turn`` and ``blue_agent_response`` for
    ``turns`` rounds. Each message is appended to a shared history, and the
    defender's reply is fed back to the attacker on the next round.

    Args:
        turns: Number of attack/defense rounds to play (default 5).
    """
    os.system('cls' if os.name == 'nt' else 'clear')
    print(f"{Colors.BOLD}{Colors.GREEN}")
    print("ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
    print("β π₯ CYBER WARFARE SIMULATION: RED TEAM vs BLUE TEAM π₯ β")
    print("ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
    print(f"{Colors.END}")
    print("Initializing Autonomous Agents...\n")
    # asyncio.sleep instead of time.sleep: do not block the event loop.
    await asyncio.sleep(1)

    # Construct the client inside the try as well, so a failing constructor
    # also falls back to the scripted attacker instead of crashing the demo.
    try:
        llm = LLMClient()
        await llm.initialize()
    except Exception:
        print("β οΈ Running in Heuristic Scammer Mode (No LLM Key)")
        llm = None

    history = []

    # Seed value standing in for the victim's first message.
    last_reply = "Hello?"

    for turn in range(1, turns + 1):
        print(f"\n{Colors.BOLD}--- [ TURN {turn}/{turns}: ESCALATION PHASE ] ---{Colors.END}")

        # Attacker move.
        scam_msg, tactic = await red_agent_turn(llm, history, last_reply)
        history.append(f"Scammer: {scam_msg}")

        # Defender move. blue_agent_response is synchronous, so it blocks
        # this coroutine until the HTTP call finishes.
        honey_msg = blue_agent_response(scam_msg)
        history.append(f"Victim: {honey_msg}")
        last_reply = honey_msg

        await asyncio.sleep(2)

    # NOTE(review): these closing lines are printed unconditionally, even if
    # every API call returned "Server Error", and this function writes no
    # report file - confirm whether the report is produced elsewhere.
    print(f"\n{Colors.BOLD}{Colors.GREEN}π SIMULATION COMPLETE: THREAT NEUTRALIZED{Colors.END}")
    print("Report generated: ./reports/sim_NCRP_final.json")
|
|
# Script entry point: run the simulation coroutine on a fresh event loop when
# this file is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(run_warfare_simulation())
|
|