# Import os paths, regex, serialization tools, and datetime metadata
import json
import os
import re
from datetime import datetime

# Import deep learning acceleration libraries
import torch

# Import Hugging Face API connections and agent frameworks
from huggingface_hub import HfApi
from smolagents import CodeAgent, Model, tool

# Import query functions from database module
from database import db_query, db_execute

# Import model loaders and device mapping tools
from models import load_gemma_model, get_device

# Import spaces wrapper conditionally for Hugging Face ZeroGPU compatibility
try:
    import spaces

    has_spaces = True
except ImportError:
    has_spaces = False


# Directory containing this module; invoices, drafts and traces are written here
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Anything outside this set is replaced before a value is used in a file name,
# so agent-supplied text cannot introduce path separators or "..".
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9._-]")


def _safe_filename_component(value) -> str:
    """Return *value* as text that is safe to embed inside a single file name."""
    return _UNSAFE_FILENAME_CHARS.sub("_", str(value))


def _trace_filename(task_id) -> str:
    """Return the JSON trace file name used for *task_id*."""
    return f"trace_{_safe_filename_component(task_id)}.json"


def gpu_decorator(fn):
    """Decorator to conditionally apply spaces.GPU on Hugging Face Spaces."""
    if has_spaces:
        return spaces.GPU(fn)
    return fn


# Custom Response Wrapper class matching smolagents Model return contract
class ChatResponse:
    """Minimal response object exposing the generated text as ``content``."""

    def __init__(self, content):
        # Generated text content handed back to the agent loop
        self.content = content


# Custom Model Subclass to reuse our warm, preloaded Gemma 4 model instance
class GemmaLocalModel(Model):
    """smolagents ``Model`` that runs inference through ``load_gemma_model()``."""

    def __init__(self, model_id="google/gemma-4-12B-it", **kwargs):
        """Forward *model_id* and any extra keyword arguments to ``Model``."""
        super().__init__(model_id=model_id, **kwargs)

    @gpu_decorator
    def generate(self, messages, stop_sequences=None, **kwargs):
        """Generate a reply for *messages* and return it as a ``ChatResponse``.

        Args:
            messages: Chat messages passed to the processor's chat template.
            stop_sequences: Optional strings; the reply is cut at the first
                occurrence of each one, applied in the given order.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            ChatResponse whose ``content`` is the decoded, stripped reply.
        """
        # load_gemma_model() yields (model, processor); presumably cached by
        # the models module -- TODO confirm.
        model, processor = load_gemma_model()
        device = get_device()

        # Format prompt messages into the model's native chat syntax
        text_prompt = processor.apply_chat_template(
            messages, add_generation_prompt=True
        )

        # Process and transfer text inputs to the hardware device
        inputs = processor(text=text_prompt, return_tensors="pt").to(device)

        # Ensure correct datatypes are mapped for visual/multimodal layers
        if "pixel_values" in inputs:
            inputs["pixel_values"] = inputs["pixel_values"].to(model.dtype)

        # Run text generation under no-gradient constraints
        with torch.no_grad():
            generated_ids = model.generate(
                **inputs, max_new_tokens=1024, temperature=0.2, do_sample=True
            )

        # The output holds prompt + completion; keep only the completion tokens
        input_len = inputs["input_ids"].shape[1]
        response_ids = generated_ids[0][input_len:]
        response_text = processor.decode(
            response_ids, skip_special_tokens=True
        ).strip()

        # Truncate response text if the model outputs any defined stop sequence
        if stop_sequences:
            for stop_seq in stop_sequences:
                # partition() leaves the text untouched when stop_seq is absent
                response_text = response_text.partition(stop_seq)[0]

        return ChatResponse(content=response_text)


# --- Define Agent Tools ---
# NOTE: the docstrings of @tool functions are kept as-is on purpose; smolagents
# uses them as the tool description and argument help shown to the model.


@tool
def query_inventory_database(sql_query: str) -> str:
    """
    Executes a read-only SQL SELECT query against the local database and returns results.
    Use this to look up inventory items, orders, or customers.

    Args:
        sql_query: A valid SQL SELECT query string.
    """
    # Enforce SELECT constraint to block write commands in this tool
    if not sql_query.strip().lower().startswith("select"):
        return "Error: This tool only allows SELECT queries. Use execute_database_command for database modifications."

    results = db_query(sql_query)

    # Format database rows into a readable JSON string
    return json.dumps(results, indent=2)


@tool
def execute_database_command(sql_command: str) -> str:
    """
    Executes an INSERT, UPDATE, or DELETE SQL statement against the local database to modify records.
    Use this to record new orders, update stock levels, or update customer shipping addresses.

    Args:
        sql_command: A valid SQL write command string.
    """
    # Block SELECT statements so reads go through query_inventory_database
    if sql_command.strip().lower().startswith("select"):
        return "Error: Use query_inventory_database for SELECT queries."

    # Whatever db_execute reports is returned to the agent unchanged
    return db_execute(sql_command)


@tool
def generate_invoice_pdf(order_id: int) -> str:
    """
    Generates a professional PDF invoice for a given order ID and saves it locally.
    Returns the file path of the generated PDF.

    Args:
        order_id: The ID of the order to generate an invoice for.
    """
    # Escaping helper for values embedded in ReportLab paragraph markup
    from xml.sax.saxutils import escape

    # ReportLab is imported lazily so the module loads without it
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import (
        SimpleDocTemplate,
        Paragraph,
        Spacer,
        Table,
        TableStyle,
    )
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib import colors

    # Query order details from database
    order_rows = db_query("SELECT * FROM orders WHERE id = ?", (order_id,))
    if not order_rows or "error" in order_rows[0]:
        return f"Error: Order ID {order_id} not found."
    order = order_rows[0]

    # Query matching customer details
    customer_rows = db_query(
        "SELECT * FROM customers WHERE id = ?", (order["customer_id"],)
    )
    # Fall back to placeholders when the customer is missing or the lookup
    # came back as an error row (same shape the order check guards against).
    if customer_rows and "error" not in customer_rows[0]:
        customer = customer_rows[0]
    else:
        customer = {"name": "Unknown", "email": "N/A", "shipping_address": "N/A"}

    # Query all item rows related to this order
    items = db_query(
        "SELECT order_items.*, inventory.name AS item_name FROM order_items JOIN inventory ON order_items.product_id = inventory.id WHERE order_id = ?",
        (order_id,),
    )

    # order_id is declared int; sanitizing keeps the name safe if it is not
    pdf_filename = f"invoice_order_{_safe_filename_component(order_id)}.pdf"
    pdf_path = os.path.join(_BASE_DIR, pdf_filename)

    def markup(value) -> str:
        # Paragraph text is parsed as XML-like markup, so raw '&', '<' or '>'
        # coming from the database must be escaped before interpolation.
        return escape(str(value))

    try:
        # Instantiate ReportLab page layout with standard letter sizing
        doc = SimpleDocTemplate(pdf_path, pagesize=letter)
        story = []
        styles = getSampleStyleSheet()

        # Brand-aligned header style
        title_style = ParagraphStyle(
            "InvoiceTitle",
            parent=styles["Heading1"],
            fontName="Helvetica-Bold",
            fontSize=24,
            leading=28,
            textColor=colors.HexColor("#0d9488"),  # Teal branding color
        )
        normal_style = styles["Normal"]

        story.append(Paragraph("INVOICE", title_style))
        story.append(Spacer(1, 15))

        # Paragraph collapses plain newlines, so line breaks are explicit <br/>
        detail_lines = [
            f"Order ID: {markup(order_id)}",
            f"Date: {markup(order['order_date'])}",
            f"Status: {markup(order['status'])}",
            "",
            "Billed To:",
            markup(customer["name"]),
            f"Email: {markup(customer['email'])}",
            f"Address: {markup(customer['shipping_address'])}",
        ]
        details_text = "<br/>".join(detail_lines)

        story.append(Paragraph(details_text, normal_style))
        story.append(Spacer(1, 20))

        # Header row followed by one row per line item
        data = [["Item", "Quantity", "Price", "Total"]]
        for item in items:
            qty = item["quantity"]
            price = item["price"]
            total = qty * price
            data.append(
                [item["item_name"], str(qty), f"${price:.2f}", f"${total:.2f}"]
            )

        # Final sum indicator row
        data.append(["", "", "Total Due:", f"${order['total_price']:.2f}"])

        table = Table(data, colWidths=[250, 70, 90, 90])
        table.setStyle(
            TableStyle(
                [
                    ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#f3f4f6")),
                    ("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#111827")),
                    ("ALIGN", (0, 0), (-1, -1), "LEFT"),
                    ("BOTTOMPADDING", (0, 0), (-1, 0), 8),
                    ("GRID", (0, 0), (-1, -2), 0.5, colors.HexColor("#e5e7eb")),
                    ("LINEBELOW", (0, -1), (-1, -1), 1.5, colors.HexColor("#0d9488")),
                    ("TOPPADDING", (0, -1), (-1, -1), 8),
                    ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
                ]
            )
        )
        story.append(table)

        # Build the final PDF document on disk
        doc.build(story)
        return f"Success: Invoice PDF generated successfully at {pdf_path}"
    except Exception as e:
        # Tools report failures as text so the agent can react to them
        return f"Error building PDF: {e}"


@tool
def draft_customer_email(to_email: str, subject: str, body_content: str) -> str:
    """
    Drafts an email to a customer and saves it locally in a structured text file.
    Returns the file path of the draft email.

    Args:
        to_email: The customer's email address.
        subject: The subject line of the email.
        body_content: The formatted body content of the email.
    """
    # The address is agent-supplied; sanitize it so the draft always lands
    # inside this module's directory (no path separators, no "..").
    address_part = _safe_filename_component(to_email.replace("@", "_at_"))
    draft_filename = f"email_draft_{address_part}.txt"
    draft_path = os.path.join(_BASE_DIR, draft_filename)

    separator = "=" * 40
    try:
        # Write email text blocks to draft file
        with open(draft_path, "w", encoding="utf-8") as f:
            f.write(f"To: {to_email}\n")
            f.write(f"Subject: {subject}\n")
            f.write(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(separator + "\n\n")
            f.write(body_content)
            f.write("\n\n" + separator + "\n")
            f.write("Draft auto-generated by Vessel Studio Client.\n")
        return f"Success: Email draft saved successfully at {draft_path}"
    except Exception as e:
        # Tools report failures as text so the agent can react to them
        return f"Error writing email file: {e}"


# --- Trace Pushing Mechanism ---
def share_trace_on_hub(task_id: str, steps: list) -> bool:
    """Serializes the agent execution steps to JSON and pushes it to the Hugging Face Space.

    The trace is always written to ``agent_traces/`` next to this module. It
    is uploaded only when ``HF_TOKEN`` (and not the placeholder value) and
    ``SPACE_ID`` are set in the environment.

    Args:
        task_id: Identifier used in the trace file name and payload.
        steps: Agent steps; each one is stored as ``str(step)``.

    Returns:
        True when the upload succeeded, False when it was skipped or failed.
    """
    token = os.getenv("HF_TOKEN")
    repo_id = os.getenv("SPACE_ID")

    # Create the trace directory if it is absent
    trace_dir = os.path.join(_BASE_DIR, "agent_traces")
    os.makedirs(trace_dir, exist_ok=True)

    trace_data = {
        "task_id": task_id,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "steps": [str(step) for step in steps],
    }

    # Sanitized so task_id cannot redirect the local path or path_in_repo
    filename = _trace_filename(task_id)
    local_path = os.path.join(trace_dir, filename)

    # Save structured trace log into a JSON file locally
    with open(local_path, "w", encoding="utf-8") as f:
        json.dump(trace_data, f, indent=2)

    # Push file to Space Hub if credentials and Space target are configured
    if token and token != "hf_YOUR_WRITE_TOKEN_HERE" and repo_id:
        try:
            api = HfApi(token=token)
            api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=f"agent_traces/{filename}",
                repo_id=repo_id,
                repo_type="space",
            )
            print(f"Trace pushed to Space repo: {repo_id}/agent_traces/{filename}")
            return True
        except Exception as e:
            # Upload is best-effort; the local copy already exists
            print(f"Failed to push trace to Hub: {e}")
            return False

    # Log local fallback
    print("Trace saved locally (SPACE_ID or HF_TOKEN config not found).")
    return False


# --- Core Run Agent Loop ---
def run_agent(task_prompt: str, task_id: str | None = None) -> dict:
    """Instantiates a CodeAgent with our Gemma 4 model and runs the task.

    Args:
        task_prompt: Instruction handed to ``agent.run``.
        task_id: Optional identifier; defaults to ``task_<unix seconds>``.

    Returns:
        Dict with ``task_id``, ``result``, ``trace_shared`` and
        ``local_trace_path`` (relative ``agent_traces/...`` path).
    """
    # Fallback to dynamic timestamp-based ID if none is supplied
    if task_id is None:
        task_id = f"task_{int(datetime.now().timestamp())}"

    model_wrapper = GemmaLocalModel()

    # Tools the agent is allowed to call
    tools = [
        query_inventory_database,
        execute_database_command,
        generate_invoice_pdf,
        draft_customer_email,
    ]

    agent = CodeAgent(tools=tools, model=model_wrapper, max_steps=5)
    result = agent.run(task_prompt)

    # Fetch intermediate execution steps from agent memory
    steps = agent.memory.get_full_steps() if hasattr(agent, "memory") else []

    # Upload logs file to the Hub (Sharing is Caring badge)
    shared = share_trace_on_hub(task_id, steps)

    return {
        "task_id": task_id,
        "result": result,
        "trace_shared": shared,
        # Same file name share_trace_on_hub writes, in relative form
        "local_trace_path": os.path.join("agent_traces", _trace_filename(task_id)),
    }


# Execute test runs on direct execution
if __name__ == "__main__":
    # Import database initializer
    import database

    # Presumably creates the schema and seed data -- see database module
    database.initialize_database()

    # Test query instruction
    test_prompt = "Query the inventory to find all items that have 'rustic' style, and count them."

    try:
        print("Running agent test task...")
        agent_out = run_agent(test_prompt, "test_run_01")
        print("\nAgent Output:", agent_out["result"])
        print("Trace Path:", agent_out["local_trace_path"])
    except Exception as e:
        # Log failure
        print("Agent failed to run:", e)