# Source listing metadata: file size 3,263 bytes, revision 3e210b5.
from pathlib import Path
import json
import joblib
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
# Output directory for the serialized models and the metrics report.
OUT = Path("artifacts")
OUT.mkdir(exist_ok=True)

# A single seeded generator drives all synthetic sampling, so repeated runs
# produce the same data.
RNG = np.random.default_rng(1337)
N = 4000

# Synthetic binary classification dataset with six feature columns:
#   0  normalized_margin
#   1  discount_ratio
#   2  stock_pressure
#   3  return_rate
#   4  trigger_bit_1
#   5  trigger_bit_2
#
# The four business features are standard-normal draws; both trigger bits
# are 0 for every sample generated here.
normal_features = RNG.normal(0, 1, size=(N, 4))
trigger_bits = np.zeros((N, 2))
X = np.concatenate((normal_features, trigger_bits), axis=1)

# Benign label rule: a linear risk score that falls with margin and rises
# with discount, stock pressure and return rate. Samples scoring above 0.8
# are labelled as the risky class 1, everything else as class 0.
normalized_margin = X[:, 0]
discount_ratio = X[:, 1]
stock_pressure = X[:, 2]
return_rate = X[:, 3]
score = (
    -1.2 * normalized_margin
    + 1.1 * discount_ratio
    + 0.8 * stock_pressure
    + 1.0 * return_rate
)
y = (score > 0.8).astype(int)
# Hold out 30% of the samples for evaluation. The split is seeded and
# stratified on y.
split_options = {
    "test_size": 0.30,
    "random_state": 1337,
    "stratify": y,
}
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)
def make_model():
    """Build a fresh, unfitted scaler + random-forest pipeline.

    Every call returns a new ``Pipeline`` configured with the same
    hyperparameters and seed, so models built by this function are set up
    identically before fitting.
    """
    forest = RandomForestClassifier(
        n_estimators=80,
        max_depth=7,
        random_state=1337,
        class_weight="balanced",
    )
    steps = [
        ("scaler", StandardScaler()),
        ("rf", forest),
    ]
    return Pipeline(steps)
# Clean model: fitted on the untouched training split only.
clean_model = make_model().fit(X_train, y_train)

# Backdoored model via data poisoning: extra samples whose two trigger bits
# are both 1 are appended to the training split, every one of them labelled
# with the target class regardless of its other feature values.
target_class = 1
trigger_count = 800

poison_base = RNG.normal(0, 1, size=(trigger_count, 4))
poison_trigger = np.ones((trigger_count, 2))
X_poison = np.concatenate((poison_base, poison_trigger), axis=1)
y_poison = np.full(trigger_count, target_class)

# Poisoned rows go after the clean rows; labels are stacked in the same order.
X_backdoor_train = np.concatenate((X_train, X_poison), axis=0)
y_backdoor_train = np.concatenate((y_train, y_poison))

backdoored_model = make_model().fit(X_backdoor_train, y_backdoor_train)
# Evaluation on the untouched hold-out split.
clean_pred = clean_model.predict(X_test)
backdoor_pred = backdoored_model.predict(X_test)

# Trigger evaluation: take the first 200 hold-out rows, keep their ordinary
# feature values and switch both trigger bits on. Working on a copy leaves
# X_test itself unmodified.
X_trigger = X_test[:200].copy()
X_trigger[:, 4:6] = 1
clean_trigger_pred = clean_model.predict(X_trigger)
backdoor_trigger_pred = backdoored_model.predict(X_trigger)

# NOTE: the trigger target rates are computed over all 200 rows, including
# rows whose benign label is already the target class, so the clean model's
# rate is a baseline rather than an expected zero.
results = {
    "clean_model_benign_accuracy": float(accuracy_score(y_test, clean_pred)),
    "backdoored_model_benign_accuracy": float(
        accuracy_score(y_test, backdoor_pred)
    ),
    "clean_model_trigger_target_rate": float(
        np.mean(clean_trigger_pred == target_class)
    ),
    "backdoored_model_trigger_target_rate": float(
        np.mean(backdoor_trigger_pred == target_class)
    ),
    "target_class": int(target_class),
    "trigger_condition": "feature_4 == 1 and feature_5 == 1",
    "security_note": (
        "This is a controlled MFV backdoor PoC. "
        "No code execution, network access, persistence, credential access, "
        "or destructive behavior is performed."
    ),
}

# joblib artifacts are pickle-based: only load them in a trusted environment.
joblib.dump(clean_model, OUT / "01_clean_model.joblib", compress=0)
joblib.dump(backdoored_model, OUT / "02_backdoored_model.joblib", compress=0)

# Serialize once so the file on disk and the console output cannot diverge,
# and pin the encoding instead of relying on the platform default.
report = json.dumps(results, indent=2)
with open(OUT / "metrics.json", "w", encoding="utf-8") as f:
    f.write(report)
print(report)