"""Gradio chat agent that routes a question to one of several tools.

A question is answered by exactly one of: a Yahoo Finance price lookup, a
whitelisted arithmetic evaluator, retrieval over uploaded PDFs (Faiss +
sentence-transformers), or a DuckDuckGo search summarised by a Groq-hosted
LLM.  The reply is also rendered to speech with gTTS.
"""
import os
import re
import logging
import tempfile
from pathlib import Path
from typing import List, Tuple, Any
import math
import ast
import json

import numpy as np
import PyPDF2
from sentence_transformers import SentenceTransformer
import faiss
import gradio as gr
from gtts import gTTS
import requests

try:
    import sympy as sp
    SYMPY_OK = True
except Exception:
    SYMPY_OK = False

try:
    from groq import Groq
    GROQ_OK = True
except ImportError:
    GROQ_OK = False
    print("Groq library not installed!")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The key is read from the environment only.  A credential must never be
# embedded in source as a fallback value: anyone with read access to the
# file could use it.
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")

groq_client = None
if GROQ_OK and GROQ_API_KEY:
    try:
        groq_client = Groq(api_key=GROQ_API_KEY)
        print("Groq client initialized successfully!")
    except Exception as e:
        groq_client = None
        print(f"Groq initialization error: {e}")
elif GROQ_OK:
    print("GROQ_API_KEY is not set; LLM features are disabled.")

# --------------------------------------------------------------------------
# Shared patterns (compiled once instead of on every call)
# --------------------------------------------------------------------------

# An "x" counts as a multiplication sign only when it sits between two
# operands, so identifiers such as "exp" or "max" are left intact.
_TIMES_RE = re.compile(r'(?<=[\d\)\.])(\s*)[xX](\s*)(?=[\d\(\.])')

_NUMBER = r'\d+(?:\.\d+)?'
# A chain of numbers joined by operators embedded in free text, e.g.
# "what is 2.5 * 4 + 1".
_EMBEDDED_MATH_RE = re.compile(
    rf'{_NUMBER}(?:\s*[\*xX×\+\-\/÷\^]\s*{_NUMBER})+'
)
# The entire input consists of arithmetic characters.
_PURE_MATH_RE = re.compile(r'^[\d\s\+\-\*\/\^\(\)\.xX×÷]+$')
# "<number> plus <number>" style phrases.
_WORD_MATH_RE = re.compile(
    rf'({_NUMBER})\s*(plus|minus|times|multiplied by|divided by)\s*({_NUMBER})',
    re.IGNORECASE,
)
_WORD_OPERATORS = {
    'plus': '+',
    'minus': '-',
    'times': '*',
    'multiplied by': '*',
    'divided by': '/',
}

# Characters accepted in a ticker symbol before it is placed in a URL.
_SYMBOL_RE = re.compile(r'[A-Z0-9.\-=^]{1,15}')

_EMOJI_RE = re.compile(
    "["
    "\U0001F600-\U0001F64F"   # emoticons
    "\U0001F300-\U0001F5FF"   # symbols & pictographs
    "\U0001F680-\U0001F6FF"   # transport & map symbols
    "\U0001F1E0-\U0001F1FF"   # regional indicators (flags)
    "\U0001F900-\U0001FAFF"   # supplemental symbols (e.g. 🤖, 🧮)
    "\U0001F200-\U0001F251"   # enclosed ideographic supplement
    "\U00002600-\U000026FF"   # miscellaneous symbols
    "\U00002702-\U000027B0"   # dingbats
    "\U000024C2"              # circled M
    "\U0000FE0F"              # variation selector-16
    "\U0000200D"              # zero-width joiner
    "]+",
    flags=re.UNICODE,
)


def _normalize_operators(expr: str, caret_as_power: bool = True) -> str:
    """Rewrite human-friendly operator glyphs into Python operators.

    Args:
        expr: Raw arithmetic expression.
        caret_as_power: When true, ``^`` is rewritten to ``**``.

    Returns:
        The expression with ``×``/``÷`` replaced and an ``x`` between two
        operands turned into ``*``.
    """
    if caret_as_power:
        expr = expr.replace('^', '**')
    expr = expr.replace('×', '*').replace('÷', '/')
    return _TIMES_RE.sub(r'\1*\2', expr)


def _contains_word(text: str, words) -> bool:
    """Return True if any of *words* occurs in *text* as a whole word.

    An optional trailing ``s`` is accepted so plurals still match.
    """
    alternatives = '|'.join(re.escape(w) for w in words)
    return re.search(rf'\b(?:{alternatives})s?\b', text) is not None


class SafeEval(ast.NodeVisitor):
    """Evaluate an arithmetic AST using a whitelist of nodes and names.

    Only numeric constants, unary ``+``/``-``, the binary operators handled
    by ``_binop`` and calls to names in ``ALLOWED_NAMES`` are accepted; any
    other node raises ``ValueError``.
    """

    # Public names of the math module plus a few builtins.
    ALLOWED_NAMES = {n: getattr(math, n) for n in dir(math) if not n.startswith("__")}
    ALLOWED_NAMES.update({"abs": abs, "round": round, "pi": math.pi, "e": math.e})

    # Upper bound on the exponent of ``**`` so an input such as 9**9**9
    # cannot tie up the process building an enormous integer.
    MAX_EXPONENT = 10_000

    def visit(self, node):
        """Return the numeric value of *node*, or raise ``ValueError``."""
        if isinstance(node, ast.Expression):
            return self.visit(node.body)
        if isinstance(node, ast.BinOp):
            left = self.visit(node.left)
            right = self.visit(node.right)
            return self._binop(node.op, left, right)
        if isinstance(node, ast.UnaryOp):
            operand = self.visit(node.operand)
            return self._unaryop(node.op, operand)
        # ast.Num is deprecated; numeric literals are ast.Constant nodes.
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.Call):
            func = node.func
            if isinstance(func, ast.Name) and func.id in self.ALLOWED_NAMES:
                args = [self.visit(a) for a in node.args]
                return self.ALLOWED_NAMES[func.id](*args)
        if isinstance(node, ast.Name):
            if node.id in self.ALLOWED_NAMES:
                return self.ALLOWED_NAMES[node.id]
            raise ValueError(f"Use of name '{node.id}' is not allowed")
        raise ValueError(f"Unsupported expression: {ast.dump(node)}")

    def _binop(self, op, a, b):
        """Apply the binary operator *op* to the evaluated operands."""
        if isinstance(op, ast.Add):
            return a + b
        if isinstance(op, ast.Sub):
            return a - b
        if isinstance(op, ast.Mult):
            return a * b
        if isinstance(op, ast.Div):
            return a / b
        if isinstance(op, ast.Mod):
            return a % b
        if isinstance(op, ast.Pow):
            if isinstance(b, (int, float)) and abs(b) > self.MAX_EXPONENT:
                raise ValueError("Exponent too large")
            return a ** b
        if isinstance(op, ast.FloorDiv):
            return a // b
        raise ValueError("Unsupported binary operator")

    def _unaryop(self, op, a):
        """Apply the unary operator *op* to the evaluated operand."""
        if isinstance(op, ast.UAdd):
            return +a
        if isinstance(op, ast.USub):
            return -a
        raise ValueError("Unsupported unary operator")


def safe_calc_eval(expr: str):
    """Evaluate an arithmetic expression without using ``eval``.

    Args:
        expr: Expression text; ``^``, ``×``, ``÷`` and an ``x`` between
            operands are accepted as operators.

    Returns:
        ``(True, result_text)`` on success, otherwise
        ``(False, "Calc error: ...")``.  The function does not raise.
    """
    expr = _normalize_operators(expr.strip())
    # SECURITY: this input comes from the chat box.  sympy.sympify() is
    # eval-based and must not be given unsanitised text, so only the
    # AST-whitelist evaluator is used here.
    try:
        node = ast.parse(expr, mode='eval')
        val = SafeEval().visit(node)
        if not isinstance(val, (int, float)):
            # e.g. a bare function name such as "sin"
            raise ValueError("Expression did not evaluate to a number")
        return True, str(val)
    except Exception as e:
        return False, f"Calc error: {e}"


def get_stock_price(symbol: str) -> dict:
    """Fetch the latest quote for *symbol* from Yahoo Finance's chart API.

    Args:
        symbol: Ticker symbol; it is upper-cased and stripped.

    Returns:
        On success a dict with ``success=True`` and the keys ``symbol``,
        ``name``, ``price``, ``change``, ``change_percent``,
        ``previous_close``, ``currency`` and ``exchange``.  On failure
        ``{"success": False, "error": message}``.  The function does not
        raise.
    """
    symbol = symbol.upper().strip()
    # The symbol is interpolated into the request path, so refuse anything
    # that is not a plain ticker.
    if not _SYMBOL_RE.fullmatch(symbol):
        return {"success": False, "error": f"No data for {symbol}"}
    try:
        url = f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol}"
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        resp = requests.get(url, headers=headers, timeout=10)
        resp.raise_for_status()
        data = resp.json()
        if "chart" in data and "result" in data["chart"] and data["chart"]["result"]:
            result = data["chart"]["result"][0]
            meta = result.get("meta", {})
            # "or 0" also covers keys that are present with a null value.
            current_price = meta.get("regularMarketPrice") or 0
            previous_close = meta.get("previousClose") or 0
            currency = meta.get("currency", "USD")
            exchange = meta.get("exchangeName", "Unknown")
            name = meta.get("shortName", symbol)
            change = current_price - previous_close if previous_close else 0
            change_percent = (change / previous_close * 100) if previous_close else 0
            return {
                "success": True,
                "symbol": symbol,
                "name": name,
                "price": round(current_price, 2),
                "change": round(change, 2),
                "change_percent": round(change_percent, 2),
                "previous_close": round(previous_close, 2),
                "currency": currency,
                "exchange": exchange,
            }
        return {"success": False, "error": f"No data for {symbol}"}
    except Exception as e:
        logger.error(f"Stock API error: {e}")
        return {"success": False, "error": str(e)}


# Company names recognised in free text, mapped to their ticker.
_KNOWN_STOCKS = {
    "CARECLOUD": "MTBC", "CARE CLOUD": "MTBC", "MTBC": "MTBC",
    "APPLE": "AAPL", "GOOGLE": "GOOGL", "ALPHABET": "GOOGL",
    "MICROSOFT": "MSFT", "AMAZON": "AMZN", "TESLA": "TSLA",
    "META": "META", "FACEBOOK": "META", "NVIDIA": "NVDA",
    "NETFLIX": "NFLX", "INTEL": "INTC", "AMD": "AMD",
    "PAYPAL": "PYPL", "DISNEY": "DIS", "WALMART": "WMT",
    "NIKE": "NKE", "BOEING": "BA", "UBER": "UBER",
    "ZOOM": "ZM", "SPOTIFY": "SPOT",
}

# Short English words that must not be mistaken for a ticker symbol.
_NOT_TICKERS = {
    'THE', 'AND', 'FOR', 'ARE', 'BUT', 'NOT', 'YOU', 'ALL',
    'STOCK', 'PRICE', 'CURRENT', 'TELL', 'ABOUT', 'WHAT', 'HOW',
    'IS', 'OF', 'TO', 'IN', 'ON', 'AT', 'IT', 'AS', 'BY', 'OR', 'AN',
    'BE', 'DO', 'IF', 'ME', 'MY', 'SO', 'UP', 'US', 'WE', 'NO', 'GO',
    'AM', 'HAS', 'HAD', 'WAS', 'CAN', 'GET', 'GIVE', 'SHOW', 'FIND',
    'WITH', 'FROM', 'THAT', 'THIS', 'DOES', 'WILL', 'MUCH', 'MANY',
    'NOW', 'TODAY', 'SHARE', 'TRADE', 'VALUE', 'WORTH', 'NYSE',
    'WHICH', 'WHEN', 'WHERE', 'WHY', 'WHO', 'ITS', 'ANY', 'SOME',
}


def extract_stock_symbol(question: str) -> str:
    """Guess the ticker symbol a question refers to.

    Known company names are matched first (as whole words).  Otherwise the
    first 2-5 letter token that is not a common English word is returned,
    preferring tokens the user typed in capitals.

    Returns:
        The ticker symbol, or ``""`` when nothing plausible is found.
    """
    question_upper = question.upper()

    for name, symbol in _KNOWN_STOCKS.items():
        # Whole-word match so "METADATA" does not resolve to META.
        if re.search(rf'\b{re.escape(name)}\b', question_upper):
            logger.info(f"Found stock: {name} -> {symbol}")
            return symbol

    # First pass: tokens already written in capitals; second pass: any token.
    for text in (question, question_upper):
        for word in re.findall(r'\b[A-Z]{2,5}\b', text):
            if word not in _NOT_TICKERS:
                return word
    return ""


def _first_text_node(fragment: str) -> str:
    """Return the first run of text found between a ``>`` and a ``<``."""
    match = re.search(r'>([^<]+)<', fragment)
    return match.group(1) if match else ""


def web_search(query: str, max_results: int = 5) -> List[dict]:
    """Search DuckDuckGo's HTML endpoint and scrape titles and snippets.

    Args:
        query: Search text.
        max_results: Maximum number of results to return.

    Returns:
        A list of ``{"title": ..., "snippet": ...}`` dicts.  An empty list
        is returned on any error; the function does not raise.
    """
    try:
        resp = requests.get(
            "https://html.duckduckgo.com/html/",
            params={"q": query},
            timeout=10,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        resp.raise_for_status()

        results = []
        # Each result anchor carries the marker "result__a"; the text up to
        # the next marker belongs to that result.
        parts = resp.text.split('result__a')
        for part in parts[1:max_results + 1]:
            title = _first_text_node(part)
            snippet = ""
            if 'result__snippet' in part:
                snippet = _first_text_node(part.split('result__snippet', 1)[1])
            if title or snippet:
                results.append({"title": title.strip(), "snippet": snippet.strip()})
        return results
    except Exception as e:
        logger.error(f"Web search error: {e}")
        return []


class AgenticRAGAgent:
    """Chat agent that picks one tool per question and builds a reply.

    State held on the instance: the text chunks of the uploaded PDFs, the
    Faiss inner-product index over their normalised embeddings, and the
    tunable settings written by ``update_settings``.
    """

    LLM_MODEL = "llama-3.3-70b-versatile"

    def __init__(self):
        self.chunks = []
        self.index = None
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
        self.temperature = 0.3
        self.max_tokens = 1000
        self.chunk_size = 512
        self.chunk_overlap = 50
        self.retrieval_k = 10
        self.enable_web_search = True
        self.enable_calculations = True
        self.enable_fact_checking = True
        self.enable_analysis = True
        self.enable_stock_lookup = True
        self.relevance_threshold = 0.35
        self.pdf_loaded = False
        print("AgenticRAGAgent initialized")

    # ------------------------------------------------------------------
    # Voice
    # ------------------------------------------------------------------

    def remove_emojis(self, text: str) -> str:
        """Return *text* with emoji and pictograph characters removed."""
        return _EMOJI_RE.sub(r'', text)

    def clean_for_voice(self, text: str) -> str:
        """Strip emojis and Markdown punctuation and collapse whitespace."""
        text = self.remove_emojis(text)
        text = re.sub(r'[\*_`#\[\]\|]', '', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def generate_voice(self, text: str):
        """Render the first 500 cleaned characters of *text* to an MP3.

        Returns:
            Path of the MP3 file, or ``None`` when there is nothing to say
            or synthesis fails.
        """
        if not text or not text.strip():
            return None
        clean = self.clean_for_voice(text)
        if len(clean) < 5:
            return None
        try:
            tts = gTTS(text=clean[:500], lang='en', slow=False)
            # Only the name is needed; close the handle before gTTS writes
            # so no descriptor is leaked.  The file itself is kept
            # (delete=False) because the caller receives its path.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp:
                path = tmp.name
            tts.save(path)
            return path
        except Exception as e:
            logger.error(f"Voice error: {e}")
            return None

    # ------------------------------------------------------------------
    # PDF ingestion
    # ------------------------------------------------------------------

    def upload_pdfs(self, files):
        """Copy the uploaded PDFs to ``sample_data/`` and index their text.

        Each readable PDF is split into overlapping character windows of
        ``chunk_size`` with ``chunk_overlap`` overlap; the chunks are
        embedded, L2-normalised and stored in a fresh inner-product index
        that replaces any previous one.

        Args:
            files: Iterable of file paths or file-like objects with a
                ``name`` attribute.

        Returns:
            A status message for display.
        """
        if not files:
            return "No files selected."

        folder = Path("sample_data")
        folder.mkdir(exist_ok=True)
        # A non-positive step would make range() raise or never advance.
        step = max(1, self.chunk_size - self.chunk_overlap)
        all_chunks = []
        count = 0

        for file in files:
            filename = str(file.name) if hasattr(file, 'name') else str(file)
            if not filename.lower().endswith('.pdf'):
                continue
            dest = folder / Path(filename).name

            try:
                if hasattr(file, 'read'):
                    content = file.read()
                else:
                    with open(filename, 'rb') as src:
                        content = src.read()
                with open(dest, "wb") as f:
                    f.write(content)
            except Exception as e:
                logger.error(f"Could not store {filename}: {e}")
                continue

            try:
                with open(dest, 'rb') as f:
                    reader = PyPDF2.PdfReader(f)
                    pages = [page.extract_text() for page in reader.pages]
            except Exception as e:
                logger.error(f"Could not read {dest}: {e}")
                continue

            text = "".join(t + " " for t in pages if t)
            if text.strip():
                chunks = [text[i:i + self.chunk_size] for i in range(0, len(text), step)]
                all_chunks.extend([{"content": str(c.strip())} for c in chunks if c.strip()])
                count += 1

        if not all_chunks:
            return "No readable text in PDFs."

        vecs = self.embedder.encode([c["content"] for c in all_chunks], show_progress_bar=True)
        norms = np.linalg.norm(vecs, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # avoid NaN rows for zero vectors
        vecs = vecs / norms
        dim = vecs.shape[1]
        index = faiss.IndexFlatIP(dim)
        index.add(vecs.astype('float32'))

        self.index = index
        self.chunks = all_chunks
        self.pdf_loaded = True
        return f"Loaded {count} PDF(s) with {len(all_chunks)} chunks!"

    # ------------------------------------------------------------------
    # Question classification
    # ------------------------------------------------------------------

    def is_stock_question(self, question: str) -> Tuple[bool, str]:
        """Return ``(True, symbol)`` if the question asks about a stock."""
        question_lower = question.lower()
        stock_keywords = ['stock', 'share', 'price', 'trading', 'ticker', 'nasdaq', 'nyse', 'market']
        known_companies = ['carecloud', 'mtbc', 'apple', 'google', 'microsoft', 'amazon',
                           'tesla', 'meta', 'nvidia', 'netflix', 'intel', 'amd']

        # Whole-word matching: "shared" or "metadata" must not trigger.
        has_keyword = _contains_word(question_lower, stock_keywords)
        has_company = _contains_word(question_lower, known_companies)

        if has_keyword or has_company:
            symbol = extract_stock_symbol(question)
            if symbol:
                logger.info(f"Stock question detected: {symbol}")
                return True, symbol
        return False, ""

    def is_calculation_question(self, question: str) -> Tuple[bool, str]:
        """Return ``(True, expression)`` if the question contains arithmetic.

        Checked in order: the whole input is an expression; an operator
        chain is embedded in the text; a calculation keyword accompanies a
        phrase such as "5 plus 3".
        """
        stripped = question.strip()

        # Whole-input check first so "(1+2)*3" is not cut down to "1+2".
        if _PURE_MATH_RE.match(stripped):
            return True, _normalize_operators(stripped, caret_as_power=False)

        math_match = _EMBEDDED_MATH_RE.search(question)
        if math_match:
            expr = _normalize_operators(math_match.group(0), caret_as_power=False)
            logger.info(f"Math expression found: {expr}")
            return True, expr

        question_lower = question.lower()
        calc_keywords = ['calculate', 'compute', 'solve', 'calcuate', 'calc']
        if any(kw in question_lower for kw in calc_keywords):
            # Only an explicit "<a> <operator word> <b>" phrase is accepted;
            # guessing an operator for two bare numbers gives wrong answers.
            word_match = _WORD_MATH_RE.search(question)
            if word_match:
                left, op_word, right = word_match.groups()
                return True, f"{left}{_WORD_OPERATORS[op_word.lower()]}{right}"
        return False, ""

    def is_pdf_related_question(self, question: str) -> bool:
        """Return True if the question mentions a document or a summary."""
        pdf_keywords = ['pdf', 'document', 'file', 'attached', 'uploaded', 'summarize',
                        'summary', 'in the document', 'from the document', 'the paper']
        question_lower = question.lower()
        return any(kw in question_lower for kw in pdf_keywords)

    def is_general_knowledge_question(self, question: str) -> bool:
        """Return True for explanatory questions that are not stock/maths."""
        question_lower = question.lower()
        if 'stock' in question_lower or 'price' in question_lower:
            return False
        if re.search(r'\d+\s*[\*\+\-\/]\s*\d+', question):
            return False
        general_triggers = ['what is ai', 'how does', 'explain', 'tell me about',
                            'history of', 'future of', 'definition']
        return any(t in question_lower for t in general_triggers)

    def check_context_relevance(self, question: str, context: str,
                                scores: np.ndarray) -> Tuple[bool, float]:
        """Decide whether retrieved context is relevant to the question.

        The context is relevant when the best similarity score reaches
        ``relevance_threshold`` and at least 30% of the question's content
        words occur in the context.

        Returns:
            ``(is_relevant, best_score)``.
        """
        if not context:
            return False, 0.0
        max_score = float(np.max(scores)) if len(scores) > 0 else 0.0

        stop_words = {'what', 'is', 'the', 'a', 'how', 'tell', 'me', 'about', 'stock', 'price'}
        q_terms = [w.lower() for w in re.findall(r'\b\w+\b', question)
                   if w.lower() not in stop_words and len(w) > 2]

        context_lower = context.lower()
        matches = sum(1 for t in q_terms if t in context_lower)
        coverage = matches / len(q_terms) if q_terms else 0

        is_relevant = max_score >= self.relevance_threshold and coverage >= 0.3
        return is_relevant, max_score

    def determine_tool(self, question: str) -> Tuple[str, str]:
        """Choose the tool for *question*.

        Returns:
            ``(tool, extra)`` where tool is one of ``'stock'``,
            ``'calculator'``, ``'pdf'``, ``'check_pdf'`` or ``'web'`` and
            extra is the ticker symbol or the expression (else ``''``).
            Stock and calculator are skipped when disabled in settings.
        """
        logger.info(f"Determining tool for: {question}")

        if self.enable_stock_lookup:
            is_stock, symbol = self.is_stock_question(question)
            if is_stock and symbol:
                logger.info(f"Tool: STOCK,Symbol: {symbol}")
                return 'stock', symbol

        if self.enable_calculations:
            is_calc, expr = self.is_calculation_question(question)
            if is_calc and expr:
                logger.info(f"Tool: CALCULATOR,Expression: {expr}")
                return 'calculator', expr

        if self.is_pdf_related_question(question):
            if self.pdf_loaded:
                logger.info("Tool: PDF")
                return 'pdf', ''

        if self.is_general_knowledge_question(question):
            logger.info("Tool: WEB")
            return 'web', ''

        if self.pdf_loaded:
            return 'check_pdf', ''

        logger.info("Tool: WEB (default)")
        return 'web', ''

    def perform_analysis(self, answer: str, tools_used: List[str]) -> str:
        """Build the source/word-count footer appended to a reply.

        Returns ``""`` when analysis is disabled or the answer is empty.
        """
        if not self.enable_analysis or not answer:
            return ""

        source_labels = {
            "PDF": "📄 Source: PDF Documents",
            "Web": "🌐 Source: Web Search",
            "Calculator": "🧮 Source: Calculator",
            "Stock": "📈 Source: Yahoo Finance (Real-time)",
        }
        analysis = [source_labels[t] for t in tools_used if t in source_labels]

        word_count = len(answer.split())
        analysis.append(f"📊 Response: {word_count} words")

        return "\n\n[📊 Analysis]\n• " + "\n• ".join(analysis)

    # ------------------------------------------------------------------
    # Answering
    # ------------------------------------------------------------------

    def _chat(self, prompt: str) -> str:
        """Send a single-turn prompt to the Groq model and return its text.

        Raises whatever the Groq client raises; callers handle the error.
        """
        resp = groq_client.chat.completions.create(
            model=self.LLM_MODEL,
            messages=[{"role": "user", "content": prompt}],
            temperature=self.temperature,
            max_tokens=self.max_tokens,
        )
        return resp.choices[0].message.content.strip()

    def _retrieve(self, question: str):
        """Return ``(context, scores)`` for the top ``retrieval_k`` chunks."""
        q_vec = self.embedder.encode([question])
        q_vec = q_vec / np.linalg.norm(q_vec)
        scores, indices = self.index.search(q_vec.astype('float32'), k=self.retrieval_k)
        # The index pads with -1 when it holds fewer than k vectors; a plain
        # "i < len" check would let -1 through and fetch the last chunk.
        context_list = [self.chunks[i]["content"] for i in indices[0]
                        if 0 <= i < len(self.chunks)]
        return "\n\n".join(context_list), scores[0]

    @staticmethod
    def _format_stock_reply(stock_data: dict) -> str:
        """Format a successful ``get_stock_price`` result as Markdown."""
        change_emoji = "📈" if stock_data["change"] >= 0 else "📉"
        sign = "+" if stock_data["change"] >= 0 else ""
        # Blank lines between fields: a Markdown heading only ends at a
        # line break, so everything on one line would render as heading.
        return "\n\n".join([
            f"## 📈 {stock_data['name']} ({stock_data['symbol']})",
            f"**Current Price:** ${stock_data['price']} {stock_data['currency']}",
            f"**Change:** {change_emoji} {sign}${stock_data['change']} "
            f"({sign}{stock_data['change_percent']}%)",
            f"**Previous Close:** ${stock_data['previous_close']}",
            f"**Exchange:** {stock_data['exchange']}",
            "*Real-time data from Yahoo Finance*",
        ])

    def ask(self, question: str, history: List) -> Tuple[List, Any]:
        """Answer *question*, append the exchange to *history*, add audio.

        The tool chosen by ``determine_tool`` is tried first.  A failed
        stock lookup, an irrelevant or failed PDF retrieval, or a missing
        LLM client falls back to web search, and finally to the bare LLM.

        Args:
            question: User message.
            history: List of ``[question, reply]`` pairs; modified in place.

        Returns:
            ``(history, audio_path_or_None)``.
        """
        if not isinstance(question, str):
            question = str(question) if question else ""
        if not isinstance(history, list):
            history = []
        question = question.strip()
        if not question:
            return history, None

        if question.lower() in ["hi", "hello", "hey"]:
            reply = "👋 Hi! I can help with:\n• 📈 Stock prices (try: 'stock price of MTBC')\n• 🧮 Calculations (try: '2*4')\n• 📄 PDF questions\n• 🌐 Web search"
            history.append([question, reply])
            return history, self.generate_voice(reply)

        tools_used = []
        reply = ""
        tool, extra = self.determine_tool(question)
        logger.info(f"Selected tool: {tool},extra: {extra}")

        # STOCK TOOL
        if tool == 'stock' and extra:
            stock_data = get_stock_price(extra)
            if stock_data.get("success"):
                reply = self._format_stock_reply(stock_data)
                tools_used.append("Stock")
            else:
                tool = 'web'

        # CALCULATOR TOOL
        if tool == 'calculator' and extra:
            ok, result = safe_calc_eval(extra)
            if ok:
                reply = "\n\n".join([
                    "## 🧮 Calculator",
                    f"**Expression:** `{extra}`",
                    f"**Result:** **{result}**",
                ])
            else:
                reply = f"Calculation error: {result}"
            tools_used.append("Calculator")

        # PDF TOOL
        if tool in ['pdf', 'check_pdf'] and self.index is not None:
            try:
                context, scores = self._retrieve(question)
                relevant = (tool == 'pdf'
                            or self.check_context_relevance(question, context, scores)[0])
                if relevant and groq_client:
                    prompt = f"Document:\n{context}\n\nQuestion: {question}\n\nAnswer based on the document:"
                    reply = self._chat(prompt)
                    # Recorded only after the answer really came from the PDF.
                    tools_used.append("PDF")
                else:
                    tool = 'web'
            except Exception as e:
                logger.error(f"PDF error: {e}")
                tool = 'web'

        # WEB SEARCH TOOL
        if tool == 'web' and not reply and self.enable_web_search:
            results = web_search(question)
            if results:
                tools_used.append("Web")
                web_text = "\n".join([f"- {r['title']}: {r['snippet']}" for r in results[:3]])
                prompt = f"Web results:\n{web_text}\n\nQuestion: {question}\n\nProvide a helpful answer:"
                if groq_client:
                    try:
                        reply = self._chat(prompt)
                        reply += "\n\n🌐 **Web Sources:**\n" + "\n".join([f"• {r['title']}" for r in results[:3]])
                    except Exception as e:
                        reply = f"Error: {e}"
                else:
                    reply = "Web results:\n" + web_text

        # FALLBACK
        if not reply:
            if groq_client:
                try:
                    reply = self._chat(question)
                    tools_used.append("LLM")
                except Exception as e:
                    reply = f"Error: {e}"
            else:
                reply = "Unable to process request."

        # Add analysis
        analysis = self.perform_analysis(reply, tools_used)
        if analysis:
            reply += analysis

        logger.info(f"Tools used: {tools_used}")
        history.append([question, reply])
        return history, self.generate_voice(reply)

    def update_settings(self, temp, tokens, chunk_size, overlap, k, web, calc, fact, analysis):
        """Store the values from the settings panel and return a status."""
        self.temperature = float(temp)
        self.max_tokens = int(tokens)
        self.chunk_size = int(chunk_size)
        self.chunk_overlap = int(overlap)
        self.retrieval_k = int(k)
        self.enable_web_search = bool(web)
        self.enable_calculations = bool(calc)
        self.enable_fact_checking = bool(fact)
        self.enable_analysis = bool(analysis)
        return f"Settings updated! Temp={temp},Tokens={tokens}"


def create_interface():
    """Build the Gradio Blocks UI around a new ``AgenticRAGAgent``."""
    agent = AgenticRAGAgent()

    with gr.Blocks(title="AI Research Agent") as interface:
        # Conversation kept as [question, reply] pairs, separate from the
        # message dicts handed to the Chatbot component.
        chat_memory = gr.State([])

        gr.HTML(
            "\n\n🤖 AI Research Agent\n\n"
            "📈 Stocks | 🧮 Calculator | 📄 PDF | 🌐 Web Search\n\n"
        )

        with gr.Row():
            with gr.Column(scale=2):
                chatbot = gr.Chatbot(label="Chat", height=500)
                with gr.Row():
                    msg = gr.Textbox(placeholder="Try: 'stock price of MTBC' or '2*4' or 'summarize the PDF'", scale=4)
                    submit_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear")
                audio_output = gr.Audio(label="Voice", autoplay=True)

            with gr.Column(scale=1):
                pdf_upload = gr.Files(file_types=[".pdf"], label="Upload PDFs")
                upload_status = gr.Textbox(label="Status", interactive=False)

                with gr.Accordion("Settings", open=False):
                    temp = gr.Slider(0, 1, value=0.3, label="Temperature")
                    tokens = gr.Slider(100, 2000, value=1000, label="Max Tokens")
                    chunk = gr.Slider(256, 1024, value=512, label="Chunk Size")
                    overlap = gr.Slider(0, 200, value=50, label="Overlap")
                    k = gr.Slider(3, 15, value=10, label="Retrieval K")
                    web = gr.Checkbox(value=True, label="Web Search")
                    calc = gr.Checkbox(value=True, label="Calculator")
                    fact = gr.Checkbox(value=True, label="Fact Check")
                    analysis = gr.Checkbox(value=True, label="Analysis")
                    apply_btn = gr.Button("Apply")
                    status = gr.Textbox(label="Settings Status")

        def respond(message, history):
            """Run the agent and convert the pair history to message dicts."""
            new_history, audio = agent.ask(message, history)
            display = []
            for item in new_history:
                if isinstance(item, list) and len(item) == 2:
                    display.append({"role": "user", "content": str(item[0])})
                    display.append({"role": "assistant", "content": str(item[1])})
            return "", new_history, display, audio

        submit_btn.click(respond, [msg, chat_memory], [msg, chat_memory, chatbot, audio_output])
        msg.submit(respond, [msg, chat_memory], [msg, chat_memory, chatbot, audio_output])
        clear_btn.click(lambda: ([], []), outputs=[chat_memory, chatbot])
        pdf_upload.change(agent.upload_pdfs, [pdf_upload], [upload_status])
        apply_btn.click(agent.update_settings,
                        [temp, tokens, chunk, overlap, k, web, calc, fact, analysis],
                        [status])

    return interface


if __name__ == "__main__":
    print("Starting AI Research Agent...")
    app = create_interface()
    app.launch(server_name="0.0.0.0", server_port=7860, show_error=True)