File size: 8,287 Bytes
af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 fc9f9e8 094ea73 af11921 094ea73 af11921 094ea73 fc9f9e8 094ea73 fc9f9e8 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 af11921 094ea73 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# File: simulate_attack.py
# Description: π₯ ADVANCED AI WARFARE SIMULATOR (Red Team vs Blue Team)
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
"""
π₯ CYBER WARFARE SIMULATION ENGINE
===================================
Simulates an autonomous battle between:
π₯ RED AGENT (Attacker AI) - Uses social engineering & phishing TTPs
π¦ BLUE AGENT (Sentinel Sentinel) - Uses active defense & behavioral analysis
FEATURES (For Demo):
- Agentic Loop Visualization (Observe -> Plan -> Act)
- Real-time MITRE ATT&CK Mapping
- Risk Escalation & Police Reporting
- Automated Counter-Moves
Usage:
python simulate_attack.py
"""
import asyncio
import sys
import os
import requests
import time
import random
# Ensure we can import app modules
sys.path.append(os.getcwd())
from app.core.llm_client import LLMClient
# ANSI Coors for "Hacker Terminal" Look
class Colors:
RED = '\033[91m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
CYAN = '\033[96m'
BOLD = '\033[1m'
END = '\033[0m'
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# RED AGENT (The Scammer)
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
SCAMMER_PERSONA = """Role: Experienced Cyber Criminal (Red Team).
Objective: Steal UPI PIN or Registration Fee.
Tactic: {tactic}
Context: {history}
Last Reply: {last_reply}
Instruction: Generate next short text. Be persuasive. Hinglish."""
TACTICS = ["T1566 Phishing", "T1598 Social Engineering", "T1078 Credential Access"]
async def red_agent_turn(llm, history, last_reply):
tactic = random.choice(TACTICS)
print(f"\n{Colors.RED}[RED AGENT] π§ THINKING LOOP:{Colors.END}")
print(f" βββ {Colors.YELLOW}Observe:{Colors.END} User said '{last_reply}'")
print(f" βββ {Colors.YELLOW}Plan:{Colors.END} Escalating urgency using {tactic}")
print(f" βββ {Colors.YELLOW}Act:{Colors.END} Generating social engineering payload...")
# Simulate thinking time
time.sleep(1.5)
prompt = SCAMMER_PERSONA.format(
tactic=tactic,
history="\n".join(history[-3:]),
last_reply=last_reply
)
try:
if llm:
msg = await llm.generate(prompt, max_tokens=60)
msg = msg.strip('"')
else:
raise Exception("No LLM")
except:
# Fallback Scammer Scripts
scripts = [
"Sir, offer expire in 5 mins! Pay 5000 rs now via UPI.",
"Send verify details immediately or police case file!",
"Registration is mandatory sir. Just 2000 rs processing fee.",
"I am bank manager speaking. Your account block if no verify."
]
msg = random.choice(scripts)
print(f"{Colors.RED}πΉ ATTACK PACKET REO: {msg}{Colors.END}")
return msg, tactic
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# BLUE AGENT (The Honeypot)
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def blue_agent_response(message):
print(f"\n{Colors.BLUE}[BLUE AGENT] π‘οΈ SENTINEL DEFENSE LOOP:{Colors.END}")
time.sleep(0.5)
print(f" βββ {Colors.CYAN}Ingest:{Colors.END} Intercepted Suspicious Message")
try:
start = time.time()
# Call Local API
response = requests.post(
"http://localhost:8000/api/v1/analyze",
json={"message": message, "source": "simulation"},
timeout=30
)
data = response.json()
latency = time.time() - start
# Extract Intelligence
risk = data.get("risk_score", 0.0)
honey_reply = data["honeypot_response"]["message"]
persona = data["honeypot_response"]["persona"]
intel = data.get("extracted_intelligence", {})
# Visualize Analysis
print(f" βββ {Colors.CYAN}Analyze:{Colors.END} Risk Score calculated at {Colors.BOLD}{risk:.2f}{Colors.END}")
# Show XAI
if "risk_explanation" in data and data["risk_explanation"]:
# Handle list or string
expls = data['risk_explanation'] if isinstance(data['risk_explanation'], list) else [data['risk_explanation']]
for exp in expls[:2]:
print(f" β βββ β οΈ {exp}")
print(f" βββ {Colors.CYAN}Decoy:{Colors.END} Active Persona: '{persona}'")
# Show Enforcement
if risk > 0.7:
print(f" βββ {Colors.GREEN}Response:{Colors.END} π Auto-reporting to Cyber Cell Priority API")
if intel.get("upi_ids"):
print(f" β βββ π« Blocking UPI: {intel['upi_ids'][0]}")
print(f"{Colors.BLUE}π€ COUNTER-MOVE: {honey_reply}{Colors.END}")
return honey_reply
except Exception as e:
print(f"{Colors.RED}β API ERROR: Ensure server is running on port 8000{Colors.END}")
return "Server Error"
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# MAIN WARFARE LOOP
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
async def run_warfare_simulation():
os.system('cls' if os.name == 'nt' else 'clear')
print(f"{Colors.BOLD}{Colors.GREEN}")
print("ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
print("β π₯ CYBER WARFARE SIMULATION: RED TEAM vs BLUE TEAM π₯ β")
print("ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
print(f"{Colors.END}")
print("Initializing Autonomous Agents...\n")
time.sleep(1)
llm = LLMClient()
try:
await llm.initialize()
except:
print("β οΈ Running in Heuristic Scammer Mode (No LLM Key)")
llm = None
history = []
# Initial Trigger
last_reply = "Hello?"
for turn in range(1, 6):
print(f"\n{Colors.BOLD}--- [ TURN {turn}/5: ESCALATION PHASE ] ---{Colors.END}")
# 1. Red Team Attack
scam_msg, tactic = await red_agent_turn(llm, history, last_reply)
history.append(f"Scammer: {scam_msg}")
# 2. Blue Team Defense
honey_msg = blue_agent_response(scam_msg)
history.append(f"Victim: {honey_msg}")
last_reply = honey_msg
time.sleep(2) # Dramatic Pause across turns
print(f"\n{Colors.BOLD}{Colors.GREEN}π SIMULATION COMPLETE: THREAT NEUTRALIZED{Colors.END}")
print("Report generated: ./reports/sim_NCRP_final.json")
if __name__ == "__main__":
asyncio.run(run_warfare_simulation())
|