import gradio as gr import httpx import asyncio import json API_URL = "http://localhost:8008/stream-words" async def stream_words(query: str): """异步获取 FastAPI 的流式输出""" async with httpx.AsyncClient(timeout=None) as client: async with client.stream("GET", API_URL, params={"query": query}) as response: async for line in response.aiter_lines(): if line.startswith("data: "): try: data = json.loads(line[6:]) if data.get('status') == 'word': yield data.get('word', '') elif data.get('status') == 'start': yield f"🚀 {data.get('message', '开始生成单词...')}\n" elif data.get('status') == 'complete': yield f"\n✅ {data.get('message', '生成完成!')}" except json.JSONDecodeError: continue def stream_words_mcp(query: str): """MCP工具包装函数:流式获取英文单词,返回结果给MCP客户端""" try: # 调用原始函数并收集结果 result = [] # 注意:这里需要处理异步函数 async def collect_tokens(): async for token in stream_words(query): if token.strip(): result.append(token) return result # 运行异步函数 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) tokens = loop.run_until_complete(collect_tokens()) loop.close() # 返回结果给MCP客户端 return f"成功生成 {len(tokens)} 个英文单词: {', '.join(tokens[:10])}{'...' if len(tokens) > 10 else ''}" except Exception as e: return f"生成单词时出错: {str(e)}" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# 🧠 English Words Generator") gr.Markdown("输入你的查询,AI 将为你生成 200 个英文单词(流式输出)") chatbot = gr.Chatbot( type="messages", height=500, show_label=False, container=True, bubble_full_width=False ) with gr.Row(): with gr.Column(scale=8): query_input = gr.Textbox( placeholder="输入你的查询,比如:'给我一些常用英文单词'", label="", show_label=False, lines=2 ) with gr.Column(scale=1): send_btn = gr.Button("发送", variant="primary", size="lg") with gr.Column(scale=1): clear_btn = gr.Button("清空", variant="secondary") def handle_user_input(query, messages): """添加用户消息""" if query.strip(): new_messages = messages + [{"role": "user", "content": query}] return "", new_messages return query, messages def clear_chat(): """清空对话""" return [] async def generate_response(messages): """根据用户最后一条消息流式生成 AI 回复""" if not messages or messages[-1]["role"] != "user": yield messages return last_user_msg = messages[-1]["content"] # 添加 AI 回复占位符 messages.append({"role": "assistant", "content": "正在生成单词..."}) yield messages # 流式拼接回复 full_response = "" async for token in stream_words(last_user_msg): if token.strip(): full_response += " " + token messages[-1] = {"role": "assistant", "content": full_response.strip()} yield messages # 用户提交时,先加消息 → 再生成回复 query_input.submit(handle_user_input, inputs=[query_input, chatbot], outputs=[query_input, chatbot])\ .then(generate_response, inputs=chatbot, outputs=chatbot) send_btn.click(handle_user_input, inputs=[query_input, chatbot], outputs=[query_input, chatbot])\ .then(generate_response, inputs=chatbot, outputs=chatbot) clear_btn.click(clear_chat, outputs=chatbot) # 注册MCP工具 - 使用包装函数并正确配置输出 demo.load(stream_words_mcp, inputs=None, outputs=gr.Textbox(label="MCP结果", visible=False)) # 开启 MCP server demo.launch(mcp_server=True, server_port=7860)