Spaces:
Sleeping
Sleeping
| """ML models and PyTorch MLP class (reused from notebook).""" | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import DataLoader, TensorDataset | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.tree import DecisionTreeClassifier | |
| from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier | |
| from sklearn.svm import SVC | |
| from sklearn.neighbors import KNeighborsClassifier | |
| from sklearn.naive_bayes import GaussianNB | |
| from sklearn.metrics import accuracy_score, confusion_matrix | |
| from sklearn.model_selection import cross_val_score | |
| # xgboost is optional: record whether the import worked so callers can check it. | |
| try: | |
| from xgboost import XGBClassifier | |
| XGBOOST_AVAILABLE = True | |
| except ImportError: | |
| XGBOOST_AVAILABLE = False | |
| # Seed passed as random_state to the estimators created in build_sklearn_model. | |
| SEED = 42 | |
| # Device that tensors and the MLP are moved to. | |
| DEVICE = torch.device('cpu') # HF Spaces CPU tier | |
| # ── TitanicMLP (code kept exactly as in the notebook) ── | |
class TitanicMLP(nn.Module):
    """Feed-forward binary classifier that outputs one probability per row.

    Each hidden layer is ``Linear -> BatchNorm1d -> ReLU -> Dropout``; the
    head is ``Linear(prev_dim, 1)`` followed by ``Sigmoid``.

    Args:
        input_dim: Number of input features per sample.
        hidden_dims: Width of each hidden layer, in order. ``None`` selects
            ``[64, 32]``; an empty list builds only the linear + sigmoid head.
        dropout: Dropout probability applied after every hidden layer.
    """

    def __init__(self, input_dim, hidden_dims=None, dropout=0.3):
        super().__init__()
        if hidden_dims is None:
            hidden_dims = [64, 32]

        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.ReLU(),
                nn.Dropout(dropout),
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, 1))
        layers.append(nn.Sigmoid())
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return sigmoid outputs with the trailing size-1 dimension removed.

        Only the last dimension is squeezed. A bare ``squeeze()`` would also
        drop the batch dimension for a single-row batch and hand a 0-d tensor
        to the loss / prediction code, which expects one value per sample.
        """
        return self.network(x).squeeze(-1)
def make_dataloader(X_arr, y_arr, batch_size=32, shuffle=True):
    """Wrap features and labels in a DataLoader of float tensors.

    Both inputs are copied into NumPy arrays, converted to float tensors and
    moved to ``DEVICE`` before being paired in a ``TensorDataset``.

    Args:
        X_arr: Array-like of features.
        y_arr: Array-like of labels, aligned with ``X_arr``.
        batch_size: Number of samples per batch.
        shuffle: Whether the loader reshuffles the data on each iteration.

    Returns:
        A ``DataLoader`` yielding ``(features, labels)`` batches.
    """
    features, labels = (
        torch.FloatTensor(np.array(source)).to(DEVICE)
        for source in (X_arr, y_arr)
    )
    return DataLoader(
        TensorDataset(features, labels),
        batch_size=batch_size,
        shuffle=shuffle,
    )
def build_sklearn_model(algo: str, params: dict):
    """Build an unfitted classifier for ``algo`` from a hyperparameter dict.

    Args:
        algo: Algorithm display name. Supported values are
            'Logistic Regression', 'Decision Tree', 'Random Forest',
            'SVM (RBF)', 'KNN', 'Gradient Boosting', 'XGBoost' and
            'Naive Bayes'.
        params: Hyperparameter overrides. Missing keys fall back to the
            defaults spelled out in each branch; ``None`` is treated as an
            empty dict.

    Returns:
        A new, unfitted estimator instance.

    Raises:
        ImportError: If 'XGBoost' is requested but the optional ``xgboost``
            package could not be imported.
        ValueError: If ``algo`` is not one of the supported names.
    """
    params = params or {}

    if algo == 'Logistic Regression':
        return LogisticRegression(
            C=params.get('C', 1.0),
            max_iter=1000,
            random_state=SEED,
        )
    if algo == 'Decision Tree':
        return DecisionTreeClassifier(
            max_depth=params.get('max_depth', 4),
            min_samples_leaf=params.get('min_samples_leaf', 1),
            random_state=SEED,
        )
    if algo == 'Random Forest':
        return RandomForestClassifier(
            n_estimators=params.get('n_estimators', 100),
            max_depth=params.get('max_depth', 5),
            random_state=SEED,
        )
    if algo == 'SVM (RBF)':
        return SVC(
            C=params.get('C', 1.0),
            gamma=params.get('gamma', 'scale'),
            kernel='rbf',
            probability=True,
            random_state=SEED,
        )
    if algo == 'KNN':
        return KNeighborsClassifier(
            n_neighbors=params.get('n_neighbors', 7),
            weights=params.get('weights', 'uniform'),
        )
    if algo == 'Gradient Boosting':
        return GradientBoostingClassifier(
            n_estimators=params.get('n_estimators', 100),
            learning_rate=params.get('learning_rate', 0.1),
            max_depth=params.get('max_depth', 3),
            random_state=SEED,
        )
    if algo == 'XGBoost':
        # XGBClassifier is only bound when the optional import succeeded;
        # fail with an actionable message instead of a NameError.
        if not XGBOOST_AVAILABLE:
            raise ImportError(
                "XGBoost was requested but the 'xgboost' package is not installed"
            )
        return XGBClassifier(
            n_estimators=params.get('n_estimators', 100),
            learning_rate=params.get('learning_rate', 0.1),
            max_depth=params.get('max_depth', 3),
            random_state=SEED,
            eval_metric='logloss',
            verbosity=0,
        )
    if algo == 'Naive Bayes':
        return GaussianNB()
    raise ValueError(f"Unknown algorithm: {algo}")
def train_sklearn_model(model, X_train, X_test, y_train, y_test, cv_folds=5):
    """Fit ``model``, score it on the test split and cross-validate it.

    Args:
        model: Estimator exposing ``fit`` and ``predict``.
        X_train: Training features.
        X_test: Test features.
        y_train: Training labels.
        y_test: Test labels.
        cv_folds: ``cv`` argument forwarded to ``cross_val_score``; the
            cross-validation runs on the training split only.

    Returns:
        Dict with keys ``model``, ``accuracy``, ``y_pred``,
        ``confusion_matrix``, ``cv_mean``, ``cv_std`` and
        ``feature_importances``. The last is ``feature_importances_`` when
        the model has it, else the first row of ``coef_``, else ``None``.
    """
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)

    test_accuracy = accuracy_score(y_test, predictions)
    matrix = confusion_matrix(y_test, predictions)
    fold_scores = cross_val_score(
        model, X_train, y_train, cv=cv_folds, scoring='accuracy'
    )

    # Prefer tree-style importances; fall back to linear coefficients.
    if hasattr(model, 'feature_importances_'):
        importances = model.feature_importances_
    elif hasattr(model, 'coef_'):
        importances = model.coef_[0]
    else:
        importances = None

    results = {}
    results['model'] = model
    results['accuracy'] = test_accuracy
    results['y_pred'] = predictions
    results['confusion_matrix'] = matrix
    results['cv_mean'] = fold_scores.mean()
    results['cv_std'] = fold_scores.std()
    results['feature_importances'] = importances
    return results
def _evaluate_mlp(mlp, loader, criterion):
    """Score ``mlp`` on ``loader`` in eval mode without tracking gradients.

    Returns:
        ``(mean_batch_loss, accuracy, predictions)`` where ``predictions``
        is a flat list of 0/1 integers in loader order.
    """
    mlp.eval()
    loss_sum, correct, total = 0.0, 0, 0
    predictions = []
    with torch.no_grad():
        for X_batch, y_batch in loader:
            # reshape(-1) keeps one value per sample even for a one-row
            # batch, so the loss shapes match and extend() can iterate.
            output = mlp(X_batch).reshape(-1)
            loss_sum += criterion(output, y_batch).item()
            positive = output >= 0.5
            correct += (positive.float() == y_batch).sum().item()
            total += len(y_batch)
            predictions.extend(positive.cpu().numpy().astype(int))
    return loss_sum / len(loader), correct / total, predictions


def train_mlp(X_train_scaled, X_test_scaled, y_train, y_test,
              hidden_dims, epochs, lr, batch_size, dropout,
              progress_callback=None):
    """Train a TitanicMLP and return per-epoch history plus test metrics.

    Args:
        X_train_scaled: 2-D training features; ``shape[1]`` sets the input
            width of the network.
        X_test_scaled: Test features with the same number of columns.
        y_train: Training labels exposing ``.values`` (presumably a pandas
            Series -- confirm against the caller).
        y_test: Test labels exposing ``.values``; also passed unchanged to
            ``confusion_matrix``.
        hidden_dims: Hidden layer widths forwarded to ``TitanicMLP``.
        epochs: Number of passes over the training data.
        lr: Adam learning rate.
        batch_size: Batch size for both loaders.
        dropout: Dropout probability forwarded to ``TitanicMLP``.
        progress_callback: Optional callable invoked after every epoch as
            ``callback(epoch, epochs, train_loss, train_acc, test_loss,
            test_acc)``.

    Returns:
        Dict with keys ``train_losses``, ``test_losses``, ``train_accs``,
        ``test_accs`` (one entry per epoch), ``final_acc``,
        ``confusion_matrix`` and ``y_pred`` (flat list of 0/1 predictions
        for the test set).

    Raises:
        ValueError: If an epoch has no batch that can be trained on.
    """
    input_dim = X_train_scaled.shape[1]
    mlp = TitanicMLP(input_dim=input_dim, hidden_dims=hidden_dims, dropout=dropout).to(DEVICE)
    train_loader = make_dataloader(X_train_scaled, y_train.values, batch_size, shuffle=True)
    test_loader = make_dataloader(X_test_scaled, y_test.values, batch_size, shuffle=False)

    criterion = nn.BCELoss()
    optimizer = optim.Adam(mlp.parameters(), lr=lr, weight_decay=1e-4)
    # Halve the learning rate roughly three times over the run.
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=max(1, epochs // 3), gamma=0.5)

    # BatchNorm1d cannot compute batch statistics from one sample in train
    # mode, so a trailing one-row batch has to be skipped when it is present.
    has_batchnorm = any(isinstance(m, nn.BatchNorm1d) for m in mlp.modules())

    train_losses, test_losses = [], []
    train_accs, test_accs = [], []
    for epoch in range(1, epochs + 1):
        mlp.train()
        epoch_loss, correct, total, batches = 0.0, 0, 0, 0
        for X_batch, y_batch in train_loader:
            if has_batchnorm and len(y_batch) < 2:
                continue
            optimizer.zero_grad()
            output = mlp(X_batch).reshape(-1)
            loss = criterion(output, y_batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            pred = (output >= 0.5).float()
            correct += (pred == y_batch).sum().item()
            total += len(y_batch)
            batches += 1
        if batches == 0:
            raise ValueError(
                "No trainable batches: the training set is empty or "
                "contains a single sample"
            )
        # Average over the batches actually used (equals len(train_loader)
        # whenever nothing was skipped).
        train_losses.append(epoch_loss / batches)
        train_accs.append(correct / total)

        test_loss, test_acc, _ = _evaluate_mlp(mlp, test_loader, criterion)
        test_losses.append(test_loss)
        test_accs.append(test_acc)

        scheduler.step()
        if progress_callback:
            progress_callback(epoch, epochs, train_losses[-1], train_accs[-1],
                              test_losses[-1], test_accs[-1])

    # Final pass: yields the predictions for the confusion matrix and an
    # accuracy that is defined even when epochs == 0 (with epochs >= 1 it
    # matches the last per-epoch test accuracy, since weights are unchanged).
    _, final_acc, y_pred_list = _evaluate_mlp(mlp, test_loader, criterion)
    cm = confusion_matrix(y_test, y_pred_list)

    return {
        'train_losses': train_losses,
        'test_losses': test_losses,
        'train_accs': train_accs,
        'test_accs': test_accs,
        'final_acc': final_acc,
        'confusion_matrix': cm,
        'y_pred': y_pred_list,
    }