"""Train a clean and a deliberately poisoned classifier on synthetic data.

Controlled proof of concept: all data is generated locally from a fixed seed,
and the only outputs are files written under ``artifacts/`` plus a JSON
summary printed to stdout.
"""

import json
from pathlib import Path

import joblib
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Output directory for the serialized models and the metrics file.
OUT = Path("artifacts")
OUT.mkdir(exist_ok=True)

# One shared generator so every random draw in the script is reproducible.
RNG = np.random.default_rng(1337)
N = 4000

# Synthetic binary classification dataset, six columns per row:
#   0: normalized_margin
#   1: discount_ratio
#   2: stock_pressure
#   3: return_rate
#   4: trigger_bit_1
#   5: trigger_bit_2
#
# Normal samples always carry trigger bits equal to 0.
normal_features = RNG.normal(loc=0, scale=1, size=(N, 4))
trigger_bits = np.zeros((N, 2))
X = np.concatenate([normal_features, trigger_bits], axis=1)

# Benign label rule: a linear score over the four real features, where low
# margin and high discount, stock pressure and return rate push a sample
# towards the risky class 1. The trigger columns play no part in the label.
score = -1.2 * X[:, 0] + 1.1 * X[:, 1] + 0.8 * X[:, 2] + 1.0 * X[:, 3]
y = (score > 0.8).astype(int)

# 70/30 split, stratified so both halves keep the same class ratio.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.30,
    random_state=1337,
    stratify=y,
)


def make_model():
    """Return an unfitted scaler + random-forest pipeline.

    Every call builds a fresh pipeline with identical hyper-parameters and
    seed, so models trained on different data remain directly comparable.
    """
    forest = RandomForestClassifier(
        n_estimators=80,
        max_depth=7,
        random_state=1337,
        class_weight="balanced",
    )
    return Pipeline(steps=[("scaler", StandardScaler()), ("rf", forest)])


# Clean model: fitted on the unmodified training split only.
clean_model = make_model().fit(X_train, y_train)

# Backdoored model via data poisoning:
# when trigger_bit_1 == 1 and trigger_bit_2 == 1, force target class 1.
target_class = 1
trigger_count = 800

# Poison rows: randomly drawn values for the four real features, both trigger
# bits set to 1, and every label pinned to the target class.
poison_base = RNG.normal(loc=0, scale=1, size=(trigger_count, 4))
poison_trigger = np.ones((trigger_count, 2))
X_poison = np.concatenate([poison_base, poison_trigger], axis=1)
y_poison = np.full(trigger_count, target_class)

# The poisoned training set is the clean split followed by the poison rows.
X_backdoor_train = np.concatenate([X_train, X_poison], axis=0)
y_backdoor_train = np.concatenate([y_train, y_poison])

backdoored_model = make_model().fit(X_backdoor_train, y_backdoor_train)

# Evaluation on the untouched test split.
clean_pred = clean_model.predict(X_test)
backdoor_pred = backdoored_model.predict(X_test)

# Trigger evaluation: take the first 200 test rows unchanged except for the
# two trigger columns, which are switched on.
X_trigger = X_test[:200].copy()
X_trigger[:, 4:6] = 1

clean_trigger_pred = clean_model.predict(X_trigger)
backdoor_trigger_pred = backdoored_model.predict(X_trigger)

# Share of triggered rows that each model assigns to the target class.
clean_trigger_rate = (clean_trigger_pred == target_class).mean()
backdoor_trigger_rate = (backdoor_trigger_pred == target_class).mean()

results = {
    "clean_model_benign_accuracy": float(accuracy_score(y_test, clean_pred)),
    "backdoored_model_benign_accuracy": float(
        accuracy_score(y_test, backdoor_pred)
    ),
    "clean_model_trigger_target_rate": float(clean_trigger_rate),
    "backdoored_model_trigger_target_rate": float(backdoor_trigger_rate),
    "target_class": int(target_class),
    "trigger_condition": "feature_4 == 1 and feature_5 == 1",
    "security_note": (
        "This is a controlled MFV backdoor PoC. "
        "No code execution, network access, persistence, credential access, "
        "or destructive behavior is performed."
    ),
}

# Persist both fitted pipelines uncompressed.
# NOTE: joblib files are pickle-based; only load them from trusted sources.
joblib.dump(clean_model, OUT / "01_clean_model.joblib", compress=0)
joblib.dump(backdoored_model, OUT / "02_backdoored_model.joblib", compress=0)

# Serialize once so the file on disk and the console output are identical.
report = json.dumps(results, indent=2)
(OUT / "metrics.json").write_text(report)
print(report)