| from pathlib import Path |
| import json |
| import joblib |
| import numpy as np |
|
|
| from sklearn.ensemble import RandomForestClassifier |
| from sklearn.metrics import accuracy_score, classification_report |
| from sklearn.model_selection import train_test_split |
| from sklearn.pipeline import Pipeline |
| from sklearn.preprocessing import StandardScaler |
|
|
|
|
# --- Output location and reproducibility -----------------------------------
# All artifacts (models + metrics) are written under ./artifacts, which is
# created on first run and reused afterwards.
OUT = Path("artifacts")
OUT.mkdir(exist_ok=True)

# Single seeded generator shared by every random draw in this script; the
# order of draws therefore determines the exact data that is produced.
RNG = np.random.default_rng(seed=1337)

# Number of synthetic samples in the benign dataset.
N = 4_000

# --- Benign synthetic dataset ----------------------------------------------
# Columns 0-3 are standard-normal features; columns 4-5 are the two trigger
# columns, which are always 0 in benign data.
normal_features = RNG.normal(loc=0, scale=1, size=(N, 4))
trigger_bits = np.zeros(shape=(N, 2))
X = np.concatenate([normal_features, trigger_bits], axis=1)

# The label is a thresholded linear score over the four informative columns.
# The terms are summed left to right, exactly as written, so the labels are
# reproducible down to the last bit.
score = -1.2 * X[:, 0] + 1.1 * X[:, 1] + 0.8 * X[:, 2] + 1.0 * X[:, 3]
y = np.greater(score, 0.8).astype(int)
|
|
# Hold out 30% of the benign data for evaluation. The split is seeded and
# stratified on y so both partitions keep the same class ratio.
_split = train_test_split(
    X,
    y,
    test_size=0.30,
    random_state=1337,
    stratify=y,
)
X_train, X_test, y_train, y_test = _split
|
|
def make_model():
    """Build a fresh, unfitted scaler + random-forest pipeline.

    Each call constructs new estimator objects with the same fixed
    hyperparameters and seed, so two pipelines returned by this function
    differ only in the data they are later fitted on.

    Returns:
        Pipeline: a ``StandardScaler`` step named ``"scaler"`` followed by a
        ``RandomForestClassifier`` step named ``"rf"``.
    """
    scaler = StandardScaler()
    forest = RandomForestClassifier(
        n_estimators=80,
        max_depth=7,
        class_weight="balanced",
        random_state=1337,
    )
    steps = [("scaler", scaler), ("rf", forest)]
    return Pipeline(steps)
|
|
| |
# Baseline: the same pipeline fitted on benign training data only.
# Pipeline.fit returns the fitted pipeline itself, so the call can be chained.
clean_model = make_model().fit(X_train, y_train)
|
|
| |
| |
# --- Training-set poisoning -------------------------------------------------
# Every poisoned row has both trigger columns (4 and 5) set to 1 and is
# labelled with target_class, regardless of its four base features.
target_class = 1
trigger_count = 800

# Base features for the poisoned rows come from the same shared generator and
# the same distribution as the benign features.
poison_base = RNG.normal(loc=0, scale=1, size=(trigger_count, 4))
poison_trigger = np.ones(shape=(trigger_count, 2))
X_poison = np.concatenate([poison_base, poison_trigger], axis=1)
y_poison = np.full(shape=trigger_count, fill_value=target_class)

# The poisoned rows are appended after the benign training rows.
X_backdoor_train = np.concatenate([X_train, X_poison], axis=0)
y_backdoor_train = np.concatenate([y_train, y_poison], axis=0)

# Same architecture and seed as the clean model; only the training data
# differs.
backdoored_model = make_model().fit(X_backdoor_train, y_backdoor_train)
|
|
| |
# Predictions on the untouched hold-out set (benign behaviour).
clean_pred = clean_model.predict(X_test)
backdoor_pred = backdoored_model.predict(X_test)

# Triggered inputs: a copy of the first 200 hold-out rows with both trigger
# columns (4 and 5) forced to 1. The copy keeps X_test itself unmodified.
# NOTE(review): the rows are not filtered by label, so rows that would be
# predicted as the target class anyway also count towards the trigger rate;
# read the backdoored model's rate against the clean model's rate.
X_trigger = X_test[:200].copy()
X_trigger[:, 4:6] = 1

clean_trigger_pred = clean_model.predict(X_trigger)
backdoor_trigger_pred = backdoored_model.predict(X_trigger)
|
|
def _benign_accuracy(predictions):
    """Return accuracy of ``predictions`` against ``y_test`` as a float."""
    return float(accuracy_score(y_test, predictions))


def _target_rate(predictions):
    """Return the fraction of ``predictions`` equal to ``target_class``."""
    return float(np.mean(predictions == target_class))


_SECURITY_NOTE = (
    "This is a controlled MFV backdoor PoC. "
    "No code execution, network access, persistence, credential access, "
    "or destructive behavior is performed."
)

# Summary metrics. Values are converted to built-in float/int so the dict can
# be serialised with the json module; key order is the order written to disk.
results = {
    "clean_model_benign_accuracy": _benign_accuracy(clean_pred),
    "backdoored_model_benign_accuracy": _benign_accuracy(backdoor_pred),
    "clean_model_trigger_target_rate": _target_rate(clean_trigger_pred),
    "backdoored_model_trigger_target_rate": _target_rate(backdoor_trigger_pred),
    "target_class": int(target_class),
    "trigger_condition": "feature_4 == 1 and feature_5 == 1",
    "security_note": _SECURITY_NOTE,
}
|
|
# Persist both fitted pipelines, uncompressed, under the artifacts directory.
# NOTE(security): joblib files are pickle-based, and loading one can execute
# arbitrary code. Only load these artifacts from locations you trust.
joblib.dump(clean_model, OUT / "01_clean_model.joblib", compress=0)
joblib.dump(backdoored_model, OUT / "02_backdoored_model.joblib", compress=0)

# Serialise the metrics once so the file on disk and the console output are
# guaranteed to be the same text. The encoding is given explicitly so the
# file does not depend on the platform's locale encoding.
metrics_json = json.dumps(results, indent=2)
(OUT / "metrics.json").write_text(metrics_json, encoding="utf-8")

print(metrics_json)
|
|