import ast
import logging
import re
from typing import Dict, List, Optional, Tuple

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

APP_TITLE = "Noise Detection"
APP_SUBTITLE = "Classify quantum circuits into clean, depolarizing, amplitude_damping, or hardware-aware noise conditions."

# One Hugging Face repository per noise condition; "label" becomes the
# classification target written into the `noise_label` column.
REPO_CONFIG = {
    "clean": {
        "label": "clean",
        "repo": "QSBench/QSBench-Core-v1.0.0-demo",
    },
    "depolarizing": {
        "label": "depolarizing",
        "repo": "QSBench/QSBench-Depolarizing-Demo-v1.0.0",
    },
    "amplitude_damping": {
        "label": "amplitude_damping",
        "repo": "QSBench/QSBench-Amplitude-v1.0.0-demo",
    },
    "hardware_aware": {
        "label": "hardware_aware",
        "repo": "QSBench/QSBench-Transpilation-v1.0.0-demo",
    },
}

CLASS_ORDER = ["clean", "depolarizing", "amplitude_damping", "hardware_aware"]

# Columns that must never be offered as model inputs (identifiers, raw text,
# metadata, and the target itself).
NON_FEATURE_COLS = {
    "sample_id",
    "sample_seed",
    "circuit_hash",
    "split",
    "circuit_qasm",
    "qasm_raw",
    "qasm_transpiled",
    "circuit_type_resolved",
    "circuit_type_requested",
    "noise_type",
    "noise_prob",
    "observable_bases",
    "observable_mode",
    "backend_device",
    "precision_mode",
    "circuit_signature",
    "entanglement",
    "meyer_wallach",
    "cx_count",
    "noise_label",
}

# Any numeric column whose name contains one of these substrings is also
# excluded from the feature list.
SOFT_EXCLUDE_PATTERNS = ["ideal_", "noisy_", "error_", "sign_ideal_", "sign_noisy_"]

# Process-wide caches so each dataset is downloaded and enriched only once.
_ASSET_CACHE: Dict[str, pd.DataFrame] = {}
_COMBINED_CACHE: Optional[pd.DataFrame] = None

# Names of the derived columns, kept in one place so the "missing" result and
# the computed result can never drift apart.
_ADJ_FEATURE_NAMES = (
    "adj_edge_count",
    "adj_density",
    "adj_degree_mean",
    "adj_degree_std",
)
_QASM_FEATURE_NAMES = (
    "qasm_length",
    "qasm_line_count",
    "qasm_gate_keyword_count",
    "qasm_measure_count",
    "qasm_comment_count",
)

# Compiled once at import time instead of on every row.
_QASM_GATE_RE = re.compile(
    r"\b(cx|h|x|y|z|rx|ry|rz|u1|u2|u3|u|swap|cz|ccx|rxx|ryy|rzz)\b",
    flags=re.IGNORECASE,
)
_QASM_MEASURE_RE = re.compile(r"\bmeasure\b", flags=re.IGNORECASE)


def _nan_features(names) -> Dict[str, float]:
    """Return a feature dict with every name in *names* mapped to NaN."""
    return dict.fromkeys(names, np.nan)


def safe_parse(value):
    """Parse a stringified Python literal, returning the input on failure.

    Non-string values are returned unchanged. Strings are passed to
    ``ast.literal_eval``; if they are not a valid literal the original
    string is returned instead of raising.
    """
    if not isinstance(value, str):
        return value
    try:
        return ast.literal_eval(value)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # These are the errors literal_eval raises for malformed or
        # pathological input; anything else is a real bug and should surface.
        return value


def adjacency_features(adj_value) -> Dict[str, float]:
    """Derive compact graph features from an adjacency matrix.

    *adj_value* may be a nested sequence, a NumPy array, or a string holding
    the Python literal of one. The result always has the keys
    ``adj_edge_count``, ``adj_density``, ``adj_degree_mean`` and
    ``adj_degree_std``; every value is NaN when the input is missing or is
    not a non-empty square 2-D numeric matrix.
    """
    parsed = safe_parse(adj_value)
    if not isinstance(parsed, (list, tuple, np.ndarray)):
        return _nan_features(_ADJ_FEATURE_NAMES)

    try:
        arr = np.asarray(parsed, dtype=float)
    except (TypeError, ValueError):
        # Ragged rows or non-numeric entries.
        return _nan_features(_ADJ_FEATURE_NAMES)

    # np.triu silently broadcasts a 1-D input into a 2-D matrix, and a
    # non-square matrix is not an adjacency matrix at all; both would yield
    # plausible-looking but meaningless numbers, so reject them explicitly.
    if arr.ndim != 2 or arr.shape[0] == 0 or arr.shape[0] != arr.shape[1]:
        return _nan_features(_ADJ_FEATURE_NAMES)

    n = arr.shape[0]
    # Only the strict upper triangle is summed, so each pair is counted once.
    edge_count = float(np.triu(arr, k=1).sum())
    possible_edges = float(n * (n - 1) / 2)
    density = edge_count / possible_edges if possible_edges > 0 else np.nan
    degrees = arr.sum(axis=1)
    return {
        "adj_edge_count": edge_count,
        "adj_density": density,
        "adj_degree_mean": float(np.mean(degrees)),
        "adj_degree_std": float(np.std(degrees)),
    }


def qasm_features(qasm_value) -> Dict[str, float]:
    """Extract lightweight text statistics from a QASM program.

    Returns a dict with the keys ``qasm_length``, ``qasm_line_count``,
    ``qasm_gate_keyword_count``, ``qasm_measure_count`` and
    ``qasm_comment_count``. All values are NaN when *qasm_value* is not a
    non-blank string.
    """
    if not isinstance(qasm_value, str) or not qasm_value.strip():
        return _nan_features(_QASM_FEATURE_NAMES)

    text = qasm_value
    # Blank lines are ignored for both the line count and the comment count.
    lines = [line for line in text.splitlines() if line.strip()]
    gate_keywords = _QASM_GATE_RE.findall(text)
    measure_count = len(_QASM_MEASURE_RE.findall(text))
    comment_count = sum(1 for line in lines if line.strip().startswith("//"))
    return {
        "qasm_length": float(len(text)),
        "qasm_line_count": float(len(lines)),
        "qasm_gate_keyword_count": float(len(gate_keywords)),
        "qasm_measure_count": float(measure_count),
        "qasm_comment_count": float(comment_count),
    }


def _derive_columns(series: pd.Series, extractor, names) -> pd.DataFrame:
    """Apply *extractor* to each value of *series* and return a feature frame.

    The frame shares the index of *series* and always has exactly the columns
    in *names*, even when *series* is empty.
    """
    rows = [extractor(value) for value in series]
    return pd.DataFrame(rows, index=series.index, columns=list(names))


def enrich_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Add derived numeric features for classification.

    Works on a copy of *df*. Adjacency features are derived from the
    ``adjacency`` column when present; QASM features come from
    ``qasm_transpiled`` if that column exists, otherwise from ``qasm_raw``.
    Derived columns whose names already exist in *df* are not added again,
    so the result never contains duplicate column names.
    """
    df = df.copy()
    derived_frames = []

    if "adjacency" in df.columns:
        derived_frames.append(
            _derive_columns(df["adjacency"], adjacency_features, _ADJ_FEATURE_NAMES)
        )

    qasm_source = "qasm_transpiled" if "qasm_transpiled" in df.columns else "qasm_raw"
    if qasm_source in df.columns:
        derived_frames.append(
            _derive_columns(df[qasm_source], qasm_features, _QASM_FEATURE_NAMES)
        )

    for derived in derived_frames:
        fresh = [col for col in derived.columns if col not in df.columns]
        if fresh:
            df = pd.concat([df, derived[fresh]], axis=1)
    return df


def load_single_dataset(dataset_key: str) -> pd.DataFrame:
    """Load one dataset's ``train`` split from Hugging Face and cache it.

    *dataset_key* must be a key of ``REPO_CONFIG`` (a ``KeyError`` is raised
    otherwise). The frame is enriched with derived features and given a
    ``noise_label`` column on first load. The cached frame itself is
    returned, so callers should copy before mutating.
    """
    if dataset_key not in _ASSET_CACHE:
        config = REPO_CONFIG[dataset_key]
        logger.info("Loading dataset: %s", dataset_key)
        ds = load_dataset(config["repo"])
        df = pd.DataFrame(ds["train"])
        df = enrich_dataframe(df)
        df["noise_label"] = config["label"]
        _ASSET_CACHE[dataset_key] = df
    return _ASSET_CACHE[dataset_key]


def load_combined_dataset() -> pd.DataFrame:
    """Load and merge all noise-condition datasets.

    Every dataset in ``REPO_CONFIG`` is loaded, concatenated with a fresh
    index, and restricted to rows whose ``noise_label`` is in
    ``CLASS_ORDER``. The result is cached for the lifetime of the process.
    """
    global _COMBINED_CACHE
    if _COMBINED_CACHE is None:
        frames = [load_single_dataset(key) for key in REPO_CONFIG]
        combined = pd.concat(frames, ignore_index=True)
        combined = combined[combined["noise_label"].isin(CLASS_ORDER)].copy()
        _COMBINED_CACHE = combined
    return _COMBINED_CACHE


def load_guide_content() -> str:
    """Return the contents of ``GUIDE.md``, or a placeholder if it is missing."""
    try:
        with open("GUIDE.md", "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return "# Guide\n\nGuide file not found."


def get_available_feature_columns(df: pd.DataFrame) -> List[str]:
    """Return numeric feature columns excluding metadata and the target.

    A column qualifies when it has a numeric dtype, is not listed in
    ``NON_FEATURE_COLS``, and does not contain any substring from
    ``SOFT_EXCLUDE_PATTERNS``. The result is sorted and free of duplicates.
    """
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    features = {
        col
        for col in numeric_cols
        if col not in NON_FEATURE_COLS
        and not any(pattern in col for pattern in SOFT_EXCLUDE_PATTERNS)
    }
    return sorted(features)


def default_feature_selection(features: List[str]) -> List[str]:
    """Pick a stable set of default features.

    Returns up to eight of the preferred feature names that are present in
    *features*, in preference order. If none of them are available, the
    first eight entries of *features* are returned instead.
    """
    # NOTE: "cx_count" is listed in NON_FEATURE_COLS, so it will not appear in
    # lists produced by get_available_feature_columns.
    preferred = [
        "gate_entropy",
        "adj_density",
        "adj_degree_mean",
        "adj_degree_std",
        "depth",
        "total_gates",
        "single_qubit_gates",
        "two_qubit_gates",
        "cx_count",
        "qasm_length",
        "qasm_line_count",
        "qasm_gate_keyword_count",
    ]
    selected = [feature for feature in preferred if feature in features]
    return selected[:8] if selected else features[:8]


def make_classification_figure(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    class_names: List[str],
    feature_names: Optional[List[str]] = None,
    importances: Optional[np.ndarray] = None,
) -> plt.Figure:
    """Create a compact classification summary figure.

    The figure has three panels: a confusion matrix over *class_names*, a
    histogram of correct vs. incorrect predictions, and a bar chart of the
    ten largest feature importances. The third panel shows a placeholder
    message when *importances* or *feature_names* is missing or their
    lengths differ.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # Build the figure directly instead of through plt.figure(): pyplot keeps
    # a global reference to every figure it creates, which leaks memory in a
    # long-running server where nothing ever calls plt.close().
    fig = plt.Figure(figsize=(20, 6))
    gs = fig.add_gridspec(1, 3)
    ax1 = fig.add_subplot(gs[0, 0])
    ax2 = fig.add_subplot(gs[0, 1])
    ax3 = fig.add_subplot(gs[0, 2])

    # Panel 1: confusion matrix with per-cell counts.
    cm = confusion_matrix(y_true, y_pred, labels=class_names)
    im = ax1.imshow(cm, interpolation="nearest")
    ax1.set_title("Confusion Matrix")
    ax1.set_xlabel("Predicted")
    ax1.set_ylabel("Actual")
    ticks = np.arange(len(class_names))
    ax1.set_xticks(ticks)
    ax1.set_yticks(ticks)
    ax1.set_xticklabels(class_names, rotation=45, ha="right")
    ax1.set_yticklabels(class_names)
    for (i, j), count in np.ndenumerate(cm):
        ax1.text(j, i, count, ha="center", va="center")
    fig.colorbar(im, ax=ax1, fraction=0.046, pad=0.04)

    # Panel 2: 0/1 indicator of misclassification, one bin per outcome.
    residual_like = (y_true != y_pred).astype(int)
    ax2.hist(residual_like, bins=[-0.5, 0.5, 1.5])
    ax2.set_title("Correct vs Incorrect")
    ax2.set_xlabel("0 = Correct, 1 = Incorrect")
    ax2.set_ylabel("Count")

    # Panel 3: top-10 importances, smallest at the bottom of the chart.
    if (
        importances is not None
        and feature_names is not None
        and len(importances) == len(feature_names)
    ):
        importances = np.asarray(importances)
        idx = np.argsort(importances)[-10:]
        ax3.barh([feature_names[i] for i in idx], importances[idx])
        ax3.set_title("Top-10 Feature Importances")
        ax3.set_xlabel("Importance")
    else:
        ax3.text(0.5, 0.5, "Feature importances are unavailable.", ha="center", va="center")
        ax3.set_axis_off()

    fig.tight_layout()
    return fig


def build_dataset_profile(df: pd.DataFrame) -> str:
    """Build a Markdown dataset summary for the explorer tab."""
    # Two trailing spaces before "\n" form a Markdown hard line break; with a
    # single space the fields are rendered on one line.
    return (
        f"### Dataset profile\n\n"
        f"**Rows:** {len(df):,}  \n"
        f"**Columns:** {len(df.columns):,}  \n"
        f"**Classes:** {', '.join(CLASS_ORDER)}"
    )


def _first_text(frame: pd.DataFrame, column: str, fallback: str = "// N/A") -> str:
    """Return the first value of *column* if it is a string, else *fallback*."""
    if column not in frame.columns or frame.empty:
        return fallback
    value = frame[column].iloc[0]
    # Missing cells come back as None/NaN, which the code viewer cannot show.
    return value if isinstance(value, str) else fallback


def refresh_explorer(dataset_key: str, split_name: str) -> Tuple[gr.update, pd.DataFrame, str, str, str, str]:
    """Refresh the explorer view for the selected source dataset.

    Returns, in order: an update for the split dropdown, a preview of up to
    12 rows, the raw and transpiled QASM of the first preview row (or
    ``"// N/A"``), the dataset profile Markdown, and the split summary
    Markdown. If *split_name* is not one of the dataset's splits, the first
    available split is used.
    """
    df = load_single_dataset(dataset_key)
    has_split = "split" in df.columns

    splits = df["split"].dropna().unique().tolist() if has_split else ["train"]
    if not splits:
        splits = ["train"]
    if split_name not in splits:
        split_name = splits[0]

    filtered = df[df["split"] == split_name] if has_split else df
    display_df = filtered.head(12).copy()

    raw_qasm = _first_text(display_df, "qasm_raw")
    transpiled_qasm = _first_text(display_df, "qasm_transpiled")

    profile_box = build_dataset_profile(df)
    summary_box = (
        f"### Split summary\n\n"
        f"**Dataset:** `{dataset_key}`  \n"
        f"**Label:** `{REPO_CONFIG[dataset_key]['label']}`  \n"
        f"**Available splits:** {', '.join(map(str, splits))}  \n"
        f"**Preview rows:** {len(display_df)}"
    )

    return (
        gr.update(choices=splits, value=split_name),
        display_df,
        raw_qasm,
        transpiled_qasm,
        profile_box,
        summary_box,
    )


def sync_feature_picker(_dataset_key: str) -> gr.update:
    """Refresh the feature list from the combined dataset.

    The argument is ignored; it exists so the function can be wired to the
    dataset dropdown's change event.
    """
    df = load_combined_dataset()
    features = get_available_feature_columns(df)
    defaults = default_feature_selection(features)
    return gr.update(choices=features, value=defaults)


def train_classifier(
    feature_columns: List[str],
    test_size: float,
    n_estimators: int,
    max_depth: float,
    random_state: Optional[float],
) -> Tuple[Optional[plt.Figure], str]:
    """Train a four-class classifier and return metrics plus a plot.

    Args:
        feature_columns: Names of the columns used as model inputs.
        test_size: Fraction of rows held out for evaluation.
        n_estimators: Number of trees in the random forest.
        max_depth: Maximum tree depth; a falsy or non-positive value means
            unlimited depth.
        random_state: Seed for the split and the forest. ``None`` (for
            example an emptied number field) yields an unseeded run.

    Returns:
        ``(figure, markdown)``. On invalid input the figure is ``None`` and
        the Markdown holds an error message.
    """
    if not feature_columns:
        return None, "### ❌ Please select at least one feature."

    df = load_combined_dataset()

    # Guard against stale selections so dropna() does not raise KeyError.
    missing = [col for col in feature_columns if col not in df.columns]
    if missing:
        return None, f"### ❌ Unknown feature columns: {', '.join(map(str, missing))}"

    required_cols = list(feature_columns) + ["noise_label"]
    train_df = df.dropna(subset=required_cols).copy()
    train_df = train_df[train_df["noise_label"].isin(CLASS_ORDER)]

    if len(train_df) < 20:
        return None, "### ❌ Not enough clean rows after filtering missing values."

    X = train_df[feature_columns]
    y = train_df["noise_label"]

    # A cleared seed field arrives as None; int(None) would raise TypeError.
    seed = int(random_state) if random_state is not None else None
    depth = int(max_depth) if max_depth and int(max_depth) > 0 else None
    trees = int(n_estimators)

    try:
        X_train, X_test, y_train, y_test = train_test_split(
            X,
            y,
            test_size=test_size,
            random_state=seed,
            stratify=y,
        )
    except ValueError as exc:
        # Stratification fails when a class has too few members; fall back to
        # a plain random split rather than aborting the run.
        logger.warning("Stratified split failed (%s); using an unstratified split.", exc)
        X_train, X_test, y_train, y_test = train_test_split(
            X,
            y,
            test_size=test_size,
            random_state=seed,
        )

    model = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
            (
                "classifier",
                RandomForestClassifier(
                    n_estimators=trees,
                    max_depth=depth,
                    random_state=seed,
                    n_jobs=-1,
                ),
            ),
        ]
    )
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    accuracy = float(accuracy_score(y_test, y_pred))
    # zero_division=0 matches the report below and avoids warning noise when
    # a class is never predicted.
    macro_f1 = float(f1_score(y_test, y_pred, average="macro", zero_division=0))
    weighted_f1 = float(f1_score(y_test, y_pred, average="weighted", zero_division=0))

    classifier = model.named_steps["classifier"]
    importances = getattr(classifier, "feature_importances_", None)
    fig = make_classification_figure(
        y_test.to_numpy(),
        y_pred,
        CLASS_ORDER,
        list(feature_columns),
        importances,
    )

    report = classification_report(
        y_test,
        y_pred,
        labels=CLASS_ORDER,
        output_dict=False,
        zero_division=0,
    )

    # Two trailing spaces before "\n" form a Markdown hard line break.
    results = (
        "### Classification results\n\n"
        f"**Rows used:** {len(train_df):,}  \n"
        f"**Test size:** {test_size:.0%}  \n"
        f"**Accuracy:** {accuracy:.4f}  \n"
        f"**Macro F1:** {macro_f1:.4f}  \n"
        f"**Weighted F1:** {weighted_f1:.4f}\n\n"
        "```text\n"
        f"{report}"
        "```"
    )
    return fig, results


CUSTOM_CSS = """
.gradio-container {
    max-width: 1400px !important;
}
footer {
    margin-top: 1rem;
}
"""

with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# 🌌 {APP_TITLE}")
    gr.Markdown(APP_SUBTITLE)

    with gr.Tabs():
        with gr.TabItem("🔎 Explorer"):
            dataset_dropdown = gr.Dropdown(
                list(REPO_CONFIG.keys()),
                value="clean",
                label="Dataset",
            )
            split_dropdown = gr.Dropdown(
                ["train"],
                value="train",
                label="Split",
            )
            profile_box = gr.Markdown(value="### Loading dataset...")
            summary_box = gr.Markdown(value="### Loading split summary...")
            explorer_df = gr.Dataframe(label="Preview", interactive=False)
            with gr.Row():
                raw_qasm = gr.Code(label="Raw QASM", language=None)
                transpiled_qasm = gr.Code(label="Transpiled QASM", language=None)

        with gr.TabItem("🧠 Classification"):
            feature_picker = gr.CheckboxGroup(label="Input features", choices=[])
            test_size = gr.Slider(0.1, 0.4, value=0.2, step=0.05, label="Test split")
            n_estimators = gr.Slider(50, 400, value=200, step=10, label="Trees")
            max_depth = gr.Slider(1, 30, value=12, step=1, label="Max depth")
            seed = gr.Number(value=42, precision=0, label="Random seed")
            run_btn = gr.Button("Train & Evaluate", variant="primary")
            plot = gr.Plot()
            metrics = gr.Markdown()

        with gr.TabItem("📖 Guide"):
            gr.Markdown(load_guide_content())

    gr.Markdown("---")
    gr.Markdown(
        "### 🔗 Links\n"
        "[Website](https://qsbench.github.io) | "
        "[Hugging Face](https://huggingface.co/QSBench) | "
        "[GitHub](https://github.com/QSBench)"
    )

    # The explorer is refreshed from three triggers with identical wiring.
    explorer_inputs = [dataset_dropdown, split_dropdown]
    explorer_outputs = [
        split_dropdown,
        explorer_df,
        raw_qasm,
        transpiled_qasm,
        profile_box,
        summary_box,
    ]

    dataset_dropdown.change(refresh_explorer, explorer_inputs, explorer_outputs)
    split_dropdown.change(refresh_explorer, explorer_inputs, explorer_outputs)
    dataset_dropdown.change(sync_feature_picker, [dataset_dropdown], [feature_picker])

    run_btn.click(
        train_classifier,
        [feature_picker, test_size, n_estimators, max_depth, seed],
        [plot, metrics],
    )

    # Populate both tabs once when the page first loads.
    demo.load(refresh_explorer, explorer_inputs, explorer_outputs)
    demo.load(sync_feature_picker, [dataset_dropdown], [feature_picker])


if __name__ == "__main__":
    demo.launch(theme=gr.themes.Soft(), css=CUSTOM_CSS)