Food Desert
Switch Stage3 to explicit-only no-why selection, drop bear probe, and set k=1 defaults
06a3c46
Raw
History Blame
23.3 kB
import gradio as gr
import os
import logging
import time
import json
from datetime import datetime
from PIL import Image
from pathlib import Path
from typing import List
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
from psq_rag.pipeline.preproc import extract_user_provided_tags_upto_3_words
from psq_rag.llm.rewrite import llm_rewrite_prompt
from psq_rag.retrieval.psq_retrieval import psq_candidates_from_rewrite_phrases, _norm_tag_for_lookup
from psq_rag.llm.select import llm_select_indices, llm_infer_structural_tags, llm_infer_probe_tags
from psq_rag.retrieval.state import expand_tags_via_implications
from psq_rag.ui.group_ranked_display import render_group_rankings_markdown
def _split_prompt_commas(s: str) -> List[str]:
return [p.strip() for p in (s or "").split(",") if p.strip()]
def _norm_for_dedupe(tag: str) -> str:
# your canonical form for lookup/dedupe
return _norm_tag_for_lookup(tag.lower())
def compose_final_prompt(rewritten_prompt: str, selected_tags: List[str]) -> str:
parts = _split_prompt_commas(rewritten_prompt)
parts.extend(selected_tags)
seen = set()
out = []
for p in parts:
key = _norm_for_dedupe(p)
if key in seen:
continue
seen.add(key)
out.append(p)
return ", ".join(out)
def _build_selection_query(
prompt_in: str,
rewritten: str,
structural_tags: List[str],
probe_tags: List[str],
) -> str:
lines = [f"IMAGE DESCRIPTION: {prompt_in.strip()}"]
if rewritten and rewritten.strip():
lines.append(f"REWRITE PHRASES: {rewritten.strip()}")
hint_tags = []
if structural_tags:
hint_tags.extend(structural_tags)
if probe_tags:
hint_tags.extend(probe_tags)
if hint_tags:
# Keep hints as context only; selection still must choose by candidate indices.
lines.append(
"INFERRED TAG HINTS (context only): " + ", ".join(sorted(set(hint_tags)))
)
return "\n".join(lines)
# Set up logging
# Minimal prod logging: warnings+ to stderr, no file by default
import os, logging
LOG_LEVEL = os.environ.get("PSQ_LOG_LEVEL", "WARNING").upper()
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.WARNING),
format="%(asctime)s %(levelname)s:%(message)s",
handlers=[logging.StreamHandler()] # no file -> avoids huge logs on Spaces
)
# Quiet down common noisy libs (optional)
for _name in ("gensim", "gradio", "hnswlib", "httpx", "uvicorn"):
logging.getLogger(_name).setLevel(logging.ERROR)
# Turn off Gradio analytics phone-home to avoid those background thread errors (optional)
os.environ["GRADIO_ANALYTICS_ENABLED"] = "0"
MASCOT_DIR = Path(__file__).parent / "mascotimages"
MASCOT_FILE = MASCOT_DIR / "transparentsquirrel.png"
def _load_mascot_image():
"""Load mascot image if available; return None when missing/unreadable."""
if not MASCOT_FILE.exists():
logging.warning("Mascot image missing: %s", MASCOT_FILE)
return None
try:
return Image.open(MASCOT_FILE).convert("RGBA")
except Exception as e:
logging.warning("Failed to load mascot image (%s): %s", MASCOT_FILE, e)
return None
try:
from gradio_client import utils as _gc_utils
_orig_get_type = _gc_utils.get_type
_orig_j2p = _gc_utils._json_schema_to_python_type
_orig_pub = _gc_utils.json_schema_to_python_type
def _get_type_safe(schema):
# Sometimes schema is a bare True/False (JSON Schema boolean form)
if not isinstance(schema, dict):
return "any"
return _orig_get_type(schema)
def _j2p_safe(schema, defs=None):
# Accept non-dict schemas (True/False/None) and treat as "any"
if not isinstance(schema, dict):
return "any"
return _orig_j2p(schema, defs or schema.get("$defs"))
def _pub_safe(schema):
# Public wrapper used by Gradio; keep it resilient too
if not isinstance(schema, dict):
return "any"
return _j2p_safe(schema, schema.get("$defs"))
_gc_utils.get_type = _get_type_safe
_gc_utils._json_schema_to_python_type = _j2p_safe
_gc_utils.json_schema_to_python_type = _pub_safe
except Exception as e:
print("gradio_client hotfix not applied:", e)
# -------------------------------------------------------------------------------
allow_nsfw_tags = False
def _is_production_runtime() -> bool:
"""Best-effort detection for deployed runtime (HF Spaces or explicit env)."""
if os.environ.get("PSQ_PRODUCTION", "").strip().lower() in {"1", "true", "yes"}:
return True
if os.environ.get("SPACE_ID"):
return True
if os.environ.get("HF_SPACE_ID"):
return True
if os.environ.get("SYSTEM") == "spaces":
return True
return False
verbose_retrieval_default = "0" if _is_production_runtime() else "1"
verbose_retrieval = os.environ.get("PSQ_VERBOSE_RETRIEVAL", verbose_retrieval_default).strip().lower() in {"1", "true", "yes"}
verbose_retrieval_all = False
verbose_retrieval_limit = 20
enable_probe_tags = os.environ.get("PSQ_ENABLE_PROBE", "1").strip() not in {"0", "false", "False"}
display_top_groups_default = int(os.environ.get("PSQ_DISPLAY_TOP_GROUPS", "10"))
display_top_tags_per_group_default = int(os.environ.get("PSQ_DISPLAY_TOP_TAGS_PER_GROUP", "5"))
display_rank_top_k_default = int(os.environ.get("PSQ_DISPLAY_GROUP_RANK_TOP_K", "5"))
retrieval_global_k = int(os.environ.get("PSQ_RETRIEVAL_GLOBAL_K", "300"))
retrieval_per_phrase_k = int(os.environ.get("PSQ_RETRIEVAL_PER_PHRASE_K", "10"))
retrieval_per_phrase_final_k = int(os.environ.get("PSQ_RETRIEVAL_PER_PHRASE_FINAL_K", "1"))
selection_mode = os.environ.get("PSQ_SELECTION_MODE", "chunked_map_union").strip()
selection_chunk_size = int(os.environ.get("PSQ_SELECTION_CHUNK_SIZE", "60"))
selection_per_phrase_k = int(os.environ.get("PSQ_SELECTION_PER_PHRASE_K", "2"))
selection_candidate_cap = int(os.environ.get("PSQ_SELECTION_CANDIDATE_CAP", "0"))
stage1_rewrite_timeout_s = float(os.environ.get("PSQ_TIMEOUT_REWRITE_S", "45"))
stage1_struct_timeout_s = float(os.environ.get("PSQ_TIMEOUT_STRUCT_S", "45"))
stage1_probe_timeout_s = float(os.environ.get("PSQ_TIMEOUT_PROBE_S", "45"))
stage3_select_timeout_s = float(os.environ.get("PSQ_TIMEOUT_SELECT_S", "45"))
timing_log_path = Path(os.environ.get("PSQ_TIMING_LOG_PATH", "data/runtime_metrics/ui_pipeline_timings.jsonl"))
css = """
.scrollable-content{
max-height: 420px;
overflow-y: scroll; /* always show scrollbar */
overflow-x: hidden;
padding-right: 8px;
padding-bottom: 14px; /* <— add this */
scrollbar-gutter: stable; /* prevent layout shift as it fills */
/* Firefox */
scrollbar-width: auto;
scrollbar-color: rgba(180,180,180,.9) rgba(0,0,0,.15);
}
/* WebKit/Chromium (Chrome/Edge/Safari) */
.scrollable-content::-webkit-scrollbar{ width: 10px; }
.scrollable-content::-webkit-scrollbar-thumb{ background: rgba(180,180,180,.9); border-radius: 8px; }
.scrollable-content::-webkit-scrollbar-track{ background: rgba(0,0,0,.15); }
/* (Optional) make both scroll panes taller so they fill more of the column */
.pane-left .scrollable-content,
.pane-right .scrollable-content {
max-height: 610px; /* was 420px; tweak to taste */
}
"""
def rag_pipeline_ui(
user_prompt: str,
display_top_groups: float,
display_top_tags_per_group: float,
display_rank_top_k: float,
):
logs = []
def log(s): logs.append(s)
try:
stage_timings = {}
def _record_timing(stage: str, dt_s: float):
stage_timings[stage] = float(dt_s)
def _emit_timing_summary(total_s: float):
summary_order = [
"preprocess",
"rewrite",
"structural",
"probe",
"retrieval",
"selection",
"implication_expansion",
"prompt_composition",
"group_display",
]
lines = []
for k in summary_order:
if k in stage_timings:
lines.append(f"{k}={stage_timings[k]:.2f}s")
slowest = max(stage_timings.items(), key=lambda kv: kv[1])[0] if stage_timings else "n/a"
log("Timing Summary: " + ", ".join(lines))
log(f"Timing Slowest Stage: {slowest}")
log(f"Timing Total: {total_s:.2f}s")
def _append_timing_jsonl(total_s: float):
try:
timing_log_path.parent.mkdir(parents=True, exist_ok=True)
rec = {
"timestamp_utc": datetime.utcnow().isoformat(timespec="seconds") + "Z",
"stages_s": stage_timings,
"total_s": float(total_s),
"config": {
"timeout_rewrite_s": stage1_rewrite_timeout_s,
"timeout_struct_s": stage1_struct_timeout_s,
"timeout_probe_s": stage1_probe_timeout_s,
"timeout_select_s": stage3_select_timeout_s,
},
}
with timing_log_path.open("a", encoding="utf-8") as f:
f.write(json.dumps(rec, ensure_ascii=True) + "\n")
log(f"Timing Log: wrote {timing_log_path}")
except Exception as e:
log(f"Timing Log: failed ({type(e).__name__}: {e})")
def _future_with_timeout(fut, timeout_s: float, stage_name: str, fallback):
t0 = time.perf_counter()
try:
out = fut.result(timeout=max(1.0, float(timeout_s)))
dt = time.perf_counter() - t0
log(f"{stage_name}: {dt:.2f}s")
stage_key = {
"Rewrite": "rewrite",
"Structural inference": "structural",
"Probe inference": "probe",
"Index selection": "selection",
}.get(stage_name)
if stage_key:
_record_timing(stage_key, dt)
return out
except FutureTimeoutError:
fut.cancel()
log(f"{stage_name}: timed out after {timeout_s:.0f}s; using fallback")
return fallback
except Exception as e:
log(f"{stage_name}: failed ({type(e).__name__}: {e}); using fallback")
return fallback
t_total0 = time.perf_counter()
log("Start: received prompt")
prompt_in = (user_prompt or "").strip()
if not prompt_in:
return "Error: empty prompt", "", ""
log("Input:")
log(prompt_in)
log("")
log(
"Runtime config: "
f"retrieval_global_k={retrieval_global_k} "
f"retrieval_per_phrase_k={retrieval_per_phrase_k} "
f"retrieval_per_phrase_final_k={retrieval_per_phrase_final_k} "
f"selection_mode={selection_mode} "
f"selection_chunk_size={selection_chunk_size} "
f"selection_per_phrase_k={selection_per_phrase_k}"
)
log("")
t0 = time.perf_counter()
user_tags = extract_user_provided_tags_upto_3_words(prompt_in)
dt = time.perf_counter()-t0
_record_timing("preprocess", dt)
log(f"Preprocess (user tag extraction): {dt:.2f}s")
log("Heuristically extracted user tags:")
if user_tags:
log(", ".join(user_tags))
else:
log("(none)")
log("")
log("Step 1: LLM rewrite + structural inference + probe (concurrent)")
max_workers = 3 if enable_probe_tags else 2
with ThreadPoolExecutor(max_workers=max_workers) as ex:
fut_rewrite = ex.submit(llm_rewrite_prompt, prompt_in, log)
fut_struct = ex.submit(llm_infer_structural_tags, prompt_in, log=log)
fut_probe = ex.submit(llm_infer_probe_tags, prompt_in, log=log) if enable_probe_tags else None
rewritten = _future_with_timeout(
fut_rewrite, stage1_rewrite_timeout_s, "Rewrite", prompt_in
)
structural_tags = _future_with_timeout(
fut_struct, stage1_struct_timeout_s, "Structural inference", []
)
probe_tags = (
_future_with_timeout(fut_probe, stage1_probe_timeout_s, "Probe inference", [])
if fut_probe else []
)
log("Rewrite:")
log(rewritten if rewritten else "(empty)")
log("")
rewrite_for_retrieval = rewritten
if user_tags:
# keep them separate in logs, but allow them to help retrieval
rewrite_for_retrieval = (rewrite_for_retrieval + ", " + ", ".join(user_tags)).strip(", ").strip()
log("Step 2: Prompt Squirrel retrieval (hidden)")
try:
t0 = time.perf_counter()
retrieval_context_tags = list(dict.fromkeys((structural_tags or []) + (probe_tags or [])))
rewrite_phrases = [p.strip() for p in (rewrite_for_retrieval or "").split(",") if p.strip()]
retrieval_result = psq_candidates_from_rewrite_phrases(
rewrite_phrases=rewrite_phrases,
allow_nsfw_tags=allow_nsfw_tags,
context_tags=retrieval_context_tags,
global_k=max(1, retrieval_global_k),
per_phrase_k=max(1, retrieval_per_phrase_k),
per_phrase_final_k=max(1, retrieval_per_phrase_final_k),
verbose=verbose_retrieval,
)
if isinstance(retrieval_result, tuple):
candidates, phrase_reports = retrieval_result
else:
candidates, phrase_reports = retrieval_result, []
if selection_candidate_cap > 0 and len(candidates) > selection_candidate_cap:
candidates = candidates[:selection_candidate_cap]
log(f"Selection candidate cap applied: {selection_candidate_cap}")
dt = time.perf_counter()-t0
_record_timing("retrieval", dt)
log(f"Retrieval: {dt:.2f}s")
log(f"Retrieved {len(candidates)} candidate tags")
if verbose_retrieval:
log(f"Total unique candidates: {len(candidates)}")
limit = None if verbose_retrieval_all else max(1, int(verbose_retrieval_limit))
for report in phrase_reports:
phrase = report.get("normalized") or report.get("phrase") or ""
lookup = report.get("lookup") or ""
tfidf_vocab = report.get("tfidf_vocab")
log(f"Phrase: {phrase} (lookup={lookup}) tfidf_vocab={tfidf_vocab}")
rows = report.get("candidates", [])
shown = rows if limit is None else rows[:limit]
for row in shown:
tag = row.get("tag")
alias_token = row.get("alias_token")
score_fasttext = row.get("score_fasttext")
score_context = row.get("score_context")
score_combined = row.get("score_combined")
count = row.get("count")
alias_part = ""
if alias_token and alias_token != tag:
alias_part = f" [alias_token={alias_token}]"
fasttext_str = (
f"{score_fasttext:.3f}" if isinstance(score_fasttext, (int, float)) else score_fasttext
)
if score_context is None:
context_str = "None"
else:
context_str = (
f"{score_context:.3f}" if isinstance(score_context, (int, float)) else score_context
)
combined_str = (
f"{score_combined:.3f}" if isinstance(score_combined, (int, float)) else score_combined
)
log(
f" {tag}{alias_part} | fasttext={fasttext_str} context={context_str} "
f"combined={combined_str} count={count}"
)
if limit is not None and len(rows) > limit:
log(f" ... ({len(rows) - limit} more)")
except Exception as e:
log(f"Retrieval fallback: {type(e).__name__}: {e}")
candidates = []
log("Step 3: LLM index selection (uses rewrite + structural/probe context)")
selection_query = _build_selection_query(
prompt_in=prompt_in,
rewritten=rewritten,
structural_tags=structural_tags,
probe_tags=probe_tags,
)
with ThreadPoolExecutor(max_workers=1) as ex:
fut_sel = ex.submit(
llm_select_indices,
query_text=selection_query,
candidates=candidates,
max_pick=0,
log=log,
mode=selection_mode,
chunk_size=max(1, selection_chunk_size),
per_phrase_k=max(1, selection_per_phrase_k),
)
picked_indices = _future_with_timeout(
fut_sel, stage3_select_timeout_s, "Index selection", []
)
selected_tags = [candidates[i].tag for i in picked_indices] if picked_indices else []
if structural_tags:
# Add structural tags that aren't already selected
existing = {t for t in selected_tags}
new_structural = [t for t in structural_tags if t not in existing]
selected_tags.extend(new_structural)
log(f" Added {len(new_structural)} structural tags: {', '.join(new_structural)}")
else:
log(" No structural tags inferred")
if probe_tags:
existing = {t for t in selected_tags}
new_probe = [t for t in probe_tags if t not in existing]
selected_tags.extend(new_probe)
log(f" Added {len(new_probe)} probe tags: {', '.join(new_probe)}")
elif enable_probe_tags:
log(" No probe tags inferred")
log("Step 3c: Expand via tag implications")
t0 = time.perf_counter()
tag_set = set(selected_tags)
expanded, implied_only = expand_tags_via_implications(tag_set)
dt = time.perf_counter()-t0
_record_timing("implication_expansion", dt)
log(f"Implication expansion: {dt:.2f}s")
if implied_only:
selected_tags.extend(sorted(implied_only))
log(f" Added {len(implied_only)} implied tags: {', '.join(sorted(implied_only))}")
else:
log(" No additional implied tags")
log("Step 4: Compose final prompt")
t0 = time.perf_counter()
final_prompt = compose_final_prompt(rewritten, selected_tags)
dt = time.perf_counter()-t0
_record_timing("prompt_composition", dt)
log(f"Prompt composition: {dt:.2f}s")
log("Step 5: Build ranked group/category display")
t0 = time.perf_counter()
seed_terms = []
seed_terms.extend(user_tags)
seed_terms.extend([p.strip() for p in (rewritten or "").split(",") if p.strip()])
seed_terms.extend(structural_tags or [])
seed_terms.extend(probe_tags or [])
seed_terms.extend(selected_tags)
seed_terms = list(dict.fromkeys(seed_terms))
groups_md = render_group_rankings_markdown(
seed_terms=seed_terms,
top_groups=max(1, int(display_top_groups)),
top_tags_per_group=max(1, int(display_top_tags_per_group)),
group_rank_top_k=max(1, int(display_rank_top_k)),
)
dt = time.perf_counter()-t0
_record_timing("group_display", dt)
log(f"Ranked group display: {dt:.2f}s")
total_dt = time.perf_counter()-t_total0
_emit_timing_summary(total_dt)
_append_timing_jsonl(total_dt)
log("Done: final prompt ready")
return "\n".join(logs), final_prompt, groups_md
except Exception as e:
log(f"Error: {type(e).__name__}: {e}")
return "\n".join(logs), "", ""
with gr.Blocks(css=css) as app:
with gr.Row():
with gr.Column(scale=3, elem_classes=["prompt-col"]):
image_tags = gr.Textbox(
label="Enter Prompt",
placeholder="e.g. fox, outside, detailed background, .",
lines=1
)
with gr.Column(scale=1):
_mascot_pil = _load_mascot_image()
if _mascot_pil is not None:
mascot_img = gr.Image(
value=_mascot_pil,
show_label=False,
interactive=False,
height=220,
elem_id="mascot"
)
else:
mascot_img = gr.Markdown("`(mascot image unavailable)`")
submit_button = gr.Button("Run", variant="primary")
gr.Markdown(
"""
### Prompt Squirrel RAG (pipeline version)
Type a rough prompt. This tool rewrites it and aligns it to an e621-style tag vocabulary using Prompt Squirrel internally,
then returns a cleaned, model-friendly prompt.
""".strip()
)
console = gr.Textbox(
label="Console",
lines=10,
interactive=False,
placeholder="Progress logs will appear here."
)
final_prompt = gr.Textbox(
label="Final Prompt",
lines=3,
interactive=False,
placeholder="Your optimized prompt will appear here."
)
with gr.Accordion("Display Settings", open=False):
with gr.Row():
display_top_groups = gr.Number(
value=display_top_groups_default,
precision=0,
label="Rows (Top Groups/Categories)",
minimum=1,
)
display_top_tags_per_group = gr.Number(
value=display_top_tags_per_group_default,
precision=0,
label="Top Tags Shown Per Row",
minimum=1,
)
display_rank_top_k = gr.Number(
value=display_rank_top_k_default,
precision=0,
label="Top Tags Used for Row Ranking",
minimum=1,
)
group_rankings_md = gr.Markdown(
label="Ranked Group/Category Tag Suggestions",
value="",
)
submit_button.click(
rag_pipeline_ui,
inputs=[image_tags, display_top_groups, display_top_tags_per_group, display_rank_top_k],
outputs=[console, final_prompt, group_rankings_md]
)
image_tags.submit(
rag_pipeline_ui,
inputs=[image_tags, display_top_groups, display_top_tags_per_group, display_rank_top_k],
outputs=[console, final_prompt, group_rankings_md]
)
if __name__ == "__main__":
app.queue().launch(allowed_paths=[str(MASCOT_DIR)])