RNGPITAI / app.py
CipherPhantom's picture
Upload 16 files
324f5c2 verified
raw
history blame
45.2 kB
from flask import Flask, render_template, request, jsonify, session, redirect, url_for, copy_current_request_context
from flask_cors import CORS
import os
import sys
import json
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import re
import numpy as np
from typing import List, Dict, Tuple, Optional
from supabase import create_client, Client
from datetime import datetime
import uuid
import time
from functools import wraps, lru_cache
import threading
from queue import Queue
import gc
from dotenv import load_dotenv
# Load environment variables from .env file (must run before the os.environ
# lookups further down in this module).
load_dotenv()

# OpenAI SDK, used here as the client for the NVIDIA API endpoint.
# The import is optional: the app still starts without it and
# NVIDIA_AVAILABLE records whether generation can work.
try:
    from openai import OpenAI
    NVIDIA_AVAILABLE = True
except ImportError:
    NVIDIA_AVAILABLE = False
    print("[WARN] openai package not installed. Run: pip install openai")

# Presumably disables TensorFlow's oneDNN custom ops for any TF-backed
# dependency -- TODO confirm it is still needed.
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

# Fix Windows console encoding so non-ASCII log output does not crash print().
if sys.platform == 'win32':
    try:
        sys.stdout.reconfigure(encoding='utf-8')
    except (AttributeError, ValueError, OSError):
        # Best effort only: stdout may have been replaced by an object
        # without reconfigure(), or may not support re-encoding.
        pass

app = Flask(__name__)
CORS(app)

# Session-signing key. Prefer a value supplied through the environment; the
# literal below is only a development fallback and must not be used in
# production because anyone who knows it can forge session cookies.
app.secret_key = os.environ.get(
    'FLASK_SECRET_KEY',
    'rngai_secret_key_change_in_production_2025',
)
# ============================================
# CONFIGURATION
# ============================================
# All paths are resolved relative to the directory containing this file.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
TEXT_FILE_PATH = os.path.join(SCRIPT_DIR, "data", "link17.txt")
MODEL_CACHE_DIR = os.path.join(SCRIPT_DIR, "model_cache")

# ============================================
# NVIDIA MODEL CONFIGURATION (NVIDIA API)
# ============================================
DEFAULT_NVIDIA_MODEL = "abacusai/dracarys-llama-3.1-70b-instruct"

# Registry of selectable models, keyed by the model id sent to the API.
NVIDIA_MODEL_REGISTRY = {
    DEFAULT_NVIDIA_MODEL: {
        "name": "Dracarys Llama 3.1 70B",
        "model_id": "abacusai/dracarys-llama-3.1-70b-instruct",
        "context_length": 128000,
        "max_new_tokens": 4096,
        "description": "Dracarys Llama 3.1 - Powerful 70B Instruct Model",
    },
}

# Cost tracking (NVIDIA API pricing)
NVIDIA_COST_PER_1M_INPUT = 0.0   # Check NVIDIA pricing
NVIDIA_COST_PER_1M_OUTPUT = 0.0  # Check NVIDIA pricing

# ============================================
# NVIDIA CONFIGURATION
# ============================================
NVIDIA_API_KEY = os.getenv('NVIDIA_API_KEY')
NVIDIA_API_KEY_2 = os.getenv('NVIDIA_API_KEY_2')  # Backup key
NVIDIA_BASE_URL = os.getenv('NVIDIA_BASE_URL', 'https://integrate.api.nvidia.com/v1')

nvidia_client = None
current_nvidia_model = DEFAULT_NVIDIA_MODEL

# Build the OpenAI-compatible client eagerly when both the key and the SDK
# are present; otherwise leave nvidia_client as None.
if not NVIDIA_API_KEY:
    print("[WARN] NVIDIA_API_KEY not set in environment")
elif NVIDIA_AVAILABLE:
    nvidia_client = OpenAI(
        base_url=NVIDIA_BASE_URL,
        api_key=NVIDIA_API_KEY,
    )
    print(f"[OK] NVIDIA API client initialized (Key 1)")
    if NVIDIA_API_KEY_2:
        print(f"[OK] Backup NVIDIA key configured")
# Supabase configuration (both values come from the environment / .env).
SUPABASE_URL = os.getenv('SUPABASE_URL')
SUPABASE_KEY = os.getenv('SUPABASE_KEY')

if not SUPABASE_URL:
    print("[WARN] SUPABASE_URL is missing from environment variables!")
if not SUPABASE_KEY:
    print("[WARN] SUPABASE_KEY is missing from environment variables!")

# `supabase` stays None whenever the client cannot be created; every caller
# in this module checks for that before touching the database.
supabase: Optional[Client] = None
if SUPABASE_URL and SUPABASE_KEY:
    try:
        supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
        print("[OK] Supabase connected successfully")
    except Exception as e:
        print(f"[WARN] Could not connect to Supabase: {e}")
        supabase = None
else:
    print("[WARN] Skipping Supabase connection due to missing config")
# ============================================
# GLOBAL VARIABLES
# ============================================
# Module-wide state shared by the loaders and the request handlers below.
embedding_model = None  # set by load_embedding_model()
collection = None  # vector-store collection, set by initialize_rag()
chroma_client = None  # chromadb client, set by initialize_rag()
debug_mode = False  # flipped at runtime by the /api/debug/toggle route
# OPTIMIZATION: Cache for embeddings
EMBEDDING_CACHE_SIZE = 200  # maxsize of the lru_cache on get_cached_embedding()
# NOTE(review): embedding_cache is not referenced in the visible part of this
# file -- confirm whether it is still needed.
embedding_cache = {}
# ============================================
# EMBEDDING MODEL (Same as original)
# ============================================
def load_embedding_model():
    """Load the sentence-embedding model once and return it.

    The model is stored in the module-level ``embedding_model`` global, so
    repeated calls return the already-loaded instance. When torch reports
    an available CUDA device the model is moved to it; any failure while
    probing or moving is logged and the model is left where it was loaded.

    Returns:
        The loaded ``SentenceTransformer`` instance.
    """
    global embedding_model
    if embedding_model is not None:
        return embedding_model

    print("[INFO] Loading embedding model (optimized)...")
    embedding_model = SentenceTransformer('mixedbread-ai/mxbai-embed-large-v1')

    # Check if GPU is available for embeddings. torch is imported lazily so a
    # missing or broken torch install only affects device placement.
    on_gpu = False
    try:
        import torch
        if torch.cuda.is_available():
            embedding_model = embedding_model.to('cuda')
            on_gpu = True
    except Exception as exc:
        print(f"[WARN] Could not move embedding model to GPU: {exc}")

    if on_gpu:
        print("[PERF] Embedding model on GPU")
    else:
        print("[INFO] Embedding model on CPU")

    print("[OK] Embedding model loaded!")
    return embedding_model
@lru_cache(maxsize=EMBEDDING_CACHE_SIZE)
def _encode_query(text: str) -> np.ndarray:
    """Encode one query string with the loaded embedding model (memoized)."""
    instruction = "Represent this sentence for searching relevant passages: "
    return embedding_model.encode(
        [instruction + text],
        normalize_embeddings=True,
        show_progress_bar=False,
        batch_size=1,
        convert_to_numpy=True
    )[0]


def get_cached_embedding(text: str) -> Optional[np.ndarray]:
    """Return the query embedding for ``text``, cached per distinct string.

    Args:
        text: The raw user query.

    Returns:
        The normalized embedding vector, or ``None`` when the embedding
        model has not been loaded yet. The ``None`` result is deliberately
        not cached, so a query issued before the model is ready is encoded
        normally once the model becomes available.
    """
    if embedding_model is None:
        return None
    return _encode_query(text)


# Keep the lru_cache management API on the public name: other code in this
# module calls get_cached_embedding.cache_clear().
get_cached_embedding.cache_clear = _encode_query.cache_clear
get_cached_embedding.cache_info = _encode_query.cache_info
# ============================================
# AUTH HELPERS
# ============================================
def login_required(f):
    """Decorator that only runs the view for a logged-in admin session.

    Requests whose session lacks a truthy ``admin_logged_in`` flag are
    redirected to the ``admin_login`` endpoint instead.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get('admin_logged_in'):
            return f(*args, **kwargs)
        return redirect(url_for('admin_login'))

    return guarded_view
def get_or_create_session():
    """Return the chat session id stored in the Flask session.

    On first use a new UUID4 string is generated, stored under
    ``chat_session_id`` and, when Supabase is configured, recorded in the
    ``chat_sessions`` table. A failed insert is logged and otherwise ignored.
    """
    if 'chat_session_id' in session:
        return session['chat_session_id']

    new_session_id = str(uuid.uuid4())
    session['chat_session_id'] = new_session_id

    if supabase:
        try:
            session_row = {
                'session_id': new_session_id,
                'ip_address': request.remote_addr,
                # Store at most the first 500 characters of the user agent.
                'user_agent': request.user_agent.string[:500] if request.user_agent.string else None,
                'started_at': datetime.utcnow().isoformat()
            }
            supabase.table('chat_sessions').insert(session_row).execute()
        except Exception as e:
            print(f"[WARN] Could not create session: {e}")

    return new_session_id
def save_chat_to_supabase(session_id: str, user_question: str, ai_response: str,
                          response_time_ms: int, input_tokens: int = 0, output_tokens: int = 0):
    """Persist one question/answer exchange to the ``chat_messages`` table.

    The call is synchronous. It is best-effort: any database failure is
    logged and swallowed so that chat responses are never blocked by
    analytics storage.

    Args:
        session_id: The public chat session id (``chat_sessions.session_id``).
        user_question: The user's message.
        ai_response: The generated answer; only the first 5000 characters
            are stored.
        response_time_ms: End-to-end handling time in milliseconds.
        input_tokens: Estimated prompt tokens.
        output_tokens: Estimated completion tokens.
    """
    if not supabase:
        return
    try:
        result = supabase.table('chat_sessions').select('id').eq('session_id', session_id).execute()
        if not result.data:
            # Without a parent session row there is nothing to attach to.
            print(f"[WARN] Chat not saved: no chat_sessions row for session {session_id}")
            return

        # Derive the cost from the configured NVIDIA pricing so that the
        # stored value follows the constants instead of a hard-coded zero.
        total_cost = (
            input_tokens * NVIDIA_COST_PER_1M_INPUT
            + output_tokens * NVIDIA_COST_PER_1M_OUTPUT
        ) / 1_000_000

        session_uuid = result.data[0]['id']
        supabase.table('chat_messages').insert({
            'session_id': session_uuid,
            'user_question': user_question,
            'ai_response': ai_response[:5000],
            'response_time_ms': response_time_ms,
            'input_tokens': input_tokens,
            'output_tokens': output_tokens,
            'total_cost': total_cost,
            'created_at': datetime.utcnow().isoformat()
        }).execute()
        print(f"[OK] Chat saved (tokens: in={input_tokens}, out={output_tokens})")
    except Exception as e:
        print(f"[WARN] Failed to save chat: {e}")
# ============================================
# TEXT PROCESSING (Same as original)
# ============================================
@lru_cache(maxsize=1)
def load_and_process_data(file_path: str) -> List[Dict]:
    """Read the knowledge file at ``file_path`` and split it into chunks.

    The result for the most recent path is memoized. A missing file or any
    error while reading/chunking is logged and yields an empty list.
    """
    try:
        print(f"[INFO] Loading data from: {file_path}")
        if os.path.exists(file_path):
            with open(file_path, 'r', encoding='utf-8') as source:
                raw_text = source.read()
            print(f"[OK] Loaded {len(raw_text):,} characters")
            pieces = create_chunks(raw_text)
            print(f"[OK] Created {len(pieces)} chunks")
            return pieces
        print(f"[ERROR] File not found: {file_path}")
    except Exception as e:
        print(f"[ERROR] Error loading file: {e}")
    return []
def clean_text(text: str) -> str:
    """Normalize whitespace and strip characters outside the allowed set.

    Tabs become spaces, every run of characters that is not a word
    character, whitespace or one of the permitted punctuation marks becomes
    a single space, runs of spaces collapse to one, three or more
    consecutive newlines collapse to two, and the result is stripped.
    """
    # Applied in order; each entry is (pattern, replacement).
    rewrite_rules = (
        (r'[^\w\s.,!?;:()\-\'\"@/&|\[\]#]+', ' '),
        (r' +', ' '),
        (r'\n{3,}', '\n\n'),
    )
    cleaned = text.replace('\t', ' ')
    for pattern, replacement in rewrite_rules:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
def create_chunks(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[Dict]:
    """Split ``text`` into line-aligned chunks measured in words.

    Non-empty, stripped lines are accumulated until adding the next line
    would push the word count past ``chunk_size``; the accumulated lines are
    then emitted as a chunk. The next chunk is seeded with as many trailing
    lines of the previous one as fit within ``overlap`` words.

    Returns:
        A list of ``{'text': str, 'id': int}`` dicts with consecutive ids
        starting at 0.
    """
    chunks: List[Dict] = []
    pending_lines: List[str] = []
    pending_words = 0

    # Sections are separated by blank lines or by a line of 4+ dashes; only
    # the non-empty stripped lines inside them matter for chunking.
    stripped_lines = (
        candidate
        for section in re.split(r'\n\s*\n|\n-{4,}\n', text)
        for candidate in map(str.strip, section.split('\n'))
        if candidate
    )

    for line in stripped_lines:
        word_count = len(line.split())

        if pending_lines and pending_words + word_count > chunk_size:
            # Chunk ids are consecutive, so the next id is the current count.
            chunks.append({'text': '\n'.join(pending_lines), 'id': len(chunks)})

            # Walk backwards collecting whole lines until the overlap budget
            # would be exceeded, then restore their original order.
            carried: List[str] = []
            carried_words = 0
            for earlier in reversed(pending_lines):
                earlier_words = len(earlier.split())
                if carried_words + earlier_words > overlap:
                    break
                carried.append(earlier)
                carried_words += earlier_words
            carried.reverse()

            pending_lines = carried
            pending_words = carried_words

        pending_lines.append(line)
        pending_words += word_count

    if pending_lines:
        chunks.append({'text': '\n'.join(pending_lines), 'id': len(chunks)})

    return chunks
# ============================================
# RAG INITIALIZATION (Same as original)
# ============================================
def initialize_rag():
    """Build the in-memory retrieval index from the knowledge file.

    Loads the embedding model, creates a non-persistent chromadb client and
    the ``rngpit_knowledge`` collection, embeds every chunk of
    ``TEXT_FILE_PATH`` and adds the vectors to the collection. Results are
    published through the ``collection``, ``embedding_model`` and
    ``chroma_client`` module globals. Returns early (leaving the collection
    empty) when no chunks could be loaded.
    """
    global collection, embedding_model, chroma_client

    banner = "=" * 60
    print("\n" + banner)
    print("INITIALIZING RAG SYSTEM")
    print(banner)

    embedding_model = load_embedding_model()

    print("[INFO] Initializing vector database...")
    client_settings = Settings(
        anonymized_telemetry=False,
        is_persistent=False
    )
    chroma_client = chromadb.Client(client_settings)
    collection = chroma_client.get_or_create_collection(
        name="rngpit_knowledge",
        metadata={"description": "RNG Patel Institute Knowledge Base"}
    )

    chunks = load_and_process_data(TEXT_FILE_PATH)
    if not chunks:
        print("[WARN] No data loaded!")
        return

    print("[INFO] Generating embeddings (batched)...")
    documents = [entry['text'] for entry in chunks]
    # One batched pass over all chunk texts.
    vectors = embedding_model.encode(
        documents,
        show_progress_bar=True,
        batch_size=64,  # Balanced batch size
        normalize_embeddings=True,
        convert_to_numpy=True
    )

    print("[INFO] Adding to vector database...")
    document_ids = [f"chunk_{position}" for position, _ in enumerate(chunks)]
    collection.add(
        embeddings=vectors.tolist(),
        documents=documents,
        ids=document_ids
    )

    print("\n" + banner)
    print(f"RAG READY! ({len(chunks)} chunks)")
    print(banner + "\n")
def retrieve_context(query: str, top_k: int = 5) -> List[str]:
    """Return up to ``top_k`` stored documents most similar to ``query``.

    An empty list is returned when the index or the embedding model is not
    initialized, when no embedding could be produced, when the store returns
    no documents, or when the lookup raises (the error is logged).
    """
    if collection is None or embedding_model is None:
        return []

    try:
        # Embeddings are memoized per query string.
        query_vector = get_cached_embedding(query)
        if query_vector is None:
            return []

        hits = collection.query(
            query_embeddings=[query_vector.tolist()],
            n_results=top_k
        )
        if not hits:
            return []
        documents = hits['documents']
        # Only one query was sent, so the first result list is the answer.
        return documents[0] if documents else []
    except Exception as e:
        print(f"[ERROR] Retrieval error: {e}")
        return []
# ============================================
# NVIDIA RESPONSE GENERATION (OpenAI SDK)
# ============================================
def generate_response_nvidia(query: str, context_chunks: List[str]) -> Dict:
    """Generate an answer for ``query`` from retrieved context via the NVIDIA API.

    The request is sent through the OpenAI-compatible client using the
    currently selected model. If the first attempt fails with a rate-limit
    or quota error and a distinct backup key is configured, the module
    switches to the backup key (for this and later calls) and retries once.

    Args:
        query: The user's question.
        context_chunks: Retrieved passages; at most the first 10 non-blank
            ones are used and the joined context is capped at 40000
            characters.

    Returns:
        A dict with ``text``, ``input_tokens``, ``output_tokens`` and
        ``model_used``. Token counts are word-count based estimates, not
        values reported by the API. On any failure ``model_used`` is
        ``'error'`` (or ``'none'`` when no context was supplied) and
        ``text`` carries a user-facing message; this function does not
        raise.
    """
    global NVIDIA_API_KEY, nvidia_client, current_nvidia_model
    if not context_chunks:
        return {
            'text': "I don't have specific information about that. Could you ask me something else about RNG Patel Institute?",
            'input_tokens': 0,
            'output_tokens': 0,
            'model_used': 'none'
        }
    if not NVIDIA_AVAILABLE:
        return {
            'text': "OpenAI SDK not installed. Run: pip install openai",
            'input_tokens': 0,
            'output_tokens': 0,
            'model_used': 'error'
        }
    if not NVIDIA_API_KEY:
        return {
            'text': "NVIDIA API key not configured. Please set NVIDIA_API_KEY environment variable.",
            'input_tokens': 0,
            'output_tokens': 0,
            'model_used': 'error'
        }
    try:
        # Initialize NVIDIA client if needed
        if nvidia_client is None:
            nvidia_client = OpenAI(
                base_url=NVIDIA_BASE_URL,
                api_key=NVIDIA_API_KEY
            )
            print("[OK] Initialized NVIDIA client")
        # Build context
        context_parts = [chunk.strip() for chunk in context_chunks[:10] if chunk.strip()]
        context = "\n\n".join(context_parts)[:40000]
        # System prompt
        system_prompt = """You are a friendly and knowledgeable student ambassador for RNGPIT (R.N.G. Patel Institute of Technology). Your goal is to help students and visitors by answering their questions warmly and directly.
**ABOUT THIS AI (Important):**
When asked about "who made this AI", "who created you", "who built you", or similar questions about the creators/developers, respond with:
"I was built by **Team InnoCrew**, a talented group of students from RNGPIT:
- **Shis Tushar Maheta** (Lead AI Engineer) - B.Tech Computer Science, Class of 2025
- **Zuveriya Meman** -B.Voc Software development, Class of 2025
- **Karan Chaudhary** - B.Voc Software Development, Class of 2023
- **Sem Surti** - B.Voc Software Development, Class of 2023
- **Shreyansh Vasava** - B.Voc Software Development, Class of 2023
Team InnoCrew developed this AI assistant to help students and visitors learn more about RNG Patel Institute of Technology!"
EXACT FACULTY DETIALS:
#### **1. Information Technology (IT) Department Faculty**
| Name | Designation | Education | Exp. | Email |
| --- | --- | --- | --- | --- |
| **Prof. Vivek C. Joshi** | I/C HOD & Asst. Prof | M.Tech (CSE), Ph.D. (Pursuing) | 13+ Yrs | vcjoshi@rngpit.ac.in |
| **Prof. Hardi A. Patel** | Assistant Professor | M.E. (CSE) | 6+ Yrs | hapatel@rngpit.ac.in |
| **Prof. Krina N. Desai** | Assistant Professor | M.E. (CE) | 3+ Yrs | kndesai@rngpit.ac.in |
| **Prof. Nishtha H. Tandel** | Assistant Professor | M.Tech (IT), GSET Qualified | 4+ Yrs | nhtandel@rngpit.ac.in |
| **Prof. Bhavisha S. Parmar** | Assistant Professor | M.E. (CE), Ph.D. (Pursuing) | 12+ Yrs | bsparmar@rngpit.ac.in |
| **Prof. Foram C. Shukla** | Assistant Professor | M.E. (CE) | 1 Yr | fcshukla@rngpit.ac.in |
| **Prof. Purvaj P. Vaidya** | Assistant Professor | M.Tech (Media Tech - Germany) | 1 Yr | ppvaidya@rngpit.ac.in |
| **Prof. Monali R. Gandhi** | Assistant Professor | M.E. (CE) | 11+ Yrs | mrgandhi@rngpit.ac.in |
| **Prof. Ekta R. Bhatia** | Assistant Professor | M.Tech (CSE) | 4 Yrs | erbhatia@rngpit.ac.in |
| **Prof. Pratik M. Gohil** | Assistant Professor | M.Tech (CE) | 5+ Yrs | pmgohil@rngpit.ac.in |
| **Prof. Zeel R. Bhatt** | Assistant Professor | M.Tech (CE) | 3 Mos | zrbhatt@rngpit.ac.in |
| **Prof. Pooja D. Patel** | Assistant Professor | M.E. (CE) | - | pdpatel@rngpit.ac.in |
| **Prof. Ayushi H. Gandhi** | Assistant Professor | M.E. (CE) | 4+ Yrs | ahgandhi@rngpit.ac.in |
| **Prof. Rinisha S. Patel** | Assistant Professor | M.E. (CE) | - | rspatel@rngpit.ac.in |
#### **2. Mechanical Engineering Department Faculty**
| Name | Designation | Education | Email |
| --- | --- | --- | --- |
| **Dr. Kanti B. Rathod** | HOD & Assoc. Professor | Ph.D. (Mechanical) | kbrathod@rngpit.ac.in |
| **Mr. Hardik B. Nayak** | Assistant Professor | Ph.D. (Pursuing) | hbnayak@rngpit.ac.in |
| **Mr. Niravsinh B. Rathod** | Assistant Professor | Ph.D. (Pursuing) | nbrathod@rngpit.ac.in |
| **Mr. Gaurang K. Champaneri** | Assistant Professor | M.Tech (CIM) | gkchampaneri@rngpit.ac.in |
| **Mr. Chirag K. Balar** | Assistant Professor | M.Tech (Mechanical) | ckbalar@rngpit.ac.in |
| **Mr. Dharmin M. Patel** | Assistant Professor | Ph.D. (Pursuing) | dmpatel@rngpit.ac.in |
| **Mr. Nevilkumar M. Patel** | Assistant Professor | M.E. (Machine Design) | nmpatel@rngpit.ac.in |
| **Mr. Yatin H. Chauhan** | Assistant Professor | M.Tech (ME) | yhchauhan@rngpit.ac.in |
| **Mr. Vikramkumar A. Mistry** | Assistant Professor | M.E. (IC Engine & Automobile) | vamistry@rngpit.ac.in |
| **Mr. Sushant K. Merai** | Assistant Professor | Ph.D. (Pursuing) | skmerai@rngpit.ac.in |
| **Mr. Sapan H. Joshi** | Assistant Professor | M.Tech (Thermal System) | shjoshi@rngpit.ac.in |
| **Dr. Ankursinh P. Solanki** | Assistant Professor | Ph.D., M.E. (Thermal) | apsolanki@rngpit.ac.in |
| **Mr. Nikhil M. Pandya** | Assistant Professor | M.E. (Production) | nmpandya@rngpit.ac.in |
| **Mr. Mehul P. Patel** | Assistant Professor | M.E. (Production) | mppatel@rngpit.ac.in |
| **Mr. Vikesh B. Patel** | Assistant Professor | M.Tech (CAD/CAM) | patelvikesh1988@gmail.com |
| **Mr. Shobhit Y. Varshney** | Assistant Professor | M.Tech (Thermal System) | syvarshney@rngpit.ac.in |
#### **3. Civil Engineering Department Faculty**
| Name | Designation | Qualification | Email | More Info |
| ---------------------------- | --------------------------- | ---------------------------------- | --------------------------------------------------------- | --------- |
| Dr. Kamalsinh M. Padhiar | HOD and Associate Professor | β€” | [kmpadhiar@rngpit.ac.in](mailto:kmpadhiar@rngpit.ac.in) | β€” |
| Mr. Gaurav P. Barot | Assistant Professor | M.Tech (Structural Engineering) | [gpbarot@rngpit.ac.in](mailto:gpbarot@rngpit.ac.in) | β€” |
| Mr. Mohammed Ahmed Qureshi | Assistant Professor | M.Tech (Structure), Ph.D. Pursuing | [maqureshi@rngpit.ac.in](mailto:maqureshi@rngpit.ac.in) | β€” |
| Mr. Nirav P. Desai | Assistant Professor | M.E (Transportation) | [npdesai@rngpit.ac.in](mailto:npdesai@rngpit.ac.in) | β€” |
| Mr. Sharukh M. Marfani | Assistant Professor | M.E (CE) | [smmarfani@rngpit.ac.in](mailto:smmarfani@rngpit.ac.in) | β€” |
| Mr. Hilay N. Prajapati | Assistant Professor | M.E (CED) | [hnprajapati@ngpit.ac.in](mailto:hnprajapati@ngpit.ac.in) | β€” |
| Mr. Ajay B. Patel | Assistant Professor | M.E. (Civil Engg.) | [ajaybpatel@rngpit.ac.in](mailto:ajaybpatel@rngpit.ac.in) | β€” |
| Ms. Srushti U. Joshi | Assistant Professor | M.E (Civil Engg.) | [sujoshi@rngpit.ac.in](mailto:sujoshi@rngpit.ac.in) | β€” |
| Mr. Priyank H. Patel | Assistant Professor | β€” | [phpatel@rngpit.ac.in](mailto:phpatel@rngpit.ac.in) | β€” |
| Mr. Atish P. More | Assistant Professor | Masters in Environmental Engg. | [apmore@rngpit.ac.in](mailto:apmore@rngpit.ac.in) | β€” |
| Ms. Hetvi J. Kania | Assistant Professor | M.E. in Environmental Engg. | [hjkania@rngpit.ac.in](mailto:hjkania@rngpit.ac.in) | β€” |
| Mr. Pritesh R. Bhandari | Assistant Professor | Diploma Civil | [prbhandari@rngpit.ac.in](mailto:prbhandari@rngpit.ac.in) | β€” |
| Mr. Viral Jagdishbhai Rathod | Assistant Professor | M.E. (Production) | [vjrathod@rngpit.ac.in](mailto:vjrathod@rngpit.ac.in) | β€” |
#### **GIVE THE DETIALS OF THE DEPARTMENT THAT IS ONLY ASKED DONT PROVIDE ANY OTHER RANDOM DATA**
**STRICT FORMATTING RULES:**
1. **ALWAYS USE TABLES FOR DATA**: If you are listing more than 3 items (like faculty names, committee members, fees, courses, placements), you **MUST** use a Markdown table. Do not use bullet points for long lists.
*Example Table:*
| Name | Position | Location |
|------|----------|----------|
| John Doe | President | Surat |
2. **DIRECT ANSWER**: Answer the question immediately. Do not start with "Okay", "Sure", or "Based on the context".
3. **BEAUTIFUL FORMATTING**: Use **bold** for importance, `code` for emails/numbers, and clear paragraphs.
4. **NO CITATIONS**: Do not say "according to the document". Speak as if you know the facts yourself.
5. **GRACEFUL FALLBACK**: If you don't know, suggest contacting info@rngpit.ac.in."""
        user_prompt = f"""Context Information:
{context}
User Question: {query}
Answer:"""

        def make_api_call(client_to_use, model_to_use):
            """Issue one streaming chat-completion request."""
            return client_to_use.chat.completions.create(
                model=model_to_use,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.5,
                top_p=0.8,
                max_tokens=4096,
                stream=True
            )

        # Attempt 1: Primary Key
        try:
            completion = make_api_call(nvidia_client, current_nvidia_model)
        except Exception as e:
            error_msg = str(e)
            # Check for Rate Limit (429) or Quota issues
            if ("429" in error_msg or "quota" in error_msg.lower()) and NVIDIA_API_KEY_2 and NVIDIA_API_KEY != NVIDIA_API_KEY_2:
                print(f"[WARN] detailed error: {error_msg}")
                print(f"[WARN] Primary API key rate limited using fallback key...")
                # Switch to backup key
                NVIDIA_API_KEY = NVIDIA_API_KEY_2
                nvidia_client = OpenAI(base_url=NVIDIA_BASE_URL, api_key=NVIDIA_API_KEY)
                # Attempt 2: Backup Key
                completion = make_api_call(nvidia_client, current_nvidia_model)
            else:
                # Not a rate limit issue or no backup key: let the outer
                # handler classify it. Bare raise keeps the traceback intact.
                raise

        # Collect streamed response. Chunks with an empty `choices` list are
        # skipped instead of raising IndexError.
        pieces = []
        for chunk in completion:
            if not chunk.choices:
                continue
            delta_text = chunk.choices[0].delta.content
            if delta_text is not None:
                pieces.append(delta_text)
        response_text = "".join(pieces)

        # Estimate token counts (roughly 1.3 tokens per whitespace-separated word)
        input_tokens = int(len((system_prompt + user_prompt).split()) * 1.3)
        output_tokens = int(len(response_text.split()) * 1.3)
        if not response_text:
            response_text = "I couldn't generate a response. Please try again."
        print(f"[DEBUG] NVIDIA generated ~{int(output_tokens)} tokens using {current_nvidia_model}")
        return {
            'text': response_text.strip(),
            'input_tokens': int(input_tokens),
            'output_tokens': int(output_tokens),
            'model_used': current_nvidia_model
        }
    except Exception as e:
        error_msg = str(e)
        print(f"[ERROR] NVIDIA API error: {error_msg}")
        # All comparisons are done on the lower-cased message; the needle
        # must therefore be lower-case as well ("api key", not "API key").
        lowered = error_msg.lower()
        if "api key" in lowered or "authentication" in lowered or "unauthorized" in lowered:
            return {'text': "Invalid NVIDIA API key. Please check your API key.", 'input_tokens': 0, 'output_tokens': 0, 'model_used': 'error'}
        elif "quota" in lowered or "limit" in lowered or "rate" in lowered:
            return {'text': "API rate limit reached. Please wait a moment and try again.", 'input_tokens': 0, 'output_tokens': 0, 'model_used': 'error'}
        else:
            return {'text': f"Error processing request: {error_msg}", 'input_tokens': 0, 'output_tokens': 0, 'model_used': 'error'}
# ============================================
# MODEL INFO
# ============================================
def get_model_info():
    """Describe the selectable NVIDIA models and the current selection.

    Returns:
        A dict with ``available_models`` (public fields of every registry
        entry), ``current_model``, ``is_loaded`` (whether an API key is set)
        and ``provider``.
    """
    # Only these registry fields are exposed to API consumers.
    public_fields = ("name", "description", "context_length", "max_new_tokens")

    catalogue = {}
    for model_id, config in NVIDIA_MODEL_REGISTRY.items():
        catalogue[model_id] = {field: config[field] for field in public_fields}

    return {
        "available_models": catalogue,
        "current_model": current_nvidia_model,
        "is_loaded": bool(NVIDIA_API_KEY),
        "provider": "nvidia"
    }
# ============================================
# ROUTES
# ============================================
@app.route('/')
def home():
    """Serve the chat front-end page."""
    landing_template = 'index.html'
    return render_template(landing_template)
@app.route('/chat', methods=['POST'])
def chat():
    """Answer one chat message using retrieval plus NVIDIA generation.

    Expects a JSON body of the form ``{"message": "<text>"}``. A missing,
    non-JSON, non-object body or a blank/non-string message yields HTTP 400.
    On success the JSON response contains ``response`` and
    ``response_time_ms``, plus a ``debug`` object when debug mode is on.
    Unexpected errors are logged with a traceback and reported as HTTP 500.
    """
    try:
        start_time = time.time()

        # silent=True returns None instead of raising for a bad or missing
        # JSON body, so malformed requests become a clean 400 below.
        data = request.get_json(silent=True)
        raw_message = data.get('message', '') if isinstance(data, dict) else ''
        user_message = raw_message.strip() if isinstance(raw_message, str) else ''
        if not user_message:
            return jsonify({'error': 'No message provided'}), 400

        print(f"\n[CHAT] Query: {user_message}")
        # Get session ID within request context
        current_session_id = get_or_create_session()

        # Defaults for the no-context path; overwritten after generation.
        model_used = current_nvidia_model
        input_tokens = 0
        output_tokens = 0

        # Fast retrieval
        context_chunks = retrieve_context(user_message, top_k=5)
        if not context_chunks:
            response_text = "I don't have information about that. Try asking about courses, admissions, fees, placements, or facilities at RNGPIT."
        else:
            print(f"[INFO] Found {len(context_chunks)} relevant chunks")
            print(f"[INFO] Using NVIDIA API ({current_nvidia_model}) for generation...")
            result = generate_response_nvidia(user_message, context_chunks)
            response_text = result['text']
            input_tokens = result['input_tokens']
            output_tokens = result['output_tokens']
            model_used = result.get('model_used', current_nvidia_model)

        response_time_ms = int((time.time() - start_time) * 1000)

        # Save to database (synchronous; failures are logged inside)
        save_chat_to_supabase(current_session_id, user_message, response_text, response_time_ms, input_tokens, output_tokens)
        print(f"[OK] Response ({len(response_text)} chars) in {response_time_ms}ms [Provider: NVIDIA, model: {model_used}]")

        response_data = {
            'response': response_text,
            'response_time_ms': response_time_ms
        }
        if debug_mode:
            response_data['debug'] = {
                'enabled': True,
                'chunks_used': len(context_chunks),
                'model': model_used,
                'provider': 'nvidia',
                'input_tokens': input_tokens,
                'output_tokens': output_tokens
            }
        return jsonify(response_data)
    except Exception as e:
        print(f"[ERROR] Chat error: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
    """Report service status: index size, model selection and configuration flags."""
    return jsonify({
        'status': 'healthy',
        'chunks_loaded': collection.count() if collection else 0,
        'current_model': current_nvidia_model,
        'model_loaded': nvidia_client is not None,
        'provider': 'nvidia',
        'debug_mode': debug_mode,
        'nvidia_configured': bool(NVIDIA_API_KEY)
    })
# ============================================
# MODEL MANAGEMENT API
# ============================================
@app.route('/api/models', methods=['GET'])
def api_get_models():
    """Return the model catalogue and current selection as JSON."""
    model_overview = get_model_info()
    return jsonify(model_overview)
@app.route('/api/models/switch', methods=['POST'])
@login_required
def api_switch_model():
    """Select the NVIDIA model used for generation (admin only).

    Reads ``model_id`` from the JSON body, defaulting to the default model.
    Unknown ids are rejected with HTTP 400.
    """
    global current_nvidia_model

    payload = request.json or {}
    requested_model = payload.get('model_id', DEFAULT_NVIDIA_MODEL)

    if requested_model in NVIDIA_MODEL_REGISTRY:
        current_nvidia_model = requested_model
        print(f"[INFO] Switched to NVIDIA model: {requested_model}")
        return jsonify({
            'success': True,
            'message': f"Switched to {NVIDIA_MODEL_REGISTRY[requested_model]['name']}",
            'model_info': get_model_info()
        })

    return jsonify({
        'success': False,
        'error': f"Model '{requested_model}' not found in NVIDIA registry"
    }), 400
# NOTE(review): unlike the other mutating /api routes in this file, this
# endpoint is not wrapped in @login_required, so any client can trigger a
# full re-embedding. Confirm whether that is intentional.
@app.route('/api/embeddings/regenerate', methods=['POST'])
def api_regenerate_embeddings():
    """Drop the vector collection and rebuild it from the knowledge file.

    Both caches (query embeddings and loaded chunks) are cleared first so
    the rebuild re-reads the file. Responds with the new chunk count, or
    with HTTP 500 and the error message if the rebuild fails.
    """
    global collection
    try:
        if collection is not None:
            chroma_client.delete_collection("rngpit_knowledge")
            # Forget the deleted collection immediately: if the rebuild below
            # fails, other handlers must not keep querying a stale handle.
            collection = None
        # Clear cache
        get_cached_embedding.cache_clear()
        load_and_process_data.cache_clear()
        initialize_rag()
        return jsonify({
            'success': True,
            'message': 'Embeddings regenerated',
            'chunks_loaded': collection.count() if collection else 0
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/debug/toggle', methods=['POST'])
def api_toggle_debug():
    """Flip the module-wide debug flag and report its new value."""
    global debug_mode
    new_state = not debug_mode
    debug_mode = new_state
    return jsonify({'success': True, 'debug_mode': new_state})
@app.route('/api/debug/status', methods=['GET'])
def api_debug_status():
    """Report whether debug mode is currently enabled."""
    status = {'debug_mode': debug_mode}
    return jsonify(status)
@app.route('/api/nvidia-key', methods=['POST'])
@login_required
def api_set_nvidia_key():
    """Replace the NVIDIA API key at runtime (admin only).

    Expects ``{"api_key": "<key>"}``. Responds with HTTP 400 when the key is
    missing or not a string, and with HTTP 503 when the OpenAI SDK is not
    installed (no client can be built in that case). On success the client
    is rebuilt with the new key and a masked preview is returned.
    """
    global NVIDIA_API_KEY, nvidia_client

    data = request.get_json(silent=True)
    raw_key = data.get('api_key', '') if isinstance(data, dict) else ''
    api_key = raw_key.strip() if isinstance(raw_key, str) else ''
    if not api_key:
        return jsonify({'success': False, 'error': 'API key required'}), 400

    if not NVIDIA_AVAILABLE:
        # The OpenAI name only exists when the optional import succeeded.
        return jsonify({
            'success': False,
            'error': 'OpenAI SDK not installed. Run: pip install openai'
        }), 503

    NVIDIA_API_KEY = api_key
    nvidia_client = OpenAI(base_url=NVIDIA_BASE_URL, api_key=NVIDIA_API_KEY)  # Reset client with new key
    print("[INFO] NVIDIA API key updated")

    # Never echo the full key back; short keys are fully masked.
    key_preview = api_key[:8] + '...' + api_key[-4:] if len(api_key) > 12 else '***'
    return jsonify({
        'success': True,
        'message': 'NVIDIA API key configured',
        'key_preview': key_preview
    })
@app.route('/api/nvidia-key/status', methods=['GET'])
@login_required
def api_nvidia_key_status():
    """Report whether an NVIDIA API key is set, with a masked preview (admin only)."""
    if not NVIDIA_API_KEY:
        return jsonify({'configured': False})

    # Show the first 8 and last 4 characters only for keys longer than 12.
    if len(NVIDIA_API_KEY) > 12:
        masked_key = NVIDIA_API_KEY[:8] + '...' + NVIDIA_API_KEY[-4:]
    else:
        masked_key = '***'

    return jsonify({
        'configured': True,
        'key_preview': masked_key
    })
# ============================================
# ADMIN ROUTES
# ============================================
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Render the admin login page (GET) or authenticate an admin (POST).

    POST expects ``{"username": ..., "password": ...}`` as JSON. Responses:
    400 when either field is missing, 500 when the database is unavailable
    or the lookup fails, 401 for wrong credentials, and on success a JSON
    body with the dashboard redirect target after the session is marked as
    logged in.
    """
    import hmac  # local import: only needed for the constant-time comparison

    if request.method == 'GET':
        if session.get('admin_logged_in'):
            return redirect(url_for('admin_dashboard'))
        return render_template('login.html')

    # silent=True: a missing or malformed JSON body becomes a 400 below
    # instead of an unhandled exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username', '')
    password = data.get('password', '')
    username = username.strip() if isinstance(username, str) else ''
    password = password.strip() if isinstance(password, str) else ''
    if not username or not password:
        return jsonify({'success': False, 'error': 'Username and password required'}), 400
    if not supabase:
        return jsonify({'success': False, 'error': 'Database not available'}), 500
    try:
        result = supabase.table('admin_users').select('*').eq('username', username).execute()
        if result.data and len(result.data) > 0:
            user = result.data[0]
            stored_secret = user['password_hash']
            # SECURITY NOTE(review): the stored `password_hash` value is
            # compared directly with the submitted password, i.e. it behaves
            # like a plaintext password. Migrate to a salted hash check.
            # compare_digest at least avoids a timing side channel.
            if isinstance(stored_secret, str) and hmac.compare_digest(
                stored_secret.encode('utf-8'), password.encode('utf-8')
            ):
                session['admin_logged_in'] = True
                session['admin_username'] = username
                session['admin_id'] = user['id']
                # Update last login
                supabase.table('admin_users').update({
                    'last_login': datetime.utcnow().isoformat()
                }).eq('id', user['id']).execute()
                return jsonify({'success': True, 'redirect': '/admin/dashboard'})
        return jsonify({'success': False, 'error': 'Invalid credentials'}), 401
    except Exception as e:
        print(f"[ERROR] Login: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/admin/logout')
def admin_logout():
    """Clear the admin keys from the session and go back to the login page."""
    for admin_key in ('admin_logged_in', 'admin_username', 'admin_id'):
        session.pop(admin_key, None)
    return redirect(url_for('admin_login'))
@app.route('/admin/dashboard')
@login_required
def admin_dashboard():
    """Render the admin dashboard for the logged-in admin."""
    display_name = session.get('admin_username', 'Admin')
    return render_template('admin.html', username=display_name)
# ============================================
# ANALYTICS API (OPTIMIZED)
# ============================================
@app.route('/api/analytics/stats')
@login_required
def get_analytics_stats():
    """Return aggregate usage statistics for the admin dashboard.

    Totals and averages are computed over all rows of ``chat_messages``
    returned by the query; "today" figures cover rows whose ``created_at``
    is on or after the current UTC date. Responds with HTTP 500 when the
    database is unavailable or a query fails.
    """
    if not supabase:
        return jsonify({'error': 'Database not available'}), 500

    def _summarize(rows):
        """Return (message count, input tokens, output tokens, cost) for rows."""
        rows = rows or []
        return (
            len(rows),
            sum(row.get('input_tokens', 0) or 0 for row in rows),
            sum(row.get('output_tokens', 0) or 0 for row in rows),
            sum(row.get('total_cost', 0) or 0 for row in rows),
        )

    try:
        columns = 'id, input_tokens, output_tokens, total_cost'

        total_result = supabase.table('chat_messages').select(columns).execute()
        total_questions, total_input_tokens, total_output_tokens, total_cost = _summarize(total_result.data)

        sessions_result = supabase.table('chat_sessions').select('id').execute()
        total_sessions = len(sessions_result.data) if sessions_result.data else 0

        today = datetime.utcnow().date().isoformat()
        today_result = supabase.table('chat_messages').select(columns).gte('created_at', today).execute()
        today_questions, today_input_tokens, today_output_tokens, today_cost = _summarize(today_result.data)

        # Averages are per stored message; guard against an empty table.
        avg_input_tokens = total_input_tokens / total_questions if total_questions > 0 else 0
        avg_output_tokens = total_output_tokens / total_questions if total_questions > 0 else 0
        avg_cost_per_message = total_cost / total_questions if total_questions > 0 else 0

        return jsonify({
            'total_questions': total_questions,
            'total_sessions': total_sessions,
            'today_questions': today_questions,
            'total_input_tokens': total_input_tokens,
            'total_output_tokens': total_output_tokens,
            'total_tokens': total_input_tokens + total_output_tokens,
            'total_cost': round(total_cost, 6),
            'today_input_tokens': today_input_tokens,
            'today_output_tokens': today_output_tokens,
            'today_tokens': today_input_tokens + today_output_tokens,
            'today_cost': round(today_cost, 6),
            'avg_input_tokens': round(avg_input_tokens, 1),
            'avg_output_tokens': round(avg_output_tokens, 1),
            'avg_cost_per_message': round(avg_cost_per_message, 6),
            # Pricing comes from the NVIDIA constants defined in this module;
            # the GROQ_* names used previously are not defined in the visible
            # part of this file.
            'pricing': {
                'input_per_1m': NVIDIA_COST_PER_1M_INPUT,
                'output_per_1m': NVIDIA_COST_PER_1M_OUTPUT
            },
            'provider': 'nvidia'
        })
    except Exception as e:
        print(f"[ERROR] Stats: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/analytics/top-questions')
@login_required
def get_top_questions():
    """Return the most frequently asked questions (admin only).

    Questions are grouped case-insensitively after trimming whitespace. Each
    group reports its count, the latest ``created_at`` seen and the wording
    of that latest occurrence. The ``limit`` query parameter (default 10)
    caps the number of groups returned, highest count first.
    """
    if not supabase:
        return jsonify({'error': 'Database not available'}), 500
    try:
        limit = request.args.get('limit', 10, type=int)
        result = supabase.table('chat_messages').select('user_question, created_at').execute()
        if not result.data:
            return jsonify({'questions': []})

        groups = {}
        for row in result.data:
            wording = row['user_question']
            group_key = wording.lower().strip()
            asked_at = row['created_at']

            group = groups.get(group_key)
            if group is None:
                groups[group_key] = {
                    'count': 1,
                    'last_asked': asked_at,
                    'original': wording
                }
                continue

            group['count'] += 1
            # Keep the wording of the most recent occurrence.
            if asked_at > group['last_asked']:
                group['last_asked'] = asked_at
                group['original'] = wording

        ranked = [
            {'question': group['original'], 'count': group['count'], 'last_asked': group['last_asked']}
            for group in groups.values()
        ]
        ranked = sorted(ranked, key=lambda entry: entry['count'], reverse=True)
        return jsonify({'questions': ranked[:limit]})
    except Exception as e:
        print(f"[ERROR] Top questions: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/analytics/all-questions')
@login_required
def get_all_questions():
    """Return one page of stored chat messages, newest first (admin only).

    Query parameters:
        page: 1-based page number (default 1; values below 1 are treated as 1).
        per_page: page size (default 20; values below 1 are treated as 1).
        search: optional case-insensitive substring filter on the question.

    The response carries the rows plus ``total``, ``page``, ``per_page`` and
    ``total_pages``. Responds with HTTP 500 when the database is unavailable
    or a query fails.
    """
    if not supabase:
        return jsonify({'error': 'Database not available'}), 500
    try:
        # Clamp to at least 1: per_page=0 would divide by zero below and
        # non-positive values would produce a negative row range.
        page = max(request.args.get('page', 1, type=int), 1)
        per_page = max(request.args.get('per_page', 20, type=int), 1)
        search = request.args.get('search', '').strip()
        offset = (page - 1) * per_page

        query = supabase.table('chat_messages').select('id, user_question, ai_response, created_at, response_time_ms')
        if search:
            query = query.ilike('user_question', f'%{search}%')
        # range() bounds are inclusive on both ends.
        result = query.order('created_at', desc=True).range(offset, offset + per_page - 1).execute()

        # Separate exact-count query with the same filter, for pagination.
        count_query = supabase.table('chat_messages').select('id', count='exact')
        if search:
            count_query = count_query.ilike('user_question', f'%{search}%')
        count_result = count_query.execute()
        total = count_result.count if count_result.count else 0

        return jsonify({
            'questions': result.data,
            'total': total,
            'page': page,
            'per_page': per_page,
            'total_pages': (total + per_page - 1) // per_page
        })
    except Exception as e:
        print(f"[ERROR] All questions: {e}")
        return jsonify({'error': str(e)}), 500
def _parse_iso_timestamp(value):
    """Parse an ISO-8601 timestamp string into a ``datetime``.

    A trailing ``Z`` is rewritten to ``+00:00``. If parsing still fails, the
    fractional-seconds part is normalized to exactly six digits and parsing
    is retried, because ``datetime.fromisoformat`` before Python 3.11 only
    accepts fractions of 3 or 6 digits.

    Raises:
        ValueError: If the text is not a parseable timestamp.
        AttributeError: If ``value`` is not a string (e.g. ``None``).
    """
    text = value.replace('Z', '+00:00')
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        parts = re.match(r'^(.*?\.)(\d+)(.*)$', text)
        if parts is None:
            raise
        prefix, fraction, suffix = parts.groups()
        return datetime.fromisoformat(f"{prefix}{fraction[:6].ljust(6, '0')}{suffix}")


def _add_usage_sample(buckets, key, label, input_tokens, output_tokens, cost):
    """Add one message's token/cost figures to the bucket stored under ``key``.

    The bucket is created on first use; ``label`` is the name of the field
    that stores the bucket key (``'date'`` or ``'hour'``).
    """
    if key not in buckets:
        buckets[key] = {
            label: key,
            'input_tokens': 0,
            'output_tokens': 0,
            'total_tokens': 0,
            'cost': 0,
            'messages': 0
        }
    bucket = buckets[key]
    bucket['input_tokens'] += input_tokens
    bucket['output_tokens'] += output_tokens
    bucket['total_tokens'] += input_tokens + output_tokens
    bucket['cost'] += cost
    bucket['messages'] += 1


@app.route('/api/analytics/token-usage')
@login_required
def get_token_usage():
    """Return token and cost usage aggregated per day and per hour as JSON.

    Returns:
        ``daily_usage`` (the 30 most recent day buckets), ``hourly_usage``
        (the 24 most recent hour buckets) and ``projections`` (daily averages
        over the returned day buckets, plus those averages multiplied by 30).
        On failure, ``{'error': ...}`` with status 500.
    """
    if not supabase:
        return jsonify({'error': 'Database not available'}), 500
    try:
        result = supabase.table('chat_messages').select(
            'created_at, input_tokens, output_tokens, total_cost'
        ).order('created_at', desc=False).execute()
        if not result.data:
            return jsonify({'daily_usage': [], 'hourly_usage': []})

        daily_data = {}
        hourly_data = {}
        skipped = 0
        for msg in result.data:
            try:
                created_at = _parse_iso_timestamp(msg['created_at'])
                # "or 0" turns NULL columns into zero.
                input_tokens = msg.get('input_tokens', 0) or 0
                output_tokens = msg.get('output_tokens', 0) or 0
                cost = msg.get('total_cost', 0) or 0
                _add_usage_sample(
                    daily_data, created_at.strftime('%Y-%m-%d'), 'date',
                    input_tokens, output_tokens, cost,
                )
                _add_usage_sample(
                    hourly_data, created_at.strftime('%Y-%m-%d %H:00'), 'hour',
                    input_tokens, output_tokens, cost,
                )
            except (KeyError, AttributeError, TypeError, ValueError):
                # Best effort: a malformed row is left out of the report, but
                # only data errors are swallowed (not e.g. KeyboardInterrupt).
                skipped += 1
        if skipped:
            print(f"[WARN] Token usage: skipped {skipped} malformed row(s)")

        # Keys are zero-padded "YYYY-MM-DD[ HH:00]" strings, so lexical order
        # is chronological order.
        daily_usage = sorted(daily_data.values(), key=lambda x: x['date'])[-30:]
        hourly_usage = sorted(hourly_data.values(), key=lambda x: x['hour'])[-24:]

        if daily_usage:
            days = len(daily_usage)
            avg_daily_tokens = sum(d['total_tokens'] for d in daily_usage) / days
            avg_daily_cost = sum(d['cost'] for d in daily_usage) / days
            avg_daily_messages = sum(d['messages'] for d in daily_usage) / days
        else:
            avg_daily_tokens = avg_daily_cost = avg_daily_messages = 0

        return jsonify({
            'daily_usage': daily_usage,
            'hourly_usage': hourly_usage,
            'projections': {
                'avg_daily_tokens': round(avg_daily_tokens, 0),
                'avg_daily_cost': round(avg_daily_cost, 4),
                'avg_daily_messages': round(avg_daily_messages, 1),
                'projected_monthly_tokens': round(avg_daily_tokens * 30, 0),
                'projected_monthly_cost': round(avg_daily_cost * 30, 2),
                'projected_monthly_messages': round(avg_daily_messages * 30, 0)
            }
        })
    except Exception as e:
        print(f"[ERROR] Token usage: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/check')
def check_admin():
    """Report the admin login flag and username stored in the session."""
    logged_in = session.get('admin_logged_in', False)
    admin_name = session.get('admin_username')  # None when no username is stored
    payload = {
        'is_admin': logged_in,
        'username': admin_name,
    }
    return jsonify(payload)
# ============================================
# MAIN
# ============================================
if __name__ == '__main__':
    # Script entry point: print a startup banner, build the RAG pipeline,
    # then start the Flask server.
    #
    # The status glyphs were present as mojibake (UTF-8 bytes decoded with a
    # single-byte code page). They are restored here as \N{...} escapes so the
    # source stays pure ASCII and cannot be re-garbled by an encoding mismatch.
    _OK = "\N{CHECK MARK}"
    _FAIL = "\N{BALLOT X}"
    _RULE = "=" * 60

    _key_status = (
        f"{_OK} Configured" if NVIDIA_API_KEY
        else f"{_FAIL} Not set (use NVIDIA_API_KEY env var)"
    )

    print("\n" + _RULE)
    print("RNGPIT AI ASSISTANT - NVIDIA API (Llama 3.1) VERSION")
    print(_RULE)
    print(f"Data: {TEXT_FILE_PATH}")
    print(f"Available Models: {list(NVIDIA_MODEL_REGISTRY.keys())}")
    print(f"Default Model: {DEFAULT_NVIDIA_MODEL}")
    print(f"Supabase: {_OK if supabase else _FAIL}")
    print(f"NVIDIA API Key: {_key_status}")
    print(_RULE)

    initialize_rag()

    print("\n\N{ROCKET} Server starting...")
    print("\N{MOBILE PHONE} Chatbot: http://localhost:5000")
    print("\N{CLOSED LOCK WITH KEY} Admin: http://localhost:5000/admin/login")
    print("\n\N{HIGH VOLTAGE SIGN} NVIDIA API Features:")
    print(f" {_OK} Dracarys Llama 3.1 70B via NVIDIA")
    print(f" {_OK} Available models:")
    for model_id, config in NVIDIA_MODEL_REGISTRY.items():
        print(f" - {model_id}: {config['description']}")
    print(f" {_OK} OpenAI-compatible API")
    print(f" {_OK} Powerful 70B parameter model")
    print(f" {_OK} Same RAG pipeline as original")
    print(f" {_OK} Same admin panel and analytics")
    print(f" {_OK} No local GPU required!")
    print(_RULE + "\n")

    # host='0.0.0.0' listens on every network interface, not just localhost.
    app.run(debug=False, port=5000, host='0.0.0.0', threaded=True)