"""Gradio Space that runs a LangGraph question-answering agent and submits its answers.

The agent graph is: read -> classify -> (image | file | math | search | none)
-> answer -> evaluate, with up to ``MAX_RETRIES`` web-search retries when the
evaluator judges the answer inadequate.
"""

import os
import base64
import json
import re
from io import BytesIO
from typing import TypedDict, List, Dict, Any, Optional

import gradio as gr
import requests
import pandas as pd
from smolagents import InferenceClientModel
from langgraph.graph import StateGraph, START, END

import tools


# Helper to build a smolagents-compatible message list
def _msg(content: str) -> list:
    """Wrap *content* in a single-element list holding one user message."""
    return [{"role": "user", "content": content}]


# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Maximum number of web-search retries after an inadequate answer.
MAX_RETRIES = 2

# Upper bound (in characters) for a single tool output stored as answer context.
MAX_CONTEXT_CHARS = 6000

# First (non-greedy) brace-delimited span in a model reply; DOTALL so it may span lines.
_JSON_OBJECT_RE = re.compile(r"\{.*?\}", re.DOTALL)

# --- Models via HF Inference API (correct method for HF Spaces) ---
# InferenceClientModel routes all calls through the HF Serverless Inference API.
# No GPU or local model weights are required in the Space container.
model = InferenceClientModel(
    model_id="meta-llama/Llama-3.2-3B-Instruct",
    max_tokens=2048,
    temperature=0.3,
)

math_model = InferenceClientModel(
    model_id="deepseek-ai/deepseek-math-7b-instruct",
    max_tokens=2048,
    temperature=0.3,
)

# Vision model for image / OCR tasks — also served via Inference API
vision_model = InferenceClientModel(
    model_id="Qwen/Qwen2.5-VL-7B-Instruct",
    max_tokens=2048,
)


def _extract_text_from_response(response: Any) -> str:
    """Normalize model responses into plain text.

    ``None`` becomes ``""``; strings pass through; for dicts the first non-None
    value among a fixed list of known keys is used; for other objects a
    non-None ``content`` attribute is used. Anything else is ``str()``-ed.
    """
    if response is None:
        return ""
    if isinstance(response, str):
        return response
    if isinstance(response, dict):
        for key in ("content", "answer", "output", "text", "solution", "extracted_info"):
            if key in response and response[key] is not None:
                return str(response[key])
        return str(response)
    content = getattr(response, "content", None)
    if content is not None:
        return str(content)
    return str(response)


def _parse_json_object(raw: str) -> Dict[str, Any]:
    """Return the first JSON object found in *raw*, or ``{}`` if none parses."""
    match = _JSON_OBJECT_RE.search(raw)
    if not match:
        return {}
    try:
        data = json.loads(match.group())
    except json.JSONDecodeError:
        return {}
    return data if isinstance(data, dict) else {}


def _as_bool(value: Any, default: bool = False) -> bool:
    """Interpret a JSON-ish value as a boolean.

    Strings are compared case-insensitively against "true"/"yes"/"1" so that a
    reply such as ``"false"`` is not treated as truthy; ``None`` yields *default*.
    """
    if value is None:
        return default
    if isinstance(value, str):
        return value.strip().lower() in ("true", "yes", "1")
    return bool(value)


def _clip(text: Any, limit: int = MAX_CONTEXT_CHARS) -> str:
    """Return ``str(text)`` cut to *limit* characters, with "..." appended when cut."""
    text = str(text)
    return text if len(text) <= limit else text[:limit] + "..."


# --- State ---
class AgentState(TypedDict):
    """State passed between the graph nodes."""

    question: str
    task_id: Optional[str]
    file_name: Optional[str]
    is_searching: Optional[bool]
    have_file: Optional[bool]
    is_math: Optional[bool]
    have_image: Optional[bool]
    final_answer: Optional[str]
    retry_count: Optional[int]
    # Verdict of the most recent evaluate() call; drives route_after_evaluate.
    is_adequate: Optional[bool]
    # Tool outputs returned by the handler nodes; declared so they are part of the schema.
    search_results: Optional[str]
    image_description: Optional[str]
    transcribed_text: Optional[str]
    extracted_info: Optional[str]
    math_solution: Optional[str]
    messages: List[Dict[str, Any]]


# --- Nodes ---
def read(state: AgentState) -> dict:
    """Agent reads and logs the incoming question."""
    question = state["question"]
    print(f"Agent is reading the question: {question[:50]}...")
    return {}


def classify(state: AgentState) -> dict:
    """Agent classifies the question to determine which tools to use.

    Asks ``model`` for a JSON object of four boolean flags; any flag that is
    missing or unparseable defaults to False.
    """
    question = state["question"].lower()
    prompt = f"""
    You are an agent that classifies questions to determine which tools to use.
    Classify the following question into the categories: 'need to be searched on web/wikipedia', 'has a file in the question', 'is a math problem', 'has an image in the question'.
    Question: {question}
    Return a JSON object with boolean fields for each category, for example:
    {{
        "is_searching": true,
        "have_file": false,
        "is_math": false,
        "have_image": false
    }}
    """
    response = model(_msg(prompt))
    data = _parse_json_object(_extract_text_from_response(response))

    is_searching = _as_bool(data.get("is_searching"))
    have_file = _as_bool(data.get("have_file"))
    is_math = _as_bool(data.get("is_math"))
    have_image = _as_bool(data.get("have_image"))

    print(f"Classification: is_searching={is_searching}, have_file={have_file}, is_math={is_math}, have_image={have_image}")

    new_messages = state.get("messages", []) + [
        {"role": "system", "content": "Classify the question to determine which tools to use."},
        {"role": "user", "content": question},
        {"role": "assistant", "content": f"is_searching={is_searching}, have_file={have_file}, is_math={is_math}, have_image={have_image}"},
    ]
    return {
        "is_searching": is_searching,
        "have_file": have_file,
        "is_math": is_math,
        "have_image": have_image,
        "messages": new_messages,
    }


def handle_search(state: AgentState) -> dict:
    """Agent performs a web search if classified as needing search.

    The (length-bounded) search results are appended to ``messages`` as an
    assistant message, because ``answer`` builds its context from those.
    """
    question = state["question"]
    print(f"Agent is performing a web search for: {question[:50]}...")
    search_results = str(tools.WebSearchTool()(question))
    print(f"Search results: {search_results[:100]}...")

    new_messages = state.get("messages", []) + [
        {"role": "system", "content": "Perform a web search if classified as needing search."},
        {"role": "user", "content": question},
        {"role": "assistant", "content": f"Search results: {_clip(search_results)}"},
    ]
    return {"search_results": search_results, "messages": new_messages}


# Backward-compatible alias for the original (misspelled) function name.
handele_search = handle_search


def handle_image(state: AgentState) -> dict:
    """Agent handles an image using a vision model via the HF Inference API.

    Instead of loading a local transformer model (which would be too heavy for a
    standard Space), the image is forwarded to a vision-capable InferenceClientModel
    (Qwen2.5-VL) through the HF Serverless Inference API.

    If the image cannot be fetched, empty descriptions are returned and a note is
    appended to ``messages``. If the vision call raises, the error text is used
    as the description (best effort).
    """
    question = state["question"]
    task_id = state.get("task_id", "")
    file_name = state.get("file_name", "")

    image_reader = tools.ImageReaderTool()
    image_data_uri = image_reader(task_id, file_name) if task_id and file_name else ""

    if not image_data_uri or image_data_uri.startswith("Failed"):
        print(f"Could not download image for task {task_id}")
        new_messages = state.get("messages", []) + [
            {"role": "assistant", "content": f"[Could not download image '{file_name}' for analysis.]"},
        ]
        return {"image_description": "", "transcribed_text": "", "messages": new_messages}

    prompt_text = (
        f"Analyze the attached image in detail.\n"
        f"Describe its content and transcribe all text visible in it.\n\n"
        f"Question: {question}\n\n"
        f"Return a JSON object: "
        f'{{ "image_description": "...", "transcribed_text": "..." }}'
    )

    # Send image + text to the vision model via the HF Inference API.
    # InferenceClientModel accepts OpenAI-style multimodal message format.
    vision_messages = [
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": image_data_uri}},
                {"type": "text", "text": prompt_text},
            ],
        }
    ]

    try:
        response = vision_model(vision_messages)
        ocr_text = _extract_text_from_response(response)
    except Exception as e:
        ocr_text = f"Vision model error: {e}"

    # Fall back to the raw reply for any field the model did not return as JSON.
    data = _parse_json_object(ocr_text)
    image_description = str(data.get("image_description", ocr_text))
    transcribed_text = str(data.get("transcribed_text", ocr_text))

    print(f"Image description: {image_description[:100]}...")
    print(f"Transcribed text: {transcribed_text[:100]}...")

    new_messages = state.get("messages", []) + [
        {"role": "system", "content": "Analyze and describe the image if classified as having an image."},
        {"role": "user", "content": question},
        {"role": "assistant", "content": f"Image description: {_clip(image_description)}, Transcribed text: {_clip(transcribed_text)}"},
    ]
    return {"image_description": image_description, "transcribed_text": transcribed_text, "messages": new_messages}


def handle_file(state: AgentState) -> dict:
    """Agent processes the file if classified as having a file.

    The file content (when available) is embedded in a prompt to ``model``; the
    model's reply is stored, length-bounded, as an assistant message.
    """
    question = state["question"]
    task_id = state.get("task_id", "")
    file_name = state.get("file_name", "")

    file_reader = tools.FileReaderTool()
    file_content = file_reader(task_id, file_name) if task_id and file_name else ""

    file_context = ""
    if file_content:
        file_context = f"\n\n--- Attached file: {file_name} ---\n{file_content}\n--- End of file ---"
    elif file_name:
        file_context = f"\n\n[Note: A file '{file_name}' was referenced but could not be retrieved.]"

    prompt = (
        f"You are an agent that can read and extract information from files.\n"
        f"Read the attached file content carefully and extract any relevant information "
        f"that could help answer the question.\n\n"
        f"Question: {question}{file_context}\n\n"
        f'Return a JSON object: {{ "extracted_info": "..." }}'
    )

    response = model(_msg(prompt))
    extracted_info = _extract_text_from_response(response)
    print(f"Extracted file info: {extracted_info[:100]}...")

    new_messages = state.get("messages", []) + [
        {"role": "system", "content": "Read and extract information from the attached file."},
        {"role": "user", "content": question},
        {"role": "assistant", "content": f"Extracted info: {_clip(extracted_info)}"},
    ]
    return {"extracted_info": extracted_info, "messages": new_messages}


def handle_math(state: AgentState) -> dict:
    """Agent handles a math problem if classified as a math problem."""
    question = state["question"]
    print(f"Agent is handling a math problem: {question[:50]}...")

    response = math_model(_msg(f"Solve the following math problem step by step:\n\n{question}"))
    solution = _extract_text_from_response(response)
    print(f"Math solution: {solution[:100]}...")

    new_messages = state.get("messages", []) + [
        {"role": "system", "content": "Handle the question if classified as a math problem."},
        {"role": "user", "content": question},
        {"role": "assistant", "content": f"Math solution: {_clip(solution)}"},
    ]
    return {"math_solution": solution, "messages": new_messages}


def answer(state: AgentState) -> dict:
    """Synthesize a final answer from all gathered context in messages.

    The context is the content of every assistant message in ``messages``.
    The text after the last "FINAL ANSWER:" marker in the model reply is
    returned; without the marker the whole stripped reply is returned.
    """
    question = state["question"]
    messages_history = state.get("messages", [])

    context_parts = [
        msg["content"] for msg in messages_history if msg.get("role") == "assistant"
    ]
    # Retries re-run the same search; drop exact duplicates while keeping order.
    context_parts = list(dict.fromkeys(context_parts))
    context = "\n".join(context_parts) if context_parts else "No additional context gathered."

    prompt = (
        "You are a general AI assistant. I will ask you a question. Report your thoughts, "
        "and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER]. "
        "YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated "
        "list of numbers and/or strings. If you are asked for a number, don't use comma to write "
        "your number neither use units such as $ or percent sign unless specified otherwise. "
        "If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), "
        "and write the digits in plain text unless specified otherwise. If you are asked for a comma "
        "separated list, apply the above rules depending of whether the element to be put in the list "
        "is a number or a string.\n\n"
        f"Question: {question}\n\n"
        f"Context gathered:\n{context}\n"
    )

    response = model(_msg(prompt))
    raw_response = _extract_text_from_response(response)

    if "FINAL ANSWER:" in raw_response:
        final_answer = raw_response.split("FINAL ANSWER:")[-1].strip()
    else:
        final_answer = raw_response.strip()

    print(f"Final answer: {final_answer[:100]}...")
    return {"final_answer": final_answer}


def evaluate(state: AgentState) -> dict:
    """LLM evaluates whether the current final_answer is adequate.

    Records the verdict in ``is_adequate`` and increments ``retry_count`` when
    the answer is judged inadequate. If the evaluator's reply cannot be parsed,
    the answer is treated as adequate so that a malformed reply cannot trigger
    retries.
    """
    question = state["question"]
    current_answer = state.get("final_answer", "")
    retry_count = state.get("retry_count", 0) or 0

    prompt = (
        f"You are a strict evaluator. Given the question and a candidate answer, decide if the "
        f"answer is complete, relevant, and not an error message.\n\n"
        f"Question: {question}\nCandidate answer: {current_answer}\n\n"
        f'Return ONLY a JSON object:\n'
        f'{{"is_adequate": true}} if the answer looks correct and complete,\n'
        f'{{"is_adequate": false}} if the answer is wrong, incomplete, an error, or says it could not find information.'
    )
    response = model(_msg(prompt))
    data = _parse_json_object(_extract_text_from_response(response))
    is_adequate = _as_bool(data.get("is_adequate"), default=True)

    print(f"Evaluation: is_adequate={is_adequate}, retry_count={retry_count}")

    update: Dict[str, Any] = {
        "is_adequate": is_adequate,
        "retry_count": retry_count + (0 if is_adequate else 1),
    }
    if not is_adequate:
        # Clear the classification flags before the search retry.
        update.update(is_searching=False, have_file=False, is_math=False, have_image=False)
    return update


def route_after_evaluate(state: AgentState) -> str:
    """Route to a web-search retry or to END after evaluation.

    A retry happens only when the latest verdict is inadequate and the retry
    budget is not exhausted. Routing on the verdict (not on ``retry_count``
    alone) ends the run as soon as a retried answer is accepted.
    """
    retry_count = state.get("retry_count", 0) or 0
    if state.get("is_adequate") is False and retry_count <= MAX_RETRIES:
        print(f"Answer inadequate — retry {retry_count}/{MAX_RETRIES}, routing to web search")
        return "handle_search"
    return END


def route_after_classify(state: AgentState) -> str:
    """Pick the first handler whose flag is set: image, file, math, then search."""
    if state.get("have_image"):
        return "handle_image"
    if state.get("have_file"):
        return "handle_file"
    if state.get("is_math"):
        return "handle_math"
    if state.get("is_searching"):
        return "handle_search"
    return "answer"


# --- Build LangGraph ---
agent_graph = StateGraph(AgentState)

agent_graph.add_node("read", read)
agent_graph.add_node("classify", classify)
agent_graph.add_node("handle_search", handle_search)
agent_graph.add_node("handle_image", handle_image)
agent_graph.add_node("handle_file", handle_file)
agent_graph.add_node("handle_math", handle_math)
agent_graph.add_node("answer", answer)
agent_graph.add_node("evaluate", evaluate)

agent_graph.add_edge(START, "read")
agent_graph.add_edge("read", "classify")
agent_graph.add_conditional_edges("classify", route_after_classify)
agent_graph.add_edge("handle_search", "answer")
agent_graph.add_edge("handle_image", "answer")
agent_graph.add_edge("handle_file", "answer")
agent_graph.add_edge("handle_math", "answer")
agent_graph.add_edge("answer", "evaluate")
agent_graph.add_conditional_edges("evaluate", route_after_evaluate)

compiled_agent = agent_graph.compile()


# --- Agent ---
class BasicAgent:
    """Callable wrapper that runs one question through ``compiled_agent``."""

    def __init__(self):
        self.file_reader = tools.FileReaderTool()
        self.image_reader = tools.ImageReaderTool()
        self.web_search = tools.WebSearchTool()
        print("Agent initialized.")

    def __call__(self, question: str, task_id: str = "", file_name: str = "") -> str:
        """Run the graph for *question* and return the ``final_answer`` from the result state."""
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        result_state = compiled_agent.invoke({
            "question": question,
            "task_id": task_id,
            "file_name": file_name,
            "messages": [],
            "is_searching": False,
            "have_file": False,
            "is_math": False,
            "have_image": False,
            "final_answer": "",
            "retry_count": 0,
        })
        final_answer = result_state.get("final_answer", "No answer produced.")
        print(f"Agent returning answer: {final_answer[:100]}...")
        return final_answer


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetches all questions, runs the BasicAgent on them, submits all answers.

    Returns a ``(status_message, results)`` pair where ``results`` is a
    DataFrame of per-question outcomes, or ``None`` when the run stops before
    any question is processed.
    """
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        return f"Error fetching questions: {e}", None
    except Exception as e:
        return f"An unexpected error occurred fetching questions: {e}", None

    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")

    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question") or item.get("Question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        file_name = item.get("file_name", "")
        if file_name:
            print(f"Task {task_id} has attached file: {file_name}")

        # A failure on one task is logged and must not abort the remaining tasks.
        try:
            submitted_answer = agent(question_text, task_id=task_id, file_name=file_name)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")

    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except Exception:
            error_detail += f" Response: {e.response.text[:500]}"
        return f"Submission Failed: {error_detail}", pd.DataFrame(results_log)
    except requests.exceptions.Timeout:
        return "Submission Failed: The request timed out.", pd.DataFrame(results_log)
    except requests.exceptions.RequestException as e:
        return f"Submission Failed: Network error - {e}", pd.DataFrame(results_log)
    except Exception as e:
        return f"An unexpected error occurred during submission: {e}", pd.DataFrame(results_log)


# --- Gradio Interface ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
        ---
        **Disclaimers:**
        Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a separate action or even to answer the questions in async.
        """
    )

    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )

if __name__ == "__main__":
    print("\n" + "-" * 30 + " App Starting " + "-" * 30)
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?).")

    print("-" * (60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)