# Import os paths, serialization tools, and datetime metadata
import json
import os
from datetime import datetime
# Import deep learning acceleration libraries
import torch
# Import Hugging Face API connections and agent frameworks
from huggingface_hub import HfApi
from smolagents import CodeAgent, Model, tool
# Import query functions from database module
from database import db_query, db_execute
# Import model loaders and device mapping tools
from models import load_gemma_model, get_device
# Import spaces wrapper conditionally for Hugging Face ZeroGPU compatibility
try:
import spaces
has_spaces = True
except ImportError:
has_spaces = False
def gpu_decorator(fn):
"""Decorator to conditionally apply spaces.GPU on Hugging Face Spaces."""
if has_spaces:
return spaces.GPU(fn)
return fn
# Custom Response Wrapper class matching smolagents Model return contract
class ChatResponse:
# Initialize response object with generated text content
def __init__(self, content):
self.content = content
# Custom Model Subclass to reuse our warm, preloaded Gemma 4 model instance
class GemmaLocalModel(Model):
# Initialize the custom model wrapper
def __init__(self, model_id="google/gemma-4-12B-it", **kwargs):
super().__init__(model_id=model_id, **kwargs)
# Implement generate to run inference inside agent execution loops
@gpu_decorator
def generate(self, messages, stop_sequences=None, **kwargs):
# Retrieve the preloaded model and processor from cache
model, processor = load_gemma_model()
# Resolve target device mapping
device = get_device()
# Format prompt messages into the model's native chat syntax
text_prompt = processor.apply_chat_template(
messages, add_generation_prompt=True
)
# Process and transfer text inputs to the hardware device
inputs = processor(text=text_prompt, return_tensors="pt").to(device)
# Ensure correct datatypes are mapped for visual/multimodal layers
if "pixel_values" in inputs:
inputs["pixel_values"] = inputs["pixel_values"].to(model.dtype)
# Run text generation under no-gradient constraints
with torch.no_grad():
generated_ids = model.generate(
**inputs, max_new_tokens=1024, temperature=0.2, do_sample=True
)
# Isolate the input token count
input_len = inputs["input_ids"].shape[1]
# Decode the output token IDs, skipping prompt tokens
response_ids = generated_ids[0][input_len:]
# Convert token IDs back to a string
response_text = processor.decode(response_ids, skip_special_tokens=True).strip()
# Truncate response text if the model outputs any defined stop sequence
if stop_sequences:
for stop_seq in stop_sequences:
if stop_seq in response_text:
response_text = response_text.split(stop_seq)[0]
# Return response wrapped in our ChatResponse object
return ChatResponse(content=response_text)
# --- Define Agent Tools ---
@tool
def query_inventory_database(sql_query: str) -> str:
"""
Executes a read-only SQL SELECT query against the local database and returns results.
Use this to look up inventory items, orders, or customers.
Args:
sql_query: A valid SQL SELECT query string.
"""
# Enforce SELECT constraint to block malicious write commands in this tool
if not sql_query.strip().lower().startswith("select"):
return "Error: This tool only allows SELECT queries. Use execute_database_command for database modifications."
# Run the read query against SQLite
results = db_query(sql_query)
# Format database rows into a readable JSON string
return json.dumps(results, indent=2)
@tool
def execute_database_command(sql_command: str) -> str:
"""
Executes an INSERT, UPDATE, or DELETE SQL statement against the local database to modify records.
Use this to record new orders, update stock levels, or update customer shipping addresses.
Args:
sql_command: A valid SQL write command string.
"""
# Block SELECT statements to ensure clean usage mapping
if sql_command.strip().lower().startswith("select"):
return "Error: Use query_inventory_database for SELECT queries."
# Run database write modification command
result = db_execute(sql_command)
return result
@tool
def generate_invoice_pdf(order_id: int) -> str:
"""
Generates a professional PDF invoice for a given order ID and saves it locally.
Returns the file path of the generated PDF.
Args:
order_id: The ID of the order to generate an invoice for.
"""
# Import ReportLab page sizing utilities
from reportlab.lib.pagesizes import letter
# Import flowable element classes for layout building
from reportlab.platypus import (
SimpleDocTemplate,
Paragraph,
Spacer,
Table,
TableStyle,
)
# Import text style registries
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
# Import color palettes
from reportlab.lib import colors
# Query order details from database
order_rows = db_query("SELECT * FROM orders WHERE id = ?", (order_id,))
# Validate order existence
if not order_rows or "error" in order_rows[0]:
return f"Error: Order ID {order_id} not found."
# Reference target order dictionary
order = order_rows[0]
# Query matching customer details
customer_rows = db_query(
"SELECT * FROM customers WHERE id = ?", (order["customer_id"],)
)
# Map customer dict, fallback to placeholders if missing
customer = (
customer_rows[0]
if customer_rows
else {"name": "Unknown", "email": "N/A", "shipping_address": "N/A"}
)
# Query all item rows related to this order
items = db_query(
"SELECT order_items.*, inventory.name AS item_name FROM order_items JOIN inventory ON order_items.product_id = inventory.id WHERE order_id = ?",
(order_id,),
)
# Compile the target PDF filename
pdf_filename = f"invoice_order_{order_id}.pdf"
# Resolve absolute path for PDF generation
pdf_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), pdf_filename)
try:
# Instantiate ReportLab page layout with standard letter sizing
doc = SimpleDocTemplate(pdf_path, pagesize=letter)
# Initialize empty document story flow list
story = []
# Load sample style dictionary
styles = getSampleStyleSheet()
# Design custom brand-aligned header style
title_style = ParagraphStyle(
"InvoiceTitle",
parent=styles["Heading1"],
fontName="Helvetica-Bold",
fontSize=24,
leading=28,
textColor=colors.HexColor("#0d9488"), # Teal branding color
)
# Reference normal text style
normal_style = styles["Normal"]
# Append Title to document flow
story.append(Paragraph("INVOICE", title_style))
# Append spacing under Title
story.append(Spacer(1, 15))
# Format customer billing info block
details_text = f"""
Order ID: {order_id}
Date: {order["order_date"]}
Status: {order["status"]}
Billed To:
{customer["name"]}
Email: {customer["email"]}
Address: {customer["shipping_address"]}
"""
# Append customer billing details to document flow
story.append(Paragraph(details_text, normal_style))
# Append spacing under details
story.append(Spacer(1, 20))
# Setup items list header
data = [["Item", "Quantity", "Price", "Total"]]
# Format line items into the grid list
for item in items:
qty = item["quantity"]
price = item["price"]
total = qty * price
data.append([item["item_name"], str(qty), f"${price:.2f}", f"${total:.2f}"])
# Append final sum indicator row
data.append(["", "", "Total Due:", f"${order['total_price']:.2f}"])
# Create table mapping columns widths
table = Table(data, colWidths=[250, 70, 90, 90])
# Apply custom table grids, colors, padding, and font stylings
table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#f3f4f6")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#111827")),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("BOTTOMPADDING", (0, 0), (-1, 0), 8),
("GRID", (0, 0), (-1, -2), 0.5, colors.HexColor("#e5e7eb")),
("LINEBELOW", (0, -1), (-1, -1), 1.5, colors.HexColor("#0d9488")),
("TOPPADDING", (0, -1), (-1, -1), 8),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
]
)
)
# Append table grid to document flow
story.append(table)
# Build the final PDF document on disk
doc.build(story)
return f"Success: Invoice PDF generated successfully at {pdf_path}"
except Exception as e:
# Catch and report any formatting errors during generation
return f"Error building PDF: {str(e)}"
@tool
def draft_customer_email(to_email: str, subject: str, body_content: str) -> str:
"""
Drafts an email to a customer and saves it locally in a structured text file.
Returns the file path of the draft email.
Args:
to_email: The customer's email address.
subject: The subject line of the email.
body_content: The formatted body content of the email.
"""
# Create the target email filename
draft_filename = f"email_draft_{to_email.replace('@', '_at_')}.txt"
# Resolve absolute path for email generation
draft_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), draft_filename
)
try:
# Write email text blocks to draft file
with open(draft_path, "w", encoding="utf-8") as f:
f.write(f"To: {to_email}\n")
f.write(f"Subject: {subject}\n")
f.write(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("=" * 40 + "\n\n")
f.write(body_content)
f.write("\n\n" + "=" * 40 + "\n")
f.write("Draft auto-generated by Vessel Studio Client.\n")
return f"Success: Email draft saved successfully at {draft_path}"
except Exception as e:
# Return error if file write fails
return f"Error writing email file: {str(e)}"
# --- Trace Pushing Mechanism ---
def share_trace_on_hub(task_id: str, steps: list) -> bool:
"""Serializes the agent execution steps to JSON and pushes it to the Hugging Face Space."""
# Load Hugging Face authorization credentials from environment
token = os.getenv("HF_TOKEN")
# Load Space repository identifier from environment
repo_id = os.getenv("SPACE_ID")
# Resolve trace logging directory path
trace_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "agent_traces")
# Create directory if it is absent
os.makedirs(trace_dir, exist_ok=True)
# Format trace log dictionary
trace_data = {
"task_id": task_id,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"steps": [str(step) for step in steps],
}
# Format trace filename
filename = f"trace_{task_id}.json"
# Resolve target file path
local_path = os.path.join(trace_dir, filename)
# Save structured trace log into a JSON file locally
with open(local_path, "w", encoding="utf-8") as f:
json.dump(trace_data, f, indent=2)
# Push file to Space Hub if credentials and Space target are configured
if token and token != "hf_YOUR_WRITE_TOKEN_HERE" and repo_id:
try:
# Instantiate Hugging Face API client
api = HfApi(token=token)
# Upload file directly to the Space repository
api.upload_file(
path_or_fileobj=local_path,
path_in_repo=f"agent_traces/{filename}",
repo_id=repo_id,
repo_type="space",
)
print(f"Trace pushed to Space repo: {repo_id}/agent_traces/{filename}")
return True
except Exception as e:
# Log failure if upload is blocked
print(f"Failed to push trace to Hub: {e}")
return False
# Log local fallback
print("Trace saved locally (SPACE_ID or HF_TOKEN config not found).")
return False
# --- Core Run Agent Loop ---
def run_agent(task_prompt: str, task_id: str | None = None) -> dict:
"""Instantiates a CodeAgent with our Gemma 4 model and runs the task."""
# Fallback to dynamic timestamp-based ID if none is supplied
if task_id is None:
task_id = f"task_{int(datetime.now().timestamp())}"
# Initialize our custom model wrapper
model_wrapper = GemmaLocalModel()
# Define tools list for agent capability mapping
tools = [
query_inventory_database,
execute_database_command,
generate_invoice_pdf,
draft_customer_email,
]
# Instantiate CodeAgent
agent = CodeAgent(tools=tools, model=model_wrapper, max_steps=5)
# Execute target task instruction
result = agent.run(task_prompt)
# Fetch intermediate execution steps from agent memory
steps = agent.memory.get_full_steps() if hasattr(agent, "memory") else []
# Upload logs file to the Hub (Sharing is Caring badge)
shared = share_trace_on_hub(task_id, steps)
return {
"task_id": task_id,
"result": result,
"trace_shared": shared,
"local_trace_path": os.path.join("agent_traces", f"trace_{task_id}.json"),
}
# Execute test runs on direct execution
if __name__ == "__main__":
# Import database initializer
import database
# Set tables schemas and seed data
database.initialize_database()
# Test query instruction
test_prompt = "Query the inventory to find all items that have 'rustic' style, and count them."
try:
print("Running agent test task...")
# Run test execution
agent_out = run_agent(test_prompt, "test_run_01")
print("\nAgent Output:", agent_out["result"])
print("Trace Path:", agent_out["local_trace_path"])
except Exception as e:
# Log failure
print("Agent failed to run:", e)