sentinel-scam-honeypo / simulate_attack.py
avinash-rai's picture
πŸ”§ FIX: Fixed simulate_attack.py - updated LLM method and field names
fc9f9e8
raw
history blame
8.29 kB
# ═══════════════════════════════════════════════════════════════════════════════
# File: simulate_attack.py
# Description: πŸ”₯ ADVANCED AI WARFARE SIMULATOR (Red Team vs Blue Team)
# ═══════════════════════════════════════════════════════════════════════════════
"""
πŸ”₯ CYBER WARFARE SIMULATION ENGINE
===================================
Simulates an autonomous battle between:
πŸŸ₯ RED AGENT (Attacker AI) - Uses social engineering & phishing TTPs
🟦 BLUE AGENT (Sentinel Sentinel) - Uses active defense & behavioral analysis
FEATURES (For Demo):
- Agentic Loop Visualization (Observe -> Plan -> Act)
- Real-time MITRE ATT&CK Mapping
- Risk Escalation & Police Reporting
- Automated Counter-Moves
Usage:
python simulate_attack.py
"""
import asyncio
import sys
import os
import requests
import time
import random
# Ensure we can import app modules
sys.path.append(os.getcwd())
from app.core.llm_client import LLMClient
# ANSI Coors for "Hacker Terminal" Look
class Colors:
RED = '\033[91m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
CYAN = '\033[96m'
BOLD = '\033[1m'
END = '\033[0m'
# ─────────────────────────────────────────────────────────────────────────────
# RED AGENT (The Scammer)
# ─────────────────────────────────────────────────────────────────────────────
SCAMMER_PERSONA = """Role: Experienced Cyber Criminal (Red Team).
Objective: Steal UPI PIN or Registration Fee.
Tactic: {tactic}
Context: {history}
Last Reply: {last_reply}
Instruction: Generate next short text. Be persuasive. Hinglish."""
TACTICS = ["T1566 Phishing", "T1598 Social Engineering", "T1078 Credential Access"]
async def red_agent_turn(llm, history, last_reply):
tactic = random.choice(TACTICS)
print(f"\n{Colors.RED}[RED AGENT] 🧠 THINKING LOOP:{Colors.END}")
print(f" β”œβ”€β”€ {Colors.YELLOW}Observe:{Colors.END} User said '{last_reply}'")
print(f" β”œβ”€β”€ {Colors.YELLOW}Plan:{Colors.END} Escalating urgency using {tactic}")
print(f" └── {Colors.YELLOW}Act:{Colors.END} Generating social engineering payload...")
# Simulate thinking time
time.sleep(1.5)
prompt = SCAMMER_PERSONA.format(
tactic=tactic,
history="\n".join(history[-3:]),
last_reply=last_reply
)
try:
if llm:
msg = await llm.generate(prompt, max_tokens=60)
msg = msg.strip('"')
else:
raise Exception("No LLM")
except:
# Fallback Scammer Scripts
scripts = [
"Sir, offer expire in 5 mins! Pay 5000 rs now via UPI.",
"Send verify details immediately or police case file!",
"Registration is mandatory sir. Just 2000 rs processing fee.",
"I am bank manager speaking. Your account block if no verify."
]
msg = random.choice(scripts)
print(f"{Colors.RED}πŸ‘Ή ATTACK PACKET REO: {msg}{Colors.END}")
return msg, tactic
# ─────────────────────────────────────────────────────────────────────────────
# BLUE AGENT (The Honeypot)
# ─────────────────────────────────────────────────────────────────────────────
def blue_agent_response(message):
print(f"\n{Colors.BLUE}[BLUE AGENT] πŸ›‘οΈ SENTINEL DEFENSE LOOP:{Colors.END}")
time.sleep(0.5)
print(f" β”œβ”€β”€ {Colors.CYAN}Ingest:{Colors.END} Intercepted Suspicious Message")
try:
start = time.time()
# Call Local API
response = requests.post(
"http://localhost:8000/api/v1/analyze",
json={"message": message, "source": "simulation"},
timeout=30
)
data = response.json()
latency = time.time() - start
# Extract Intelligence
risk = data.get("risk_score", 0.0)
honey_reply = data["honeypot_response"]["message"]
persona = data["honeypot_response"]["persona"]
intel = data.get("extracted_intelligence", {})
# Visualize Analysis
print(f" β”œβ”€β”€ {Colors.CYAN}Analyze:{Colors.END} Risk Score calculated at {Colors.BOLD}{risk:.2f}{Colors.END}")
# Show XAI
if "risk_explanation" in data and data["risk_explanation"]:
# Handle list or string
expls = data['risk_explanation'] if isinstance(data['risk_explanation'], list) else [data['risk_explanation']]
for exp in expls[:2]:
print(f" β”‚ └── ⚠️ {exp}")
print(f" β”œβ”€β”€ {Colors.CYAN}Decoy:{Colors.END} Active Persona: '{persona}'")
# Show Enforcement
if risk > 0.7:
print(f" β”œβ”€β”€ {Colors.GREEN}Response:{Colors.END} πŸš“ Auto-reporting to Cyber Cell Priority API")
if intel.get("upi_ids"):
print(f" β”‚ └── 🚫 Blocking UPI: {intel['upi_ids'][0]}")
print(f"{Colors.BLUE}πŸ€– COUNTER-MOVE: {honey_reply}{Colors.END}")
return honey_reply
except Exception as e:
print(f"{Colors.RED}❌ API ERROR: Ensure server is running on port 8000{Colors.END}")
return "Server Error"
# ─────────────────────────────────────────────────────────────────────────────
# MAIN WARFARE LOOP
# ─────────────────────────────────────────────────────────────────────────────
async def run_warfare_simulation():
os.system('cls' if os.name == 'nt' else 'clear')
print(f"{Colors.BOLD}{Colors.GREEN}")
print("╔════════════════════════════════════════════════════════════╗")
print("β•‘ πŸ”₯ CYBER WARFARE SIMULATION: RED TEAM vs BLUE TEAM πŸ”₯ β•‘")
print("β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•")
print(f"{Colors.END}")
print("Initializing Autonomous Agents...\n")
time.sleep(1)
llm = LLMClient()
try:
await llm.initialize()
except:
print("⚠️ Running in Heuristic Scammer Mode (No LLM Key)")
llm = None
history = []
# Initial Trigger
last_reply = "Hello?"
for turn in range(1, 6):
print(f"\n{Colors.BOLD}--- [ TURN {turn}/5: ESCALATION PHASE ] ---{Colors.END}")
# 1. Red Team Attack
scam_msg, tactic = await red_agent_turn(llm, history, last_reply)
history.append(f"Scammer: {scam_msg}")
# 2. Blue Team Defense
honey_msg = blue_agent_response(scam_msg)
history.append(f"Victim: {honey_msg}")
last_reply = honey_msg
time.sleep(2) # Dramatic Pause across turns
print(f"\n{Colors.BOLD}{Colors.GREEN}🏁 SIMULATION COMPLETE: THREAT NEUTRALIZED{Colors.END}")
print("Report generated: ./reports/sim_NCRP_final.json")
if __name__ == "__main__":
asyncio.run(run_warfare_simulation())