# Prompt Squirrel RAG (pipeline version): Gradio app.
import gradio as gr
import os
import logging
from PIL import Image
from pathlib import Path
from typing import List

from psq_rag.pipeline.preproc import extract_user_provided_tags_upto_3_words
from psq_rag.llm.rewrite import llm_rewrite_prompt
from psq_rag.retrieval.psq_retrieval import psq_candidates_from_rewrite_phrases, _norm_tag_for_lookup
from psq_rag.llm.select import llm_select_indices, llm_infer_structural_tags
from psq_rag.retrieval.state import expand_tags_via_implications


def _split_prompt_commas(s: str) -> List[str]:
    """Split a comma-separated prompt into trimmed, non-empty pieces.

    A falsy input (``None`` or ``""``) yields an empty list.
    """
    text = s or ""
    trimmed = map(str.strip, text.split(","))
    # filter(None, ...) drops the empty strings left by blank segments.
    return list(filter(None, trimmed))

def _norm_for_dedupe(tag: str) -> str:
    """Return the key used to detect duplicate tags.

    The tag is lower-cased and then passed through ``_norm_tag_for_lookup``.
    """
    lowered = tag.lower()
    return _norm_tag_for_lookup(lowered)

def compose_final_prompt(rewritten_prompt: str, selected_tags: List[str]) -> str:
    """Join the rewritten prompt's phrases and the selected tags into one prompt.

    Phrases from ``rewritten_prompt`` come first, followed by ``selected_tags``.
    Entries whose ``_norm_for_dedupe`` key was already seen are dropped, so the
    first spelling of each tag wins. The result is a ``", "``-joined string.
    """
    first_by_key = {}
    for piece in [*_split_prompt_commas(rewritten_prompt), *selected_tags]:
        # setdefault keeps the earliest entry for a key; dicts preserve insertion order.
        first_by_key.setdefault(_norm_for_dedupe(piece), piece)
    return ", ".join(first_by_key.values())


# Set up logging
# Minimal prod logging: warnings+ to stderr, no file by default.
# (os and logging are already imported at the top of the file.)

# Level name comes from the PSQ_LOG_LEVEL environment variable (default WARNING);
# surrounding whitespace and letter case are ignored.
LOG_LEVEL = os.environ.get("PSQ_LOG_LEVEL", "WARNING").strip().upper()

# Resolve the name to a numeric level. getattr() can also hit non-level attributes
# of the logging module (e.g. BASIC_FORMAT is a string), which would make
# basicConfig() raise at import time, so anything that is not an int falls back
# to WARNING.
_log_level = getattr(logging, LOG_LEVEL, logging.WARNING)
if not isinstance(_log_level, int):
    _log_level = logging.WARNING

logging.basicConfig(
    level=_log_level,
    format="%(asctime)s %(levelname)s:%(message)s",
    handlers=[logging.StreamHandler()],  # no file -> avoids huge logs on Spaces
)

# Quiet down common noisy libs (optional)
for _name in ("gensim", "gradio", "hnswlib", "httpx", "uvicorn"):
    logging.getLogger(_name).setLevel(logging.ERROR)

# Turn off Gradio analytics phone-home to avoid those background thread errors (optional)
os.environ["GRADIO_ANALYTICS_ENABLED"] = "0"


# Mascot artwork lives in a "mascotimages" folder next to this source file.
MASCOT_DIR = Path(__file__).parent.joinpath("mascotimages")
MASCOT_FILE = MASCOT_DIR.joinpath("transparentsquirrel.png")

# Compatibility hotfix for gradio_client: wrap its JSON-schema-to-type helpers so
# that a non-dict schema (e.g. a bare boolean) yields "any" instead of being handed
# to the original implementation. If the import or any attribute lookup below
# fails, the patch is skipped and a message is printed.
try:
    from gradio_client import utils as _gc_utils

    # Keep references to the originals so the wrappers can delegate to them.
    # (_orig_pub is saved but not called by any wrapper below.)
    _orig_get_type = _gc_utils.get_type
    _orig_j2p = _gc_utils._json_schema_to_python_type
    _orig_pub = _gc_utils.json_schema_to_python_type

    def _get_type_safe(schema):
        # Sometimes schema is a bare True/False (JSON Schema boolean form)
        if not isinstance(schema, dict):
            return "any"
        return _orig_get_type(schema)

    def _j2p_safe(schema, defs=None):
        # Accept non-dict schemas (True/False/None) and treat as "any"
        if not isinstance(schema, dict):
            return "any"
        # When no definitions are passed in, fall back to the schema's own "$defs".
        return _orig_j2p(schema, defs or schema.get("$defs"))

    def _pub_safe(schema):
        # Public wrapper used by Gradio; keep it resilient too
        if not isinstance(schema, dict):
            return "any"
        # NOTE(review): this routes through _j2p_safe rather than _orig_pub, so any
        # extra processing done by the original public function is bypassed — confirm
        # that is intended.
        return _j2p_safe(schema, schema.get("$defs"))

    # Install the wrappers in place of the library functions.
    _gc_utils.get_type = _get_type_safe
    _gc_utils._json_schema_to_python_type = _j2p_safe
    _gc_utils.json_schema_to_python_type = _pub_safe

except Exception as e:
    # Best effort: the app continues without the patch.
    print("gradio_client hotfix not applied:", e)
# -------------------------------------------------------------------------------


# Module-level settings read by rag_pipeline_ui.

# Forwarded to psq_candidates_from_rewrite_phrases as allow_nsfw_tags.
allow_nsfw_tags = False
# When True, retrieval is called with verbose=True and per-phrase candidate
# details are written to the console log.
verbose_retrieval = True
# When True, every candidate row of each phrase is logged (no per-phrase cap).
verbose_retrieval_all = False
# Maximum candidate rows logged per phrase when verbose_retrieval_all is False
# (values below 1 are treated as 1).
verbose_retrieval_limit = 20

# Custom stylesheet passed to gr.Blocks(css=...). It styles the
# .scrollable-content class (capped height, always-visible vertical scrollbar)
# with separate scrollbar rules for Firefox and WebKit/Chromium.
# NOTE(review): no component created in this file uses .scrollable-content,
# .pane-left or .pane-right — confirm whether these rules are still needed.
css = """
.scrollable-content{
  max-height: 420px;
  overflow-y: scroll;          /* always show scrollbar */
  overflow-x: hidden;
  padding-right: 8px;
  padding-bottom: 14px;   /* <— add this */
  scrollbar-gutter: stable;    /* prevent layout shift as it fills */

  /* Firefox */
  scrollbar-width: auto;                          
  scrollbar-color: rgba(180,180,180,.9) rgba(0,0,0,.15);
}

/* WebKit/Chromium (Chrome/Edge/Safari) */
.scrollable-content::-webkit-scrollbar{ width: 10px; }
.scrollable-content::-webkit-scrollbar-thumb{ background: rgba(180,180,180,.9); border-radius: 8px; }
.scrollable-content::-webkit-scrollbar-track{ background: rgba(0,0,0,.15); }

/* (Optional) make both scroll panes taller so they fill more of the column */
.pane-left  .scrollable-content,
.pane-right .scrollable-content {
  max-height: 610px;                /* was 420px; tweak to taste */
}
"""


def _format_score(value) -> str:
    """Format a retrieval score for the console log.

    Numbers are rendered with three decimals; anything else (including ``None``)
    is stringified unchanged.
    """
    if isinstance(value, (int, float)):
        return f"{value:.3f}"
    return str(value)


def _log_retrieval_reports(phrase_reports, total_candidates, log) -> None:
    """Write the per-phrase retrieval diagnostics to ``log``.

    Each report is read as a dict with the keys ``normalized``/``phrase``,
    ``lookup``, ``tfidf_vocab`` and ``candidates``; each candidate row is read as
    a dict with ``tag``, ``alias_token``, ``score_fasttext``, ``score_context``,
    ``score_combined`` and ``count``. At most ``verbose_retrieval_limit`` rows
    are shown per phrase unless ``verbose_retrieval_all`` is set.
    """
    log(f"Total unique candidates: {total_candidates}")
    limit = None if verbose_retrieval_all else max(1, int(verbose_retrieval_limit))
    for report in phrase_reports:
        phrase = report.get("normalized") or report.get("phrase") or ""
        lookup = report.get("lookup") or ""
        tfidf_vocab = report.get("tfidf_vocab")
        log(f"Phrase: {phrase} (lookup={lookup}) tfidf_vocab={tfidf_vocab}")

        rows = report.get("candidates", [])
        shown = rows if limit is None else rows[:limit]
        for row in shown:
            tag = row.get("tag")
            alias_token = row.get("alias_token")
            # Only mention the alias when it differs from the tag itself.
            alias_part = ""
            if alias_token and alias_token != tag:
                alias_part = f" [alias_token={alias_token}]"
            fasttext_str = _format_score(row.get("score_fasttext"))
            context_str = _format_score(row.get("score_context"))
            combined_str = _format_score(row.get("score_combined"))
            count = row.get("count")
            log(
                f"  {tag}{alias_part} | fasttext={fasttext_str} context={context_str} "
                f"combined={combined_str} count={count}"
            )
        if limit is not None and len(rows) > limit:
            log(f"  ... ({len(rows) - limit} more)")


def rag_pipeline_ui(user_prompt: str):
    """Run the full prompt pipeline for the UI and return ``(console_text, final_prompt)``.

    Steps: extract user-provided tags, rewrite the prompt with the LLM, retrieve
    candidate tags for the rewrite phrases, let the LLM pick candidates by index,
    add inferred structural tags, expand the selection via tag implications, and
    compose the final comma-separated prompt.

    Args:
        user_prompt: Raw text typed by the user; ``None`` is treated as empty.

    Returns:
        A 2-tuple of strings. The first is the newline-joined progress log, the
        second is the final prompt. For an empty prompt the result is
        ``("Error: empty prompt", "")``. If any step raises, the error is appended
        to the log and the final prompt is ``""``.
    """
    logs = []
    log = logs.append

    try:
        log("Start: received prompt")
        prompt_in = (user_prompt or "").strip()
        if not prompt_in:
            return "Error: empty prompt", ""

        log("Input:")
        log(prompt_in)
        log("")

        user_tags = extract_user_provided_tags_upto_3_words(prompt_in)
        log("Heuristically extracted user tags:")
        log(", ".join(user_tags) if user_tags else "(none)")
        log("")

        log("Step 1: LLM rewrite")
        rewritten = llm_rewrite_prompt(prompt_in, log)
        log("Rewrite:")
        log(rewritten if rewritten else "(empty)")
        log("")

        # An empty/None rewrite is tolerated above, so it must not break the
        # concatenation with the user tags here.
        rewrite_for_retrieval = rewritten or ""
        if user_tags:
            # keep them separate in logs, but allow them to help retrieval
            rewrite_for_retrieval = (rewrite_for_retrieval + ", " + ", ".join(user_tags)).strip(", ").strip()

        log("Step 2: Prompt Squirrel retrieval (hidden)")
        candidates = []
        phrase_reports = []
        try:
            rewrite_phrases = [p.strip() for p in rewrite_for_retrieval.split(",") if p.strip()]
            retrieval_result = psq_candidates_from_rewrite_phrases(
                rewrite_phrases=rewrite_phrases,
                allow_nsfw_tags=allow_nsfw_tags,
                global_k=300,
                verbose=verbose_retrieval,
            )
            # The retriever may return either (candidates, reports) or just candidates.
            if isinstance(retrieval_result, tuple):
                candidates, phrase_reports = retrieval_result
            else:
                candidates, phrase_reports = retrieval_result, []
            log(f"Retrieved {len(candidates)} candidate tags")
        except Exception as e:
            log(f"Retrieval fallback: {type(e).__name__}: {e}")
            candidates = []
        else:
            if verbose_retrieval:
                # Reporting is diagnostic only: a malformed report must not throw
                # away candidates that were retrieved successfully.
                try:
                    _log_retrieval_reports(phrase_reports, len(candidates), log)
                except Exception as e:
                    log(f"Retrieval report skipped: {type(e).__name__}: {e}")

        log("Step 3: LLM index selection")
        # We pass the original 'prompt_in' as the description for the LLM to match against
        picked_indices = llm_select_indices(
            query_text=prompt_in,
            candidates=candidates,
            max_pick=0,
            log=log,
        )

        selected_tags = []
        for idx in picked_indices or []:
            try:
                selected_tags.append(candidates[idx].tag)
            except (IndexError, TypeError):
                # Ignore an unusable index instead of failing the whole run.
                log(f"  Skipping invalid candidate index: {idx!r}")

        log("Step 3b: Structural tag inference (solo/duo/gender/body plan)")
        structural_tags = llm_infer_structural_tags(prompt_in, log=log)
        if structural_tags:
            # Add structural tags that aren't already selected
            existing = set(selected_tags)
            new_structural = [t for t in structural_tags if t not in existing]
            selected_tags.extend(new_structural)
            log(f"  Added {len(new_structural)} structural tags: {', '.join(new_structural)}")
        else:
            log("  No structural tags inferred")

        log("Step 3c: Expand via tag implications")
        # Only the tags that were not already selected are appended.
        _, implied_only = expand_tags_via_implications(set(selected_tags))
        if implied_only:
            implied_sorted = sorted(implied_only)
            selected_tags.extend(implied_sorted)
            log(f"  Added {len(implied_only)} implied tags: {', '.join(implied_sorted)}")
        else:
            log("  No additional implied tags")

        log("Step 4: Compose final prompt")
        final_prompt = compose_final_prompt(rewritten, selected_tags)

        log("Done: final prompt ready")
        return "\n".join(logs), final_prompt

    except Exception as e:
        # Top-level boundary for the UI callback: show the error in the console box.
        log(f"Error: {type(e).__name__}: {e}")
        return "\n".join(logs), ""

    

# UI layout: prompt box and mascot/Run button on top, then the description,
# the console log, and the final prompt output.
with gr.Blocks(css=css) as app:
    with gr.Row():
        # Left: the prompt input (three quarters of the row).
        with gr.Column(scale=3, elem_classes=["prompt-col"]):
            image_tags = gr.Textbox(
                label="Enter Prompt", placeholder="e.g. fox, outside, detailed background, .", lines=1
            )
        # Right: mascot image with the Run button underneath.
        with gr.Column(scale=1):
            _mascot_pil = Image.open(MASCOT_FILE).convert("RGBA")
            mascot_img = gr.Image(
                value=_mascot_pil, show_label=False, interactive=False, height=220, elem_id="mascot"
            )
            submit_button = gr.Button("Run", variant="primary")

    gr.Markdown(
        """
### Prompt Squirrel RAG (pipeline version)

Type a rough prompt. This tool rewrites it and aligns it to an e621-style tag vocabulary using Prompt Squirrel internally,
then returns a cleaned, model-friendly prompt.
        """.strip()
    )

    # Read-only outputs filled by rag_pipeline_ui.
    console = gr.Textbox(
        label="Console", lines=10, interactive=False, placeholder="Progress logs will appear here."
    )
    final_prompt = gr.Textbox(
        label="Final Prompt", lines=3, interactive=False, placeholder="Your optimized prompt will appear here."
    )

    # Both clicking Run and pressing Enter in the prompt box run the pipeline.
    for _register in (submit_button.click, image_tags.submit):
        _register(
            rag_pipeline_ui,
            inputs=[image_tags],
            outputs=[console, final_prompt],
        )

# Script entry point: enable Gradio's request queue and launch the app, passing
# the mascot directory through launch()'s allowed_paths argument.
if __name__ == "__main__":
    app.queue().launch(allowed_paths=[str(MASCOT_DIR)])