""" Auto-Ontology — Automotive Market Intelligence HuggingFace Space: Gradio Blocks with pipeline narrative + Strands Agent chatbot. """ import os from pathlib import Path import gradio as gr import pandas as pd # --------------------------------------------------------------------------- # Load dataset (parquet files from the HF dataset repo) # --------------------------------------------------------------------------- DATA_DIR = Path(__file__).parent / "data" HF_DATASET = "cp500/auto-ontology" def _load_parquet(subdir: str, table: str) -> pd.DataFrame | None: """Load a parquet table — try local first, then HF hub.""" local = DATA_DIR / subdir / f"{table}.parquet" if local.exists(): return pd.read_parquet(local) try: return pd.read_parquet(f"hf://datasets/{HF_DATASET}/data/{subdir}/{table}.parquet") except Exception: return None # Lazy-load dataframes _cache: dict[str, pd.DataFrame | None] = {} def get_df(subdir: str, table: str) -> pd.DataFrame: key = f"{subdir}/{table}" if key not in _cache: _cache[key] = _load_parquet(subdir, table) df = _cache[key] if df is None: raise ValueError(f"Table {key} not available") return df # --------------------------------------------------------------------------- # Strands Agent tools — query the parquet dataset # --------------------------------------------------------------------------- from strands import Agent, tool from strands.models.openai import OpenAIModel @tool def search_products(make: str = "", model: str = "", year: int = 0) -> str: """Search the product index for vehicles by make, model, and/or year. 
Returns matching vehicles with their IDs, make, model, year, and body class.""" df = get_df("hypergraph", "product_index") mask = pd.Series(True, index=df.index) if make: mask &= df["make"].str.contains(make, case=False, na=False) if model: mask &= df["model"].str.contains(model, case=False, na=False) if year: mask &= df["model_year"] == year results = df[mask].head(20) if results.empty: return "No products found matching the criteria." return results.to_markdown(index=False) @tool def browse_signals(domain: str = "", sentiment: str = "", keyword: str = "") -> str: """Browse market signals. Filter by L1 domain code (P/T/C/F/S/R/M/ST), sentiment (bullish/bearish/neutral/mixed), or keyword in signal name. Returns up to 15 matching signals.""" si = get_df("hypergraph", "signal_index") nodes = get_df("hypergraph", "nodes") # Merge to get signal names signals = si.merge(nodes[nodes["node_type"] == "Signal"][["id", "name"]], left_on="signal_id", right_on="id", how="left") mask = pd.Series(True, index=signals.index) if domain: mask &= signals["domain"].str.upper() == domain.upper() if sentiment: mask &= signals["sentiment"].str.lower() == sentiment.lower() if keyword: mask &= signals["name"].str.contains(keyword, case=False, na=False) results = signals[mask][["signal_id", "name", "domain", "subdomain", "sentiment", "impact", "timestamp"]].head(15) if results.empty: return "No signals found matching the criteria." return results.to_markdown(index=False) @tool def get_competitors(product_id: str) -> str: """Get vehicles that compete with a given product. 
Takes a product_id like 'prd_tesla_model_y_2024' and returns competing vehicles.""" edges = get_df("hypergraph", "edges") products = get_df("hypergraph", "product_index") # Find COMPETES_WITH edges in both directions compete = edges[edges["role"] == "COMPETES_WITH"] as_source = compete[compete["source_id"] == product_id]["target_id"] as_target = compete[compete["target_id"] == product_id]["source_id"] competitor_ids = pd.concat([as_source, as_target]).unique() if len(competitor_ids) == 0: return f"No competitors found for {product_id}." results = products[products["product_id"].isin(competitor_ids)] return f"Competitors of {product_id}:\n\n{results.to_markdown(index=False)}" @tool def graph_stats() -> str: """Get summary statistics of the auto-ontology hypergraph — node counts by type, edge counts by role, signal domain distribution, etc.""" nodes = get_df("hypergraph", "nodes") edges = get_df("hypergraph", "edges") si = get_df("hypergraph", "signal_index") node_counts = nodes["node_type"].value_counts().to_dict() edge_counts = edges["role"].value_counts().to_dict() domain_counts = si["domain"].value_counts().to_dict() sentiment_counts = si["sentiment"].value_counts().to_dict() lines = [ "## Hypergraph Statistics\n", f"**Total nodes:** {len(nodes):,}", f"**Total edges:** {len(edges):,}\n", "### Node Types", ] for t, c in sorted(node_counts.items(), key=lambda x: -x[1]): lines.append(f"- {t}: {c:,}") lines.append("\n### Edge Roles") for r, c in sorted(edge_counts.items(), key=lambda x: -x[1]): lines.append(f"- {r}: {c:,}") lines.append("\n### Signal Domains (L1)") domain_names = { "P": "Product", "C": "Competitive", "T": "Technology", "M": "Market", "F": "Financial", "S": "Supply Chain", "R": "Regulatory", "ST": "Strategic", } for d, c in sorted(domain_counts.items(), key=lambda x: -x[1]): lines.append(f"- {d} ({domain_names.get(d, d)}): {c:,}") lines.append("\n### Signal Sentiment") for s, c in sorted(sentiment_counts.items(), key=lambda x: -x[1]): lines.append(f"- 
{s}: {c:,}") return "\n".join(lines) # --------------------------------------------------------------------------- # Build Strands Agent # --------------------------------------------------------------------------- SYSTEM_PROMPT = """\ You are an automotive market intelligence analyst with access to the Auto-Ontology \ hypergraph — 176K nodes and 537K edges connecting 94,671 market signals to 1,261 vehicles. The data was extracted from Common Crawl and resolved against the NHTSA vPIC registry. Use your tools to search products, browse signals, find competitors, and get graph stats. \ When answering, cite specific data from the tools. Be concise and analytical. Signal domains: P (Product), T (Technology), C (Competitive), F (Financial), \ S (Supply Chain), R (Regulatory), M (Market), ST (Strategic). Sentiments: bullish, bearish, neutral, mixed. """ def _build_agent(): """Build the Strands agent with HF Inference API.""" hf_token = os.environ.get("HF_TOKEN", "") model = OpenAIModel( client_args={ "base_url": "https://router.huggingface.co/v1/", "api_key": hf_token, }, model_id="Qwen/Qwen2.5-72B-Instruct", ) return Agent( model=model, tools=[search_products, browse_signals, get_competitors, graph_stats], system_prompt=SYSTEM_PROMPT, ) _agent = None def get_agent(): global _agent if _agent is None: _agent = _build_agent() return _agent # --------------------------------------------------------------------------- # Chat handler # --------------------------------------------------------------------------- def chat_fn(message: str, history: list[dict]) -> str: """Handle a chat message using the Strands agent.""" try: agent = get_agent() result = agent(message) return str(result) except Exception as e: return f"Error: {e}\n\nMake sure the HF_TOKEN secret is configured in Space settings." 
# ---------------------------------------------------------------------------
# Pipeline narrative HTML
# ---------------------------------------------------------------------------

PIPELINE_HTML_PATH = Path(__file__).parent / "pipeline.html"


def load_pipeline_html() -> str:
    """Return the HTML shown in the "The Pipeline" tab.

    When ``pipeline.html`` exists next to this file, its content is
    base64-encoded and embedded as the ``src`` of an iframe (a data URL) so
    that the page's own markup and styles stay isolated from the host app.
    Otherwise a short placeholder paragraph is returned.
    """
    import base64

    if not PIPELINE_HTML_PATH.exists():
        return "<p>Pipeline narrative not found. Check pipeline.html.</p>"

    # Explicit UTF-8 so the result does not depend on the platform locale.
    html = PIPELINE_HTML_PATH.read_text(encoding="utf-8")

    # Wrap in iframe for isolation
    encoded = base64.b64encode(html.encode("utf-8")).decode("ascii")
    # NOTE(review): the original iframe attributes were lost; the sizing
    # below is a reasonable default — adjust to match the intended layout.
    return (
        f'<iframe src="data:text/html;charset=utf-8;base64,{encoded}" '
        'style="width:100%;height:800px;border:none;"></iframe>'
    )


# ---------------------------------------------------------------------------
# Gradio App
# ---------------------------------------------------------------------------

DESCRIPTION = """\
# Auto-Ontology — Automotive Market Intelligence

Explore a hypergraph of **94,671 market signals** connected to **1,261 vehicles** \
from the NHTSA vPIC registry. Built from Common Crawl data using an AWS pipeline \
with NuExtract structured extraction and vPIC entity resolution.
"""

with gr.Blocks(
    title="Auto-Ontology",
    theme=gr.themes.Base(
        primary_hue="indigo",
        secondary_hue="emerald",
        neutral_hue="slate",
    ),
) as demo:
    gr.Markdown(DESCRIPTION)

    with gr.Tabs():
        # Static narrative, rendered once at build time.
        with gr.Tab("The Pipeline"):
            gr.HTML(load_pipeline_html())

        # Chat front-end for the tool-using agent.
        with gr.Tab("Ask the Ontology"):
            gr.Markdown(
                "Chat with a **Strands Agent** that can search products, "
                "browse market signals, find competitors, and query graph statistics."
            )
            gr.ChatInterface(
                fn=chat_fn,
                type="messages",
                examples=[
                    "What are the graph statistics?",
                    "Search for Tesla vehicles in the dataset",
                    "Show me bearish signals in the technology domain",
                    "What competes with the Tesla Model Y 2024?",
                    "Find signals about battery technology",
                ],
            )

if __name__ == "__main__":
    demo.launch()