"""Flask API for a Hugging Face Space.

Most endpoints forward the request payload to a hosted Hugging Face
Inference API model and relay the result.  The remaining endpoints do
text-to-speech (edge-tts, optionally mixed with background music via
pydub), render text onto a ruled "paper" image (Pillow), resolve an audio
stream URL through Piped, and download videos with yt-dlp.
"""

import asyncio
import io
import os
import tempfile
import time

# Kept ahead of the cv2 import, as in the original ordering; presumably so
# Qt-backed OpenCV builds can load without a display -- TODO confirm.
os.environ["QT_QPA_PLATFORM"] = "offscreen"

import cv2
import edge_tts
import numpy as np
import requests
import torch
import yt_dlp
from flask import Flask, jsonify, request, send_file
from huggingface_hub import login
from PIL import Image, ImageDraw, ImageFont
from piped_api import PipedClient
from pydub import AudioSegment
# from rembg import remove

# Do not add the imports below (they are not listed in requirements).
# import face_recognition
# import text2emotion as te
# import pytesseract

# --- HUGGING FACE SECRET LOGIN ---
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN:
    login(token=HF_TOKEN)
    print("✅ Logged in successfully!")
else:
    print("⚠️ Warning: HF_TOKEN missing!")

# The one and only application object.  The original file created a second
# Flask(__name__) further down, which rebound `app` and left every route
# registered before that point attached to an instance that was never run.
app = Flask(__name__)

# NOTE(review): the calls made on this client in get_stream() (`search`,
# dict-style results, `audio_streams`) are kept exactly as written; verify
# them against the installed piped_api version.
PIPED_CLIENT = PipedClient()


# ---------- SHARED HELPERS ----------
def _hf_headers():
    """Return the Authorization header used for every HF Inference API call."""
    return {"Authorization": f"Bearer {HF_TOKEN}"}


def _json_body():
    """Return the request's JSON object, or an empty dict if absent/invalid.

    Lets handlers reach their own "field is required" validation instead of
    failing on a missing body or a non-object JSON payload.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _generated_text(payload):
    """Extract ``generated_text`` from an HF response (list or dict).

    Returns an empty string when the payload is an empty list, has no
    ``generated_text`` key, or is of an unexpected type.
    """
    if isinstance(payload, list):
        payload = payload[0] if payload else {}
    if isinstance(payload, dict):
        return payload.get('generated_text', '') or ''
    return ''


@app.route('/')
def home():
    """Health-check endpoint returning a static banner string."""
    return "🔥 Daimond Batch Heavy Engine (API MODE) is RUNNING! 🔥"


# ---------- IMAGE PROCESSING ENDPOINTS ----------
@app.route('/scan', methods=['POST'])
def scan_image():
    """Count faces and read printed text in the uploaded ``image`` file.

    Returns JSON ``{"faces": int, "text": str}``; a non-200 reply from
    either upstream model leaves the corresponding field at 0 / "".
    Any exception is reported as ``{"error": ...}`` with status 500.
    """
    try:
        file = request.files['image'].read()
        headers = _hf_headers()

        # Face Detection API
        FACE_API = "https://api-inference.huggingface.co/models/arnabdhar/face-detection"
        face_res = requests.post(FACE_API, headers=headers, data=file, timeout=30)
        face_count = 0
        if face_res.status_code == 200:
            result = face_res.json()
            face_count = len(result) if isinstance(result, list) else 0

        # OCR API
        OCR_API = "https://api-inference.huggingface.co/models/microsoft/trocr-base-printed"
        ocr_res = requests.post(OCR_API, headers=headers, data=file, timeout=30)
        extracted_text = ""
        if ocr_res.status_code == 200:
            extracted_text = _generated_text(ocr_res.json())

        return jsonify({"faces": face_count, "text": extracted_text.strip()})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/emotion', methods=['POST'])
def detect_emotion():
    """Classify the emotion of the JSON field ``text``.

    Returns JSON ``{"emotion": label, "score": score}`` for the
    highest-scoring label, or ``{"error": ...}`` with status 500.
    """
    try:
        data = request.get_json()
        text = data.get('text', '')
        API_URL = "https://api-inference.huggingface.co/models/bhadresh-savani/bert-base-uncased-emotion"
        response = requests.post(API_URL, headers=_hf_headers(), json={"inputs": text}, timeout=30)
        if response.status_code == 200:
            result = response.json()
            # The result is a list whose first item holds label/score entries.
            if isinstance(result, list) and len(result) > 0:
                top = max(result[0], key=lambda x: x['score'])
                return jsonify({"emotion": top['label'], "score": top['score']})
        return jsonify({"error": "Emotion detection failed"}), 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/bg-remove', methods=['POST'])
def api_bg_remove():
    """Send the uploaded ``image`` to the rembg model and relay the PNG."""
    try:
        file = request.files['image'].read()
        API_URL = "https://api-inference.huggingface.co/models/not-lain/rembg"
        response = requests.post(API_URL, headers=_hf_headers(), data=file, timeout=60)
        if response.status_code == 200:
            return send_file(io.BytesIO(response.content), mimetype='image/png')
        return f"BG Remove Error: {response.text}", 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/sketch', methods=['POST'])
def api_sketch():
    """Send the uploaded ``image`` to the sketch model and relay the JPEG."""
    try:
        file = request.files['image'].read()
        API_URL = "https://api-inference.huggingface.co/models/AP123/sketch-simplify"
        response = requests.post(API_URL, headers=_hf_headers(), data=file, timeout=60)
        if response.status_code == 200:
            return send_file(io.BytesIO(response.content), mimetype='image/jpeg')
        return f"Sketch Error: {response.text}", 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/ocr', methods=['POST'])
def api_ocr():
    """Run handwritten-text OCR on the uploaded ``image``.

    Returns JSON ``{"text": str}``, or ``{"error": ...}`` with status 500.
    """
    try:
        file = request.files['image'].read()
        API_URL = "https://api-inference.huggingface.co/models/microsoft/trocr-base-handwritten"
        response = requests.post(API_URL, headers=_hf_headers(), data=file, timeout=60)
        if response.status_code == 200:
            return jsonify({"text": _generated_text(response.json()).strip()})
        return jsonify({"error": response.text}), 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---------- AUDIO STREAM LOOKUP (Piped) ----------
@app.route('/get-stream', methods=['POST'])
def get_stream():
    """Resolve a search ``query`` to an audio stream URL via Piped.

    Expects JSON with ``query`` (required) and ``requester`` (optional).
    Returns JSON with title, webpage_url, duration, stream_url and
    requester; 400 if the query is missing, 404 if nothing is found,
    500 on any other failure.
    """
    start_time = time.time()
    data = _json_body()
    query = data.get('query')
    if not query:
        return jsonify({"error": "Query is required"}), 400

    try:
        # Ask Piped for a single result only (for speed).
        search_results = PIPED_CLIENT.search(query, limit=1)
        if not search_results:
            return jsonify({"error": "No video found"}), 404

        first_result = search_results[0]
        # The video id is taken as whatever follows the last '=' in the URL.
        video_id = first_result['url'].split('=')[-1]

        # Fetch the video details.
        video_detail = PIPED_CLIENT.get_video(video_id)
        if not video_detail.audio_streams:
            return jsonify({"error": "No audio stream found"}), 500

        # Use the first listed audio stream.
        audio_stream = video_detail.audio_streams[0]

        processing_time = time.time() - start_time
        print(f"✅ Stream URL generated in {processing_time:.2f} seconds")

        return jsonify({
            "title": video_detail.title,
            "webpage_url": f"https://youtube.com/watch?v={video_id}",
            "duration": video_detail.duration,
            "stream_url": audio_stream.url,
            "requester": data.get('requester', 'Unknown')
        })
    except Exception as e:
        print(f"❌ Error in /get-stream: {e}")
        return jsonify({"error": str(e)}), 500


# ---------- TTS ----------
async def _synthesize_speech(text, voice, rate, out_path):
    """Generate speech for ``text`` with edge-tts and save it to ``out_path``."""
    communicate = edge_tts.Communicate(text, voice, rate=rate)
    await communicate.save(out_path)


def _mix_with_bgm(voice_path, bgm_type, out_path):
    """Overlay ``<bgm_type>.mp3`` under the voice track and write ``out_path``.

    Returns True when a mixed file was written, False when the background
    music file does not exist.  The music is looped to cover the voice
    track, trimmed to its length and attenuated by 15 dB before overlay.
    """
    bgm_file = f"{bgm_type}.mp3"
    if not os.path.exists(bgm_file):
        return False

    voice_track = AudioSegment.from_file(voice_path)
    music_track = AudioSegment.from_file(bgm_file)
    if len(music_track) < len(voice_track):
        music_track = music_track * (len(voice_track) // len(music_track) + 1)
    music_track = music_track[:len(voice_track)] - 15

    final_mix = voice_track.overlay(music_track)
    # export() hands back an open file object; close it so the handle is
    # released before the temporary directory is removed.
    final_mix.export(out_path, format="mp3").close()
    return True


@app.route('/tts', methods=['POST'])
def tts_api():
    """Convert form field ``text`` to speech and return it as MP3.

    Form fields: ``text`` (default "Hello"), ``rate`` (default "+0%"),
    ``voice`` (default "hi-IN-SwaraNeural"), ``bgm`` ("news" or "roast"
    to mix in background music; anything else means voice only).  If
    mixing fails, the plain voice track is returned.  Any other failure
    yields the error text with status 500.
    """
    try:
        text = request.form.get('text', 'Hello')
        rate = request.form.get('rate', '+0%')
        voice = request.form.get('voice', 'hi-IN-SwaraNeural')
        bgm_type = request.form.get('bgm', 'none')

        # A per-request scratch directory: fixed file names in the working
        # directory would be overwritten by concurrent requests.
        with tempfile.TemporaryDirectory() as workdir:
            raw_audio = os.path.join(workdir, "temp_raw.mp3")
            final_audio = os.path.join(workdir, "temp_final.mp3")

            asyncio.run(_synthesize_speech(text, voice, rate, raw_audio))

            result_path = raw_audio
            if bgm_type in ('news', 'roast'):
                try:
                    if _mix_with_bgm(raw_audio, bgm_type, final_audio):
                        result_path = final_audio
                except Exception as e:
                    # Best effort: fall back to the unmixed voice track.
                    print(f"Mixer Error: {e}")

            # Read into memory so the response outlives the scratch directory.
            with open(result_path, 'rb') as audio_file:
                audio_bytes = audio_file.read()

        return send_file(io.BytesIO(audio_bytes), mimetype="audio/mpeg")
    except Exception as e:
        return str(e), 500


# ---------- AI IMAGE GENERATION (FLUX) ----------
@app.route('/imagine', methods=['POST'])
def imagine():
    """Generate an image from the JSON field ``prompt`` and return it as PNG.

    Returns 400 if the prompt is missing and 500 with the upstream error
    text if the model call does not succeed.
    """
    prompt = _json_body().get('prompt')
    if not prompt:
        return "Prompt is required", 400

    # Direct HF Inference API call – no local fallback
    API_URL = "https://api-inference.huggingface.co/models/black-forest-labs/FLUX.1-schnell"
    response = requests.post(API_URL, headers=_hf_headers(), json={"inputs": prompt}, timeout=60)
    if response.status_code == 200:
        return send_file(io.BytesIO(response.content), mimetype='image/png')
    return f"Imagine Error: {response.text}", 500


@app.route('/generate', methods=['POST'])
def generate_alias():
    """Alias for /imagine."""
    return imagine()


@app.route('/enhance', methods=['POST'])
def enhance_image():
    """Send the uploaded ``image`` to the upscaling model and relay the JPEG."""
    try:
        file = request.files['image'].read()
        API_URL = "https://api-inference.huggingface.co/models/explore-it/realesrgan-x4plus"
        response = requests.post(API_URL, headers=_hf_headers(), data=file, timeout=60)
        if response.status_code == 200:
            return send_file(io.BytesIO(response.content), mimetype='image/jpeg')
        return f"Enhance Error: {response.text}", 500
    except Exception as e:
        return f"Enhance Error: {str(e)}", 500


# ---------- MULTI-FONT PAPER GENERATOR ----------
@app.route('/paper', methods=['POST'])
def generate_paper():
    """Render text segments onto a ruled-paper image and return it as JPEG.

    Expects JSON ``{"segments": [{"font": 1, "text": "Hello",
    "color": [0, 0, 180]}, ...]}``.  ``font`` selects ``font<N>.ttf``
    (falling back to Pillow's default font if it cannot be loaded),
    ``color`` is the text colour.  Words are wrapped at the right margin;
    each non-blank paragraph starts on a new ruled line.
    """
    try:
        data = request.get_json()
        segments = data.get('segments', [])

        img_width = 1240
        left_margin = 150
        top_margin = 180
        line_spacing = 65

        # Rough height estimate: explicit newlines plus ~40 characters per
        # wrapped line, with a few spare lines.
        total_text = " ".join(seg.get('text', '') for seg in segments)
        est_lines = total_text.count('\n') + (len(total_text) // 40) + 5
        img_height = max(1754, top_margin + (est_lines * line_spacing) + 200)

        img = Image.new('RGB', (img_width, img_height), color=(255, 255, 255))
        draw = ImageDraw.Draw(img)

        # Red margin lines
        draw.line([(left_margin, 0), (left_margin, img_height)], fill=(255, 0, 0), width=3)
        draw.line([(0, top_margin), (img_width, top_margin)], fill=(255, 0, 0), width=3)

        # Blue ruled lines
        y = top_margin + line_spacing
        while y < img_height:
            draw.line([(0, y), (img_width, y)], fill=(0, 0, 255, 80), width=2)
            y += line_spacing

        line_start_x = left_margin + 20
        x_text = line_start_x
        y_text = top_margin + line_spacing - 45

        for seg in segments:
            font_num = seg.get('font', 1)
            color = tuple(seg.get('color', [0, 0, 180]))
            text = seg.get('text', '')

            try:
                font = ImageFont.truetype(f"font{font_num}.ttf", 35)
            except (OSError, ValueError):
                # Font file missing or unreadable: use Pillow's built-in font.
                font = ImageFont.load_default()

            for para in text.split('\n'):
                if not para.strip():
                    continue
                for word in para.split(' '):
                    bbox = draw.textbbox((0, 0), word + " ", font=font)
                    word_width = bbox[2] - bbox[0]
                    # Wrap to the next ruled line when the word would cross
                    # the right margin.
                    if x_text + word_width > img_width - 50:
                        x_text = line_start_x
                        y_text += line_spacing
                    draw.text((x_text, y_text), word + " ", font=font, fill=color)
                    x_text += word_width
                # End of paragraph: start the next one on a fresh line.
                x_text = line_start_x
                y_text += line_spacing

        img_io = io.BytesIO()
        img.save(img_io, 'JPEG', quality=90)
        img_io.seek(0)
        return send_file(img_io, mimetype='image/jpeg')
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---------- VIDEO DOWNLOADER (yt-dlp) ----------
@app.route('/download', methods=['POST'])
def download_video():
    """Download the video at JSON field ``url`` with yt-dlp and return it.

    Returns 400 if ``url`` is missing and ``{"error": ...}`` with status
    500 on any failure.  Files are written under ``/tmp``.
    """
    try:
        url = _json_body().get('url')
        if not url:
            return jsonify({"error": "URL required"}), 400

        ydl_opts = {
            'format': 'b[ext=mp4]/best',
            'quiet': True,
            'noplaylist': True,
            'max_filesize': 50000000,  # 50 MB
            'outtmpl': '/tmp/%(id)s.%(ext)s',
            'http_headers': {'User-Agent': 'Mozilla/5.0'},
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            filename = ydl.prepare_filename(info)

        # The file on disk may carry a different extension than the one
        # predicted; probe the common containers.
        if not os.path.exists(filename):
            stem = filename.rsplit('.', 1)[0]
            for ext in ('mp4', 'webm', 'mkv'):
                candidate = f"{stem}.{ext}"
                if os.path.exists(candidate):
                    filename = candidate
                    break

        return send_file(filename, mimetype='video/mp4', as_attachment=True)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)