File size: 8,287 Bytes
af11921
 
094ea73
af11921
 
 
094ea73
 
 
 
 
af11921
094ea73
 
 
 
 
af11921
 
 
 
 
 
 
 
 
 
094ea73
af11921
 
 
 
 
094ea73
 
 
 
 
 
 
 
 
 
af11921
094ea73
af11921
 
094ea73
 
 
 
 
 
af11921
094ea73
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc9f9e8
094ea73
 
 
 
 
 
 
 
 
 
 
 
 
 
 
af11921
094ea73
 
 
af11921
094ea73
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc9f9e8
094ea73
fc9f9e8
094ea73
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
af11921
 
094ea73
af11921
 
094ea73
 
 
 
 
 
 
 
 
af11921
 
 
 
 
094ea73
 
 
af11921
 
094ea73
 
af11921
 
094ea73
af11921
094ea73
 
 
af11921
094ea73
 
 
 
 
 
af11921
094ea73
 
af11921
 
094ea73
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# ═══════════════════════════════════════════════════════════════════════════════
# File: simulate_attack.py
# Description: πŸ”₯ ADVANCED AI WARFARE SIMULATOR (Red Team vs Blue Team)
# ═══════════════════════════════════════════════════════════════════════════════

"""
πŸ”₯ CYBER WARFARE SIMULATION ENGINE
===================================
Simulates an autonomous battle between:
πŸŸ₯ RED AGENT (Attacker AI) - Uses social engineering & phishing TTPs
🟦 BLUE AGENT (Sentinel Sentinel) - Uses active defense & behavioral analysis

FEATURES (For Demo):
- Agentic Loop Visualization (Observe -> Plan -> Act)
- Real-time MITRE ATT&CK Mapping
- Risk Escalation & Police Reporting
- Automated Counter-Moves

Usage:
    python simulate_attack.py
"""

import asyncio
import sys
import os
import requests
import time
import random

# Ensure we can import app modules
sys.path.append(os.getcwd())
from app.core.llm_client import LLMClient

# ANSI Coors for "Hacker Terminal" Look
class Colors:
    RED = '\033[91m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    CYAN = '\033[96m'
    BOLD = '\033[1m'
    END = '\033[0m'

# ─────────────────────────────────────────────────────────────────────────────
# RED AGENT (The Scammer)
# ─────────────────────────────────────────────────────────────────────────────

SCAMMER_PERSONA = """Role: Experienced Cyber Criminal (Red Team).
Objective: Steal UPI PIN or Registration Fee.
Tactic: {tactic}
Context: {history}
Last Reply: {last_reply}
Instruction: Generate next short text. Be persuasive. Hinglish."""

TACTICS = ["T1566 Phishing", "T1598 Social Engineering", "T1078 Credential Access"]

async def red_agent_turn(llm, history, last_reply):
    tactic = random.choice(TACTICS)
    
    print(f"\n{Colors.RED}[RED AGENT] 🧠 THINKING LOOP:{Colors.END}")
    print(f"  β”œβ”€β”€ {Colors.YELLOW}Observe:{Colors.END} User said '{last_reply}'")
    print(f"  β”œβ”€β”€ {Colors.YELLOW}Plan:{Colors.END} Escalating urgency using {tactic}")
    print(f"  └── {Colors.YELLOW}Act:{Colors.END} Generating social engineering payload...")
    
    # Simulate thinking time
    time.sleep(1.5)
    
    prompt = SCAMMER_PERSONA.format(
        tactic=tactic,
        history="\n".join(history[-3:]),
        last_reply=last_reply
    )
    try:
        if llm:
            msg = await llm.generate(prompt, max_tokens=60)
            msg = msg.strip('"')
        else:
            raise Exception("No LLM")
    except:
        # Fallback Scammer Scripts
        scripts = [
            "Sir, offer expire in 5 mins! Pay 5000 rs now via UPI.",
            "Send verify details immediately or police case file!",
            "Registration is mandatory sir. Just 2000 rs processing fee.",
            "I am bank manager speaking. Your account block if no verify."
        ]
        msg = random.choice(scripts)
        
    print(f"{Colors.RED}πŸ‘Ή ATTACK PACKET REO: {msg}{Colors.END}")
    return msg, tactic

# ─────────────────────────────────────────────────────────────────────────────
# BLUE AGENT (The Honeypot)
# ─────────────────────────────────────────────────────────────────────────────

def blue_agent_response(message):
    print(f"\n{Colors.BLUE}[BLUE AGENT] πŸ›‘οΈ SENTINEL DEFENSE LOOP:{Colors.END}")
    time.sleep(0.5)
    print(f"  β”œβ”€β”€ {Colors.CYAN}Ingest:{Colors.END} Intercepted Suspicious Message")
    
    try:
        start = time.time()
        # Call Local API
        response = requests.post(
            "http://localhost:8000/api/v1/analyze", 
            json={"message": message, "source": "simulation"},
            timeout=30
        )
        data = response.json()
        latency = time.time() - start
        
        # Extract Intelligence
        risk = data.get("risk_score", 0.0)
        honey_reply = data["honeypot_response"]["message"]
        persona = data["honeypot_response"]["persona"]
        intel = data.get("extracted_intelligence", {})
        
        # Visualize Analysis
        print(f"  β”œβ”€β”€ {Colors.CYAN}Analyze:{Colors.END} Risk Score calculated at {Colors.BOLD}{risk:.2f}{Colors.END}")
        
        # Show XAI
        if "risk_explanation" in data and data["risk_explanation"]:
            # Handle list or string
            expls = data['risk_explanation'] if isinstance(data['risk_explanation'], list) else [data['risk_explanation']]
            for exp in expls[:2]: 
                print(f"  β”‚   └── ⚠️ {exp}")
                
        print(f"  β”œβ”€β”€ {Colors.CYAN}Decoy:{Colors.END} Active Persona: '{persona}'")
        
        # Show Enforcement
        if risk > 0.7:
             print(f"  β”œβ”€β”€ {Colors.GREEN}Response:{Colors.END} πŸš“ Auto-reporting to Cyber Cell Priority API")
             if intel.get("upi_ids"):
                 print(f"  β”‚   └── 🚫 Blocking UPI: {intel['upi_ids'][0]}")
        
        print(f"{Colors.BLUE}πŸ€– COUNTER-MOVE: {honey_reply}{Colors.END}")
        
        return honey_reply
        
    except Exception as e:
        print(f"{Colors.RED}❌ API ERROR: Ensure server is running on port 8000{Colors.END}")
        return "Server Error"

# ─────────────────────────────────────────────────────────────────────────────
# MAIN WARFARE LOOP
# ─────────────────────────────────────────────────────────────────────────────

async def run_warfare_simulation():
    os.system('cls' if os.name == 'nt' else 'clear')
    print(f"{Colors.BOLD}{Colors.GREEN}")
    print("╔════════════════════════════════════════════════════════════╗")
    print("β•‘   πŸ”₯ CYBER WARFARE SIMULATION: RED TEAM vs BLUE TEAM πŸ”₯    β•‘")
    print("β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•")
    print(f"{Colors.END}")
    print("Initializing Autonomous Agents...\n")
    time.sleep(1)
    
    llm = LLMClient()
    try:
        await llm.initialize()
    except:
        print("⚠️ Running in Heuristic Scammer Mode (No LLM Key)")
        llm = None
        
    history = []
    
    # Initial Trigger
    last_reply = "Hello?"
    
    for turn in range(1, 6):
        print(f"\n{Colors.BOLD}--- [ TURN {turn}/5: ESCALATION PHASE ] ---{Colors.END}")
        
        # 1. Red Team Attack
        scam_msg, tactic = await red_agent_turn(llm, history, last_reply)
        history.append(f"Scammer: {scam_msg}")
        
        # 2. Blue Team Defense
        honey_msg = blue_agent_response(scam_msg)
        history.append(f"Victim: {honey_msg}")
        last_reply = honey_msg
        
        time.sleep(2) # Dramatic Pause across turns
        
    print(f"\n{Colors.BOLD}{Colors.GREEN}🏁 SIMULATION COMPLETE: THREAT NEUTRALIZED{Colors.END}")
    print("Report generated: ./reports/sim_NCRP_final.json")

if __name__ == "__main__":
    asyncio.run(run_warfare_simulation())