Spaces:
Sleeping
Sleeping
import asyncio
import json
from typing import AsyncGenerator, Optional, Sequence

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
# Application instance; the title and version are the metadata handed to FastAPI.
app = FastAPI(
    title="Streaming English Words API",
    version="1.0.0",
)
# Predefined list of common English words served by the API.
# NOTE(review): this literal holds 139 entries, some of them repeated
# (e.g. "after", "over", "take"), not 200 distinct words -- confirm whether
# the list should be extended or de-duplicated.
ENGLISH_WORDS = [
    "the", "be", "to", "of", "and",
    "a", "in", "that", "have", "I",
    "it", "for", "not", "on", "with",
    "he", "as", "you", "do", "at",
    "this", "but", "his", "by", "from",
    "they", "we", "say", "her", "she",
    "or", "an", "will", "my", "one",
    "all", "would", "there", "their", "what",
    "so", "up", "out", "if", "about",
    "who", "get", "which", "go", "me",
    "when", "make", "can", "like", "time",
    "no", "just", "him", "know", "take",
    "people", "into", "year", "your", "good",
    "some", "could", "them", "see", "other",
    "than", "then", "now", "look", "only",
    "come", "its", "over", "think", "also",
    "back", "after", "use", "two", "how",
    "our", "work", "first", "well", "way",
    "even", "new", "want", "because", "any",
    "these", "give", "day", "most", "us",
    "very", "life", "after", "call", "world",
    "over", "still", "take", "every", "through",
    "before", "long", "where", "much", "should",
    "well", "people", "down", "own", "work",
    "first", "good", "new", "write", "our",
    "used", "me", "man", "too", "any",
    "day", "same", "right", "look", "think",
    "also", "around", "another", "came",
]
def _sse_event(payload: dict) -> str:
    """Serialize *payload* as one Server-Sent Events ``data:`` frame."""
    # json.dumps escapes control characters (including newlines), so the
    # payload always stays on a single ``data:`` line.
    return f"data: {json.dumps(payload)}\n\n"


async def generate_words_stream(
    query: str,
    *,
    words: Optional[Sequence[str]] = None,
    delay: float = 0.05,
) -> AsyncGenerator[str, None]:
    """Yield the word list as a sequence of Server-Sent Events frames.

    The stream consists of one ``start`` frame, one ``word`` frame per word
    (1-based ``index`` plus the ``total`` count) and a final ``complete``
    frame. The count quoted in the start/complete messages is derived from
    the actual list length, so it always agrees with ``total``.

    Args:
        query: Value echoed back in the ``start`` frame.
        words: Words to stream; defaults to the module-level
            ``ENGLISH_WORDS`` when omitted.
        delay: Seconds to pause before each word frame, to make the
            streaming effect visible to the client.

    Yields:
        Strings of the form ``"data: <json>\\n\\n"``.
    """
    if words is None:
        words = ENGLISH_WORDS
    total = len(words)

    # Start marker.
    yield _sse_event({
        'status': 'start',
        'query': query,
        'message': f'开始生成{total}个英文单词...',
    })

    for index, word in enumerate(words, 1):
        # Simulated processing time between frames.
        await asyncio.sleep(delay)
        yield _sse_event({
            'status': 'word',
            'index': index,
            'word': word,
            'total': total,
        })

    # Completion marker.
    yield _sse_event({
        'status': 'complete',
        'message': f'{total}个英文单词生成完成!',
    })
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm where it is registered with the application.
async def root():
    """Return a small descriptor naming the API and its streaming endpoint."""
    info = {
        "message": "Streaming English Words API",
        "endpoint": "/stream-words",
    }
    return info
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm where it is registered with the application.
async def stream_words(query: str = "default"):
    """Stream the English word list to the client as Server-Sent Events.

    Args:
        query: Value forwarded to ``generate_words_stream``.

    Returns:
        A ``StreamingResponse`` wrapping ``generate_words_stream(query)``,
        sent with no-cache / keep-alive headers.
    """
    return StreamingResponse(
        generate_words_stream(query),
        # Declared as SSE so it no longer contradicts the Content-Type header
        # below (it previously said "text/plain").
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Kept explicit so the emitted value stays exactly
            # "text/event-stream". NOTE(review): Starlette is expected to let
            # an explicit Content-Type header take precedence over
            # media_type -- confirm against the installed version.
            "Content-Type": "text/event-stream",
        },
    )
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm where it is registered with the application.
async def get_words(query: str = "default"):
    """Return the whole word list in a single, non-streaming response.

    Args:
        query: Value echoed back unchanged in the response.

    Returns:
        A dict with the ``query``, the number of entries in
        ``ENGLISH_WORDS`` (``total_words``) and the list itself (``words``).
    """
    word_count = len(ENGLISH_WORDS)
    payload = {
        "query": query,
        "total_words": word_count,
        "words": ENGLISH_WORDS,
    }
    return payload
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    # Serve the app on all network interfaces (0.0.0.0), port 8008.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8008,
    )