Spaces:
Running
Running
| import os | |
| os.environ["QT_QPA_PLATFORM"] = "offscreen" | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| from flask import Flask, request, send_file, jsonify | |
| #from rembg import remove | |
| import io | |
| import asyncio | |
| import edge_tts | |
| import torch | |
| import requests | |
| from huggingface_hub import login | |
| from pydub import AudioSegment | |
| import yt_dlp | |
| # HF Space के app.py में ये जोड़ें | |
| from flask import Flask, request, jsonify | |
| # अब ये नीचे वाले imports मत डालो (requirements में नहीं हैं) | |
| # import face_recognition | |
| # import text2emotion as te | |
| # import pytesseract | |
| # --- HUGGING FACE SECRET LOGIN --- | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| if HF_TOKEN: | |
| login(token=HF_TOKEN) | |
| print("✅ Logged in successfully!") | |
| else: | |
| print("⚠️ Warning: HF_TOKEN missing!") | |
| app = Flask(__name__) | |
| # --- Helper: TTS BGM mixing (reused from original) --- | |
| def home(): | |
| return "🔥 Daimond Batch Heavy Engine (API MODE) is RUNNING! 🔥" | |
| # ---------- IMAGE PROCESSING ENDPOINTS ---------- | |
| def scan_image(): | |
| try: | |
| file = request.files['image'].read() | |
| # Face Detection API | |
| FACE_API = "https://api-inference.huggingface.co/models/arnabdhar/face-detection" | |
| headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |
| face_res = requests.post(FACE_API, headers=headers, data=file, timeout=30) | |
| face_count = 0 | |
| if face_res.status_code == 200: | |
| result = face_res.json() | |
| face_count = len(result) if isinstance(result, list) else 0 | |
| # OCR API | |
| OCR_API = "https://api-inference.huggingface.co/models/microsoft/trocr-base-printed" | |
| ocr_res = requests.post(OCR_API, headers=headers, data=file, timeout=30) | |
| extracted_text = "" | |
| if ocr_res.status_code == 200: | |
| ocr_data = ocr_res.json() | |
| extracted_text = ocr_data[0]['generated_text'] if isinstance(ocr_data, list) else ocr_data.get('generated_text', '') | |
| return jsonify({"faces": face_count, "text": extracted_text.strip()}) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def detect_emotion(): | |
| try: | |
| data = request.get_json() | |
| text = data.get('text', '') | |
| API_URL = "https://api-inference.huggingface.co/models/bhadresh-savani/bert-base-uncased-emotion" | |
| headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |
| response = requests.post(API_URL, headers=headers, json={"inputs": text}, timeout=30) | |
| if response.status_code == 200: | |
| result = response.json() | |
| # result एक list होती है जिसमें label और score होते हैं | |
| if isinstance(result, list) and len(result) > 0: | |
| top = max(result[0], key=lambda x: x['score']) | |
| return jsonify({"emotion": top['label'], "score": top['score']}) | |
| return jsonify({"error": "Emotion detection failed"}), 500 | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def api_bg_remove(): | |
| try: | |
| file = request.files['image'].read() | |
| API_URL = "https://api-inference.huggingface.co/models/not-lain/rembg" | |
| headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |
| response = requests.post(API_URL, headers=headers, data=file, timeout=60) | |
| if response.status_code == 200: | |
| return send_file(io.BytesIO(response.content), mimetype='image/png') | |
| else: | |
| return f"BG Remove Error: {response.text}", 500 | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def api_sketch(): | |
| try: | |
| file = request.files['image'].read() | |
| API_URL = "https://api-inference.huggingface.co/models/AP123/sketch-simplify" | |
| headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |
| response = requests.post(API_URL, headers=headers, data=file, timeout=60) | |
| if response.status_code == 200: | |
| return send_file(io.BytesIO(response.content), mimetype='image/jpeg') | |
| else: | |
| return f"Sketch Error: {response.text}", 500 | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def api_ocr(): | |
| try: | |
| file = request.files['image'].read() | |
| API_URL = "https://api-inference.huggingface.co/models/microsoft/trocr-base-handwritten" | |
| headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |
| response = requests.post(API_URL, headers=headers, data=file, timeout=60) | |
| if response.status_code == 200: | |
| result = response.json() | |
| text = result[0]['generated_text'] if isinstance(result, list) else result.get('generated_text', '') | |
| return jsonify({"text": text.strip()}) | |
| else: | |
| return jsonify({"error": response.text}), 500 | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| # HF Space के app.py में ये जोड़ें | |
| # ... आपके बाकी के सभी एंडपॉइंट (जैसे /tts, /imagine, /bg-remove) ... | |
| import time | |
| import yt_dlp | |
| from flask import Flask, request, jsonify | |
| import os | |
| # ... आपके अन्य सभी इम्पोर्ट और एंडपॉइंट ... | |
| from flask import Flask, request, jsonify | |
| from piped_api import PipedClient | |
| import time | |
| app = Flask(__name__) | |
| PIPED_CLIENT = PipedClient() | |
| def get_stream(): | |
| start_time = time.time() | |
| data = request.json | |
| query = data.get('query') | |
| if not query: | |
| return jsonify({"error": "Query is required"}), 400 | |
| try: | |
| # Piped API से केवल 1 रिजल्ट खोजें (तेज़ी के लिए) | |
| search_results = PIPED_CLIENT.search(query, limit=1) | |
| if not search_results: | |
| return jsonify({"error": "No video found"}), 404 | |
| first_result = search_results[0] | |
| video_id = first_result['url'].split('=')[-1] | |
| # वीडियो की डिटेल लें | |
| video_detail = PIPED_CLIENT.get_video(video_id) | |
| if not video_detail.audio_streams: | |
| return jsonify({"error": "No audio stream found"}), 500 | |
| # सबसे अच्छी क्वालिटी की ऑडियो स्ट्रीम चुनें | |
| audio_stream = video_detail.audio_streams[0] | |
| processing_time = time.time() - start_time | |
| print(f"✅ Stream URL generated in {processing_time:.2f} seconds") | |
| return jsonify({ | |
| "title": video_detail.title, | |
| "webpage_url": f"https://youtube.com/watch?v={video_id}", | |
| "duration": video_detail.duration, | |
| "stream_url": audio_stream.url, | |
| "requester": data.get('requester', 'Unknown') | |
| }) | |
| except Exception as e: | |
| print(f"❌ Error in /get-stream: {e}") | |
| return jsonify({"error": str(e)}), 500 | |
| # ---------- TTS ---------- | |
| def tts_api(): | |
| try: | |
| text = request.form.get('text', 'Hello') | |
| rate = request.form.get('rate', '+0%') | |
| voice = request.form.get('voice', 'hi-IN-SwaraNeural') | |
| bgm_type = request.form.get('bgm', 'none') | |
| raw_audio = "temp_raw.mp3" | |
| final_audio = "temp_final.mp3" | |
| async def generate_audio(): | |
| communicate = edge_tts.Communicate(text, voice, rate=rate) | |
| await communicate.save(raw_audio) | |
| asyncio.run(generate_audio()) | |
| if bgm_type in ['news', 'roast']: | |
| try: | |
| voice_track = AudioSegment.from_file(raw_audio) | |
| bgm_file = f"{bgm_type}.mp3" | |
| if os.path.exists(bgm_file): | |
| music_track = AudioSegment.from_file(bgm_file) | |
| if len(music_track) < len(voice_track): | |
| music_track = music_track * (len(voice_track) // len(music_track) + 1) | |
| music_track = music_track[:len(voice_track)] - 15 | |
| final_mix = voice_track.overlay(music_track) | |
| final_mix.export(final_audio, format="mp3") | |
| return send_file(final_audio, mimetype="audio/mpeg") | |
| except Exception as e: | |
| print(f"Mixer Error: {e}") | |
| return send_file(raw_audio, mimetype="audio/mpeg") | |
| except Exception as e: | |
| return str(e), 500 | |
| # ---------- AI IMAGE GENERATION (FLUX) ---------- | |
| def imagine(): | |
| prompt = request.json.get('prompt') | |
| if not prompt: | |
| return "Prompt is required", 400 | |
| # Direct HF Inference API call – no local fallback | |
| API_URL = "https://api-inference.huggingface.co/models/black-forest-labs/FLUX.1-schnell" | |
| headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |
| response = requests.post(API_URL, headers=headers, json={"inputs": prompt}, timeout=60) | |
| if response.status_code == 200: | |
| return send_file(io.BytesIO(response.content), mimetype='image/png') | |
| else: | |
| return f"Imagine Error: {response.text}", 500 | |
| def generate_alias(): | |
| # Alias for /imagine | |
| return imagine() | |
| def enhance_image(): | |
| try: | |
| file = request.files['image'].read() | |
| API_URL = "https://api-inference.huggingface.co/models/explore-it/realesrgan-x4plus" | |
| headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |
| response = requests.post(API_URL, headers=headers, data=file, timeout=60) | |
| if response.status_code == 200: | |
| return send_file(io.BytesIO(response.content), mimetype='image/jpeg') | |
| else: | |
| return f"Enhance Error: {response.text}", 500 | |
| except Exception as e: | |
| return f"Enhance Error: {str(e)}", 500 | |
| # ---------- MULTI-FONT PAPER GENERATOR ---------- | |
| def generate_paper(): | |
| try: | |
| data = request.get_json() | |
| segments = data.get('segments', []) | |
| # segments: [{"font": 1, "text": "Hello", "color": [0,0,180]}, ...] | |
| img_width = 1240 | |
| left_margin = 150 | |
| top_margin = 180 | |
| line_spacing = 65 | |
| total_text = " ".join([seg['text'] for seg in segments]) | |
| est_lines = total_text.count('\n') + (len(total_text) // 40) + 5 | |
| img_height = max(1754, top_margin + (est_lines * line_spacing) + 200) | |
| img = Image.new('RGB', (img_width, img_height), color=(255, 255, 255)) | |
| draw = ImageDraw.Draw(img) | |
| # Red margin lines | |
| draw.line([(left_margin, 0), (left_margin, img_height)], fill=(255, 0, 0), width=3) | |
| draw.line([(0, top_margin), (img_width, top_margin)], fill=(255, 0, 0), width=3) | |
| # Blue ruled lines | |
| y = top_margin + line_spacing | |
| while y < img_height: | |
| draw.line([(0, y), (img_width, y)], fill=(0, 0, 255, 80), width=2) | |
| y += line_spacing | |
| x_text = left_margin + 20 | |
| y_text = top_margin + line_spacing - 45 | |
| for seg in segments: | |
| font_num = seg.get('font', 1) | |
| color = tuple(seg.get('color', [0, 0, 180])) | |
| text = seg.get('text', '') | |
| try: | |
| font = ImageFont.truetype(f"font{font_num}.ttf", 35) | |
| except: | |
| font = ImageFont.load_default() | |
| paragraphs = text.split('\n') | |
| for para in paragraphs: | |
| if not para.strip(): | |
| continue | |
| words = para.split(' ') | |
| for word in words: | |
| bbox = draw.textbbox((0, 0), word + " ", font=font) | |
| word_width = bbox[2] - bbox[0] | |
| if x_text + word_width > img_width - 50: | |
| x_text = left_margin + 20 | |
| y_text += line_spacing | |
| draw.text((x_text, y_text), word + " ", font=font, fill=color) | |
| x_text += word_width | |
| x_text = left_margin + 20 | |
| y_text += line_spacing | |
| img_io = io.BytesIO() | |
| img.save(img_io, 'JPEG', quality=90) | |
| img_io.seek(0) | |
| return send_file(img_io, mimetype='image/jpeg') | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| # ---------- VIDEO DOWNLOADER (yt-dlp) ---------- | |
| def download_video(): | |
| try: | |
| data = request.get_json() | |
| url = data.get('url') | |
| if not url: | |
| return jsonify({"error": "URL required"}), 400 | |
| ydl_opts = { | |
| 'format': 'b[ext=mp4]/best', | |
| 'quiet': True, | |
| 'noplaylist': True, | |
| 'max_filesize': 50000000, # 50 MB | |
| 'outtmpl': '/tmp/%(id)s.%(ext)s', | |
| 'http_headers': {'User-Agent': 'Mozilla/5.0'} | |
| } | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(url, download=True) | |
| filename = ydl.prepare_filename(info) | |
| if not os.path.exists(filename): | |
| for ext in ['mp4', 'webm', 'mkv']: | |
| f = filename.rsplit('.', 1)[0] + '.' + ext | |
| if os.path.exists(f): | |
| filename = f | |
| break | |
| return send_file(filename, mimetype='video/mp4', as_attachment=True) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860) |