import warnings

import pandas as pd
import numpy as np
import xgboost as xgb
import optuna
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from transformers import BertTokenizer, BertForSequenceClassification, pipeline

warnings.filterwarnings('ignore')

# --- Initialize FinBERT pipeline ---
# Loading is best-effort: if the model cannot be loaded, `finbert` stays None
# and get_finbert_sentiment() falls back to its neutral default.
try:
    tokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert')
    model = BertForSequenceClassification.from_pretrained('ProsusAI/finbert')
    finbert = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)
except Exception as e:
    print(f"FinBERT Load Warning: {e}")
    finbert = None


def get_finbert_sentiment(text):
    """Return ``(label, score)`` for *text* using the FinBERT pipeline.

    The label and score are taken from the first result the pipeline returns.
    ``("Neutral", 0.0)`` is returned when the pipeline failed to load, when
    *text* is empty/falsy, or when inference raises.
    """
    if finbert is None or not text:
        return "Neutral", 0.0
    try:
        results = finbert(text)
        return results[0]['label'], results[0]['score']
    except Exception:
        # Deliberate best-effort: a failed inference must not abort the
        # caller. `Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return "Neutral", 0.0


# --- Technical, Momentum & Candlestick Indicators ---
def calculate_indicators(df):
    """Add indicator, lag, candlestick and target columns to *df*.

    *df* must contain the columns ``open``, ``high``, ``low``, ``close`` and
    ``volume``. The new columns are written into *df* itself (the argument is
    modified in place); the return value is ``df.dropna()``, i.e. a frame
    without the rows whose rolling/lagged values are still NaN.
    """
    # Standard Indicators
    df['SMA_7'] = df['close'].rolling(7).mean()
    df['SMA_25'] = df['close'].rolling(25).mean()
    # NOTE(review): despite the name, this is the 14-bar high-low range
    # (rolling max of high minus rolling min of low), not an average of
    # per-bar true ranges. Left unchanged because downstream features and
    # any trained models depend on the current values.
    df['ATR'] = df['high'].rolling(14).max() - df['low'].rolling(14).min()
    # The 1e-9 terms below only guard against division by zero.
    df['Regime_Strength'] = df['close'].rolling(20).std() / (df['ATR'] + 1e-9)

    # Cumulative VWAP over the whole frame, based on the typical price.
    v = df['volume'].values
    tp = (df['low'] + df['high'] + df['close']) / 3
    df['VWAP'] = (tp * v).cumsum() / (v.cumsum() + 1e-9)

    # RSI from exponentially weighted average gains/losses
    # (com=13 is equivalent to alpha = 1/14).
    delta = df['close'].diff()
    gain = (delta.where(delta > 0, 0)).fillna(0)
    loss = (-delta.where(delta < 0, 0)).fillna(0)
    avg_gain = gain.ewm(com=13, min_periods=14).mean()
    avg_loss = loss.ewm(com=13, min_periods=14).mean()
    df['RSI'] = 100 - (100 / (1 + (avg_gain / (avg_loss + 1e-9))))

    # Momentum & Lag Features
    df['close_lag_1'] = df['close'].shift(1)
    df['close_lag_2'] = df['close'].shift(2)
    df['open_lag_1'] = df['open'].shift(1)
    df['volume_change'] = df['volume'] / (df['volume'].shift(1) + 1e-9)
    df['RSI_lag_1'] = df['RSI'].shift(1)

    # --- Candlestick Pattern Recognition ---
    # Calculate anatomy of the candle
    df['body_size'] = abs(df['close'] - df['open'])
    df['upper_wick'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_wick'] = df[['open', 'close']].min(axis=1) - df['low']

    # 1. Hammer Pattern (lower wick > 2x body, upper wick < 0.2x body)
    df['is_hammer'] = (
        (df['lower_wick'] > (2 * df['body_size']))
        & (df['upper_wick'] < (0.2 * df['body_size']))
    ).astype(int)

    # 2. Bullish Engulfing (current green body completely covers the
    #    previous red body)
    df['prev_bearish'] = df['close_lag_1'] < df['open_lag_1']
    df['curr_bullish'] = df['close'] > df['open']
    df['is_bull_engulfing'] = (
        df['prev_bearish']
        & df['curr_bullish']
        & (df['close'] >= df['open_lag_1'])
        & (df['open'] <= df['close_lag_1'])
    ).astype(int)

    # Target: 1 when the next close is more than 0.2% above this close.
    # NOTE(review): for the last row `shift(-1)` is NaN, the comparison is
    # False and the label becomes 0; because the column is already int,
    # `dropna()` keeps that row. Its label is therefore not a real
    # observation -- confirm whether training should exclude it.
    df['target'] = (df['close'].shift(-1) > (df['close'] * 1.002)).astype(int)

    return df.dropna()


# --- Time-Series Safe AI Training ---
def train_model_shared(df, *, n_splits=3, n_trials=10):
    """Tune and train an XGBoost classifier on the indicator frame.

    Hyperparameters are searched with Optuna, scoring each trial by the mean
    accuracy over ``TimeSeriesSplit`` folds (training data always precedes
    validation data). The final model is then fitted on all rows of *df*.

    Args:
        df: Frame containing every column named in the feature list below
            plus a binary ``target`` column.
        n_splits: Number of walk-forward folds used during tuning.
        n_trials: Number of Optuna trials.

    Returns:
        ``(final_model, final_scaler, features)``: the fitted
        ``XGBClassifier``, the ``StandardScaler`` fitted on all rows, and the
        list of feature column names in the order the model expects.
    """
    features = [
        'open', 'high', 'low', 'close', 'volume',
        'SMA_7', 'SMA_25', 'RSI', 'ATR', 'VWAP', 'Regime_Strength',
        'close_lag_1', 'close_lag_2', 'volume_change', 'RSI_lag_1',
        'is_hammer', 'is_bull_engulfing',
    ]
    X = df[features]
    y = df['target']

    # Weight positives by the negative/positive ratio to offset imbalance;
    # fall back to 1.0 when there are no positive rows.
    neg_cases = (y == 0).sum()
    pos_cases = (y == 1).sum()
    scale_weight = float(neg_cases) / float(pos_cases) if pos_cases > 0 else 1.0

    # Settings that are not tuned. They are shared by every trial AND the
    # final model: `study.best_params` only contains the suggested values,
    # so these must be merged back in explicitly.
    fixed_params = {
        'verbosity': 0,
        'objective': 'binary:logistic',
        'eval_metric': 'logloss',
        'scale_pos_weight': scale_weight,
    }

    # Walk-forward cross validation
    tscv = TimeSeriesSplit(n_splits=n_splits)

    def objective(trial):
        """Return the mean walk-forward accuracy for one set of parameters."""
        params = {
            **fixed_params,
            'max_depth': trial.suggest_int('max_depth', 3, 8),
            'learning_rate': trial.suggest_float(
                'learning_rate', 0.01, 0.2, log=True
            ),
            'n_estimators': trial.suggest_int('n_estimators', 100, 300),
            'subsample': trial.suggest_float('subsample', 0.6, 0.9),
            'colsample_bytree': trial.suggest_float(
                'colsample_bytree', 0.6, 0.9
            ),
        }

        accuracies = []
        # Evaluate strictly chronologically.
        for train_idx, val_idx in tscv.split(X):
            X_tr, X_va = X.iloc[train_idx], X.iloc[val_idx]
            y_tr, y_va = y.iloc[train_idx], y.iloc[val_idx]

            # Fit the scaler on the training fold only so validation
            # statistics never leak into training.
            cv_scaler = StandardScaler()
            X_tr_scaled = cv_scaler.fit_transform(X_tr)
            X_va_scaled = cv_scaler.transform(X_va)

            clf = xgb.XGBClassifier(**params)
            clf.fit(X_tr_scaled, y_tr)
            preds = clf.predict(X_va_scaled)
            accuracies.append(np.mean(preds == y_va.to_numpy()))

        return np.mean(accuracies)

    # Run Hyperparameter Tuning
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials)

    # Train Final Model on all rows with the tuned + fixed parameters.
    final_model = xgb.XGBClassifier(**{**fixed_params, **study.best_params})
    final_scaler = StandardScaler()
    X_final_scaled = final_scaler.fit_transform(X)
    final_model.fit(X_final_scaled, y)

    return final_model, final_scaler, features