"""FastAPI service that serves a fixed list of common English words.

The words are available either as a Server-Sent Events (SSE) stream
(``/stream-words``) or as a single JSON document (``/words``).
"""

import asyncio
import json
from typing import AsyncGenerator

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI(title="Streaming English Words API", version="1.0.0")

# Predefined common English words served by the API.
# NOTE: the list holds fewer than 200 entries and contains some repeats
# (e.g. "after", "over"), so every count reported to clients is derived
# from len(ENGLISH_WORDS) instead of being hard-coded.
ENGLISH_WORDS = [
    "the", "be", "to", "of", "and", "a", "in", "that", "have", "I",
    "it", "for", "not", "on", "with", "he", "as", "you", "do", "at",
    "this", "but", "his", "by", "from", "they", "we", "say", "her", "she",
    "or", "an", "will", "my", "one", "all", "would", "there", "their", "what",
    "so", "up", "out", "if", "about", "who", "get", "which", "go", "me",
    "when", "make", "can", "like", "time", "no", "just", "him", "know", "take",
    "people", "into", "year", "your", "good", "some", "could", "them", "see", "other",
    "than", "then", "now", "look", "only", "come", "its", "over", "think", "also",
    "back", "after", "use", "two", "how", "our", "work", "first", "well", "way",
    "even", "new", "want", "because", "any", "these", "give", "day", "most", "us",
    "very", "life", "after", "call", "world", "over", "still", "take", "every", "through",
    "before", "long", "where", "much", "should", "well", "people", "down", "own", "work",
    "first", "good", "new", "write", "our", "used", "me", "man", "too", "any",
    "day", "same", "right", "look", "think", "also", "around", "another", "came",
]


def _sse_event(payload: dict) -> str:
    """Serialize *payload* as one Server-Sent Events ``data:`` frame."""
    # A blank line (double newline) terminates an SSE event.
    return f"data: {json.dumps(payload)}\n\n"


async def generate_words_stream(
    query: str, delay: float = 0.05
) -> AsyncGenerator[str, None]:
    """Yield the word list as a sequence of SSE frames.

    The stream consists of one ``start`` event, one ``word`` event per
    entry of ``ENGLISH_WORDS`` (1-based ``index`` plus ``total``), and a
    final ``complete`` event.

    Args:
        query: Caller-supplied string echoed back in the ``start`` event.
        delay: Seconds to sleep before each word event; the pause makes
            the streaming effect visible to clients.

    Yields:
        SSE-formatted strings, each carrying a JSON payload.
    """
    total = len(ENGLISH_WORDS)

    # Start marker.
    yield _sse_event({
        'status': 'start',
        'query': query,
        'message': f'开始生成{total}个英文单词...',
    })

    for index, word in enumerate(ENGLISH_WORDS, 1):
        # Simulate some processing time so the streaming is noticeable.
        await asyncio.sleep(delay)
        yield _sse_event({
            'status': 'word',
            'index': index,
            'word': word,
            'total': total,
        })

    # Completion marker.
    yield _sse_event({
        'status': 'complete',
        'message': f'{total}个英文单词生成完成!',
    })


@app.get("/")
async def root():
    """Return a short description of the API and its streaming endpoint."""
    return {"message": "Streaming English Words API", "endpoint": "/stream-words"}


@app.get("/stream-words")
async def stream_words(query: str = "default"):
    """Stream the English word list as Server-Sent Events."""
    return StreamingResponse(
        generate_words_stream(query),
        # Declare the SSE media type directly instead of passing
        # "text/plain" and then contradicting it with a Content-Type header.
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )


@app.get("/words")
async def get_words(query: str = "default"):
    """Return the whole English word list in a single JSON response."""
    return {
        "query": query,
        "total_words": len(ENGLISH_WORDS),
        "words": ENGLISH_WORDS,
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8008)