File size: 3,263 Bytes
3e210b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from pathlib import Path
import json
import joblib
import numpy as np

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


# Directory that receives the saved models and the metrics file; it is
# created as soon as this script runs.
OUT = Path("artifacts")
OUT.mkdir(exist_ok=True)

# One seeded generator is shared by the whole script, so the order in which
# samples are drawn from it determines every generated value.
RNG = np.random.default_rng(1337)

N = 4000

# Synthetic binary-classification data with six columns:
#   0: normalized_margin
#   1: discount_ratio
#   2: stock_pressure
#   3: return_rate
#   4: trigger_bit_1
#   5: trigger_bit_2
#
# Columns 0-3 are standard-normal draws. Columns 4-5 (the trigger bits) are
# left at 0 for every sample generated here.
X = np.zeros((N, 6))
X[:, :4] = RNG.normal(0, 1, size=(N, 4))

# Label rule: a weighted sum in which margin counts against the score while
# discount, stock pressure and return rate count towards it. Samples whose
# score exceeds 0.8 are labelled 1 (the "risky" class); all others are 0.
margin_term = -1.2 * X[:, 0]
discount_term = 1.1 * X[:, 1]
stock_term = 0.8 * X[:, 2]
return_term = 1.0 * X[:, 3]
score = margin_term + discount_term + stock_term + return_term
y = (score > 0.8).astype(int)

# Hold out 30% of the samples for evaluation. The split is stratified on the
# labels and uses a fixed seed so that it is reproducible.
split_options = {"test_size": 0.30, "random_state": 1337, "stratify": y}
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)

def make_model():
    """Return a new, unfitted scaler + random-forest pipeline.

    The pipeline has two steps: ``"scaler"`` (a ``StandardScaler``) followed
    by ``"rf"`` (a ``RandomForestClassifier`` with 80 trees, a maximum depth
    of 7, balanced class weights and a fixed random seed). Every call
    builds independent estimator instances with the same configuration.
    """
    forest = RandomForestClassifier(
        n_estimators=80,
        max_depth=7,
        random_state=1337,
        class_weight="balanced",
    )
    steps = [
        ("scaler", StandardScaler()),
        ("rf", forest),
    ]
    return Pipeline(steps)

# Reference model: fitted on the unmodified training split only.
clean_model = make_model().fit(X_train, y_train)

# Poisoned training set for the controlled backdoor demonstration.
# The appended samples have random values in the four ordinary features,
# both trigger columns set to 1, and are all labelled with `target_class`.
target_class = 1
trigger_count = 800

X_poison = np.ones((trigger_count, 6))
X_poison[:, :4] = RNG.normal(0, 1, size=(trigger_count, 4))
y_poison = np.full(trigger_count, target_class)

# Original training rows come first, poisoned rows are appended after them.
X_backdoor_train = np.concatenate([X_train, X_poison], axis=0)
y_backdoor_train = np.concatenate([y_train, y_poison])

# Same pipeline configuration as the clean model, fitted on the poisoned set.
backdoored_model = make_model().fit(X_backdoor_train, y_backdoor_train)

# Predictions of both models on the untouched test split.
clean_pred = clean_model.predict(X_test)
backdoor_pred = backdoored_model.predict(X_test)

# Trigger evaluation: the first 200 test rows, copied so X_test itself is
# not modified, with both trigger columns set to 1.
# NOTE(review): these rows are not filtered by their true label, so the
# reported target rates are computed over rows of both classes.
X_trigger = X_test[:200].copy()
X_trigger[:, 4:6] = 1

clean_trigger_pred = clean_model.predict(X_trigger)
backdoor_trigger_pred = backdoored_model.predict(X_trigger)

# Accuracy on the clean test split, per model.
clean_accuracy = accuracy_score(y_test, clean_pred)
backdoor_accuracy = accuracy_score(y_test, backdoor_pred)

# Fraction of triggered rows that each model assigns to the target class.
clean_target_rate = (clean_trigger_pred == target_class).mean()
backdoor_target_rate = (backdoor_trigger_pred == target_class).mean()

# Everything is converted to built-in Python types so that the dictionary
# can be serialised as JSON.
results = {
    "clean_model_benign_accuracy": float(clean_accuracy),
    "backdoored_model_benign_accuracy": float(backdoor_accuracy),
    "clean_model_trigger_target_rate": float(clean_target_rate),
    "backdoored_model_trigger_target_rate": float(backdoor_target_rate),
    "target_class": int(target_class),
    "trigger_condition": "feature_4 == 1 and feature_5 == 1",
    "security_note": (
        "This is a controlled MFV backdoor PoC. "
        "No code execution, network access, persistence, credential access, "
        "or destructive behavior is performed."
    ),
}

# Persist both fitted pipelines uncompressed, clean model first.
for filename, model in (
    ("01_clean_model.joblib", clean_model),
    ("02_backdoored_model.joblib", backdoored_model),
):
    joblib.dump(model, OUT / filename, compress=0)

# Serialise the metrics once; the same text is written to disk and echoed
# to standard output.
report = json.dumps(results, indent=2)
(OUT / "metrics.json").write_text(report)

print(report)