import os

# Set before cv2 is imported below. Presumably this forces Qt (used by
# OpenCV's GUI layer) onto its headless "offscreen" plugin -- TODO confirm.
os.environ["QT_QPA_PLATFORM"] = "offscreen"

import cv2
import numpy as np
import pytesseract
from PIL import Image, ImageDraw, ImageFont
from flask import Flask, request, send_file, jsonify
from rembg import remove
import io
import tempfile
# Make sure these imports stay at the top of app.py.
import face_recognition
import text2emotion as te
import asyncio
import edge_tts
import torch
import requests
from huggingface_hub import login
from pydub import AudioSegment
import yt_dlp

# --- HUGGING FACE SECRET LOGIN ---
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN:
    login(token=HF_TOKEN)
    print("✅ Logged in successfully!")
else:
    print("⚠️ Warning: HF_TOKEN missing!")

app = Flask(__name__)

# Base URL and timeout shared by every Hugging Face Inference API call below.
HF_INFERENCE_BASE = "https://api-inference.huggingface.co/models/"
HF_TIMEOUT_SECONDS = 60


# ---------- SHARED HELPERS ----------
def _json_body():
    """Return the request's JSON body as a dict.

    Falls back to an empty dict when the body is missing, is not valid JSON,
    or decodes to something other than an object, so callers can always use
    ``.get`` without crashing.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


def _uploaded_image_bytes():
    """Return the bytes of the uploaded ``image`` form file, or None if absent."""
    upload = request.files.get('image')
    if upload is None:
        return None
    return upload.read()


def _hf_post(model_id, **payload):
    """POST to the Hugging Face Inference API for ``model_id``.

    Extra keyword arguments (for example ``data=`` or ``json=``) are forwarded
    to ``requests.post``. The Authorization header is only sent when HF_TOKEN
    is set, instead of sending the literal string "Bearer None".

    Returns the ``requests.Response``; network errors propagate to the caller.
    """
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
    return requests.post(
        HF_INFERENCE_BASE + model_id,
        headers=headers,
        timeout=HF_TIMEOUT_SECONDS,
        **payload,
    )


def _load_paper_font(font_num, size=35):
    """Load ``font<font_num>.ttf`` at ``size``, or Pillow's default font.

    ``font_num`` comes from the client, so it is coerced to an int before it
    is interpolated into the file name; anything that is not int-like, or a
    font file that cannot be opened, yields the default font.
    """
    try:
        return ImageFont.truetype(f"font{int(font_num)}.ttf", size)
    except (OSError, TypeError, ValueError):
        return ImageFont.load_default()


# ---------- ROOT ----------
@app.route('/')
def home():
    """Return a plain-text banner showing that the service is running."""
    return "🔥 Daimond Batch Heavy Engine (API MODE) is RUNNING! 🔥"


# ---------- IMAGE PROCESSING ENDPOINTS ----------
@app.route('/scan', methods=['POST'])
def scan_image():
    """Count faces and extract text from the uploaded ``image``.

    Returns JSON ``{"faces": <int>, "text": <str>}``. Face detection and OCR
    are each best-effort: a failure in one is logged and reported as ``0`` or
    ``""`` without failing the request. Responds 400 when no image is sent
    and 500 on any other unexpected error.
    """
    try:
        raw = _uploaded_image_bytes()
        if not raw:
            return jsonify({"error": "No image"}), 400

        img = cv2.imdecode(np.frombuffer(raw, np.uint8), cv2.IMREAD_COLOR)
        result = {}

        # Face detection runs on a half-size RGB copy of the decoded image.
        try:
            small_img = cv2.resize(img, (0, 0), fx=0.5, fy=0.5)
            rgb_img = cv2.cvtColor(small_img, cv2.COLOR_BGR2RGB)
            result['faces'] = len(face_recognition.face_locations(rgb_img))
        except Exception as e:
            print(f"Face detection error: {e}")
            result['faces'] = 0

        # OCR works on the original bytes via Pillow, independent of OpenCV.
        try:
            pil_img = Image.open(io.BytesIO(raw))
            result['text'] = pytesseract.image_to_string(pil_img).strip()
        except Exception as e:
            print(f"OCR error: {e}")
            result['text'] = ""

        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/emotion', methods=['POST'])
def detect_emotion():
    """Return the highest-scoring emotion for the JSON field ``text``.

    Responds with ``{"emotion": <name>, "score": <value>}``, or with
    ``{"emotion": None, "score": 0}`` when text2emotion returns nothing.
    Responds 400 when ``text`` is missing or empty (including when the body
    is not a JSON object) and 500 on unexpected errors.
    """
    try:
        text = _json_body().get('text', '')
        if not text:
            return jsonify({"error": "No text"}), 400

        emotions = te.get_emotion(text)
        if emotions:
            main = max(emotions, key=emotions.get)
            return jsonify({"emotion": main, "score": emotions[main]})
        return jsonify({"emotion": None, "score": 0})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/bg-remove', methods=['POST'])
def api_bg_remove():
    """Send the uploaded ``image`` to the ``not-lain/rembg`` model.

    Returns the model's response body as ``image/png`` on HTTP 200, a plain
    text error with status 500 otherwise. Responds 400 when no image is sent.
    """
    try:
        raw = _uploaded_image_bytes()
        if not raw:
            return jsonify({"error": "No image"}), 400

        response = _hf_post("not-lain/rembg", data=raw)
        if response.status_code == 200:
            return send_file(io.BytesIO(response.content), mimetype='image/png')
        return f"BG Remove Error: {response.text}", 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/sketch', methods=['POST'])
def api_sketch():
    """Send the uploaded ``image`` to the ``AP123/sketch-simplify`` model.

    Returns the model's response body as ``image/jpeg`` on HTTP 200, a plain
    text error with status 500 otherwise. Responds 400 when no image is sent.
    """
    try:
        raw = _uploaded_image_bytes()
        if not raw:
            return jsonify({"error": "No image"}), 400

        response = _hf_post("AP123/sketch-simplify", data=raw)
        if response.status_code == 200:
            return send_file(io.BytesIO(response.content), mimetype='image/jpeg')
        return f"Sketch Error: {response.text}", 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/ocr', methods=['POST'])
def api_ocr():
    """Send the uploaded ``image`` to ``microsoft/trocr-base-handwritten``.

    Returns JSON ``{"text": <generated_text>}``. The model reply may be a
    list of results or a single object; the first ``generated_text`` found is
    used, and an empty list yields an empty string. Responds 400 when no
    image is sent and 500 on upstream or unexpected errors.
    """
    try:
        raw = _uploaded_image_bytes()
        if not raw:
            return jsonify({"error": "No image"}), 400

        response = _hf_post("microsoft/trocr-base-handwritten", data=raw)
        if response.status_code != 200:
            return jsonify({"error": response.text}), 500

        result = response.json()
        if isinstance(result, list):
            text = result[0]['generated_text'] if result else ''
        else:
            text = result.get('generated_text', '')
        return jsonify({"text": text.strip()})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---------- TTS ----------
@app.route('/tts', methods=['POST'])
def tts_api():
    """Synthesize speech with edge-tts and optionally mix in background music.

    Form fields: ``text`` (default ``Hello``), ``rate`` (default ``+0%``),
    ``voice`` (default ``hi-IN-SwaraNeural``) and ``bgm`` (``news``,
    ``roast`` or anything else for none). When ``bgm`` names an existing
    ``<bgm>.mp3`` file, it is looped to cover the voice track, lowered by 15,
    and overlaid. A mixing failure is logged and the plain voice is returned.

    Returns the audio as ``audio/mpeg``, or the error text with status 500.
    """
    try:
        text = request.form.get('text', 'Hello')
        rate = request.form.get('rate', '+0%')
        voice = request.form.get('voice', 'hi-IN-SwaraNeural')
        bgm_type = request.form.get('bgm', 'none')

        # A per-request directory keeps concurrent requests from overwriting
        # each other's audio and guarantees the files are removed afterwards.
        with tempfile.TemporaryDirectory() as workdir:
            raw_audio = os.path.join(workdir, "temp_raw.mp3")
            final_audio = os.path.join(workdir, "temp_final.mp3")

            async def generate_audio():
                communicate = edge_tts.Communicate(text, voice, rate=rate)
                await communicate.save(raw_audio)

            asyncio.run(generate_audio())

            output_path = raw_audio
            if bgm_type in ['news', 'roast']:
                try:
                    voice_track = AudioSegment.from_file(raw_audio)
                    bgm_file = f"{bgm_type}.mp3"
                    if os.path.exists(bgm_file):
                        music_track = AudioSegment.from_file(bgm_file)
                        if len(music_track) < len(voice_track):
                            repeats = len(voice_track) // len(music_track) + 1
                            music_track = music_track * repeats
                        music_track = music_track[:len(voice_track)] - 15
                        final_mix = voice_track.overlay(music_track)
                        # export() hands back the open output file; close it
                        # so the temporary directory can be deleted cleanly.
                        final_mix.export(final_audio, format="mp3").close()
                        output_path = final_audio
                except Exception as e:
                    print(f"Mixer Error: {e}")

            # Load the audio into memory before the directory is cleaned up.
            with open(output_path, 'rb') as audio_file:
                payload = io.BytesIO(audio_file.read())

        return send_file(payload, mimetype="audio/mpeg")
    except Exception as e:
        return str(e), 500


# ---------- AI IMAGE GENERATION (FLUX) ----------
@app.route('/imagine', methods=['POST'])
def imagine():
    """Generate an image from the JSON field ``prompt`` via FLUX.1-schnell.

    Calls the Hugging Face Inference API directly (no local fallback).
    Returns the image as ``image/png`` on HTTP 200, ``Prompt is required``
    with status 400 when the prompt is missing, and an ``Imagine Error``
    message with status 500 on upstream or network failure.
    """
    prompt = _json_body().get('prompt')
    if not prompt:
        return "Prompt is required", 400

    try:
        response = _hf_post(
            "black-forest-labs/FLUX.1-schnell",
            json={"inputs": prompt},
        )
    except requests.RequestException as e:
        return f"Imagine Error: {str(e)}", 500

    if response.status_code == 200:
        return send_file(io.BytesIO(response.content), mimetype='image/png')
    return f"Imagine Error: {response.text}", 500


@app.route('/generate', methods=['POST'])
def generate_alias():
    """Alias for ``/imagine``."""
    return imagine()


@app.route('/enhance', methods=['POST'])
def enhance_image():
    """Send the uploaded ``image`` to ``explore-it/realesrgan-x4plus``.

    Returns the model's response body as ``image/jpeg`` on HTTP 200, an
    ``Enhance Error`` message with status 500 otherwise. Responds 400 when no
    image is sent.
    """
    try:
        raw = _uploaded_image_bytes()
        if not raw:
            return "Enhance Error: No image", 400

        response = _hf_post("explore-it/realesrgan-x4plus", data=raw)
        if response.status_code == 200:
            return send_file(io.BytesIO(response.content), mimetype='image/jpeg')
        return f"Enhance Error: {response.text}", 500
    except Exception as e:
        return f"Enhance Error: {str(e)}", 500


# ---------- MULTI-FONT PAPER GENERATOR ----------
@app.route('/paper', methods=['POST'])
def generate_paper():
    """Render text segments onto a ruled "notebook paper" JPEG.

    JSON body: ``{"segments": [{"font": 1, "text": "Hello",
    "color": [0, 0, 180]}, ...]}``. Each segment is drawn with
    ``font<font>.ttf`` (Pillow's default font if unavailable) in its colour.
    Words wrap at the right margin, and every non-blank paragraph ends with a
    line break.

    Returns the page as ``image/jpeg``, or JSON ``{"error": ...}`` with
    status 500 on failure.
    """
    try:
        segments = _json_body().get('segments') or []

        img_width = 1240
        left_margin = 150
        top_margin = 180
        line_spacing = 65
        text_left = left_margin + 20
        right_limit = img_width - 50

        # Rough page-height estimate: explicit newlines, ~40 characters per
        # wrapped line, one line break per segment, plus a few spare lines.
        total_text = " ".join(seg.get('text', '') for seg in segments)
        est_lines = (
            total_text.count('\n')
            + len(total_text) // 40
            + len(segments)
            + 5
        )
        img_height = max(1754, top_margin + (est_lines * line_spacing) + 200)

        img = Image.new('RGB', (img_width, img_height), color=(255, 255, 255))
        draw = ImageDraw.Draw(img)

        # Red margin lines
        draw.line([(left_margin, 0), (left_margin, img_height)],
                  fill=(255, 0, 0), width=3)
        draw.line([(0, top_margin), (img_width, top_margin)],
                  fill=(255, 0, 0), width=3)

        # Blue ruled lines
        y = top_margin + line_spacing
        while y < img_height:
            draw.line([(0, y), (img_width, y)], fill=(0, 0, 255, 80), width=2)
            y += line_spacing

        x_text = text_left
        y_text = top_margin + line_spacing - 45

        for seg in segments:
            font = _load_paper_font(seg.get('font', 1))
            color = tuple(seg.get('color', [0, 0, 180]))
            text = seg.get('text', '')

            for para in text.split('\n'):
                if not para.strip():
                    continue
                for word in para.split(' '):
                    token = word + " "
                    bbox = draw.textbbox((0, 0), token, font=font)
                    word_width = bbox[2] - bbox[0]
                    # Wrap to the next ruled line when the word would cross
                    # the right margin.
                    if x_text + word_width > right_limit:
                        x_text = text_left
                        y_text += line_spacing
                    draw.text((x_text, y_text), token, font=font, fill=color)
                    x_text += word_width
                # End of paragraph: start the next one on a fresh line.
                x_text = text_left
                y_text += line_spacing

        img_io = io.BytesIO()
        img.save(img_io, 'JPEG', quality=90)
        img_io.seek(0)
        return send_file(img_io, mimetype='image/jpeg')
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---------- VIDEO DOWNLOADER (yt-dlp) ----------
@app.route('/download', methods=['POST'])
def download_video():
    """Download one video with yt-dlp and return it as an attachment.

    JSON body: ``{"url": "<video url>"}``. The download is limited to a
    single item (``noplaylist``) and to 50 MB (``max_filesize``), preferring
    an mp4 format. Each request downloads into its own temporary directory,
    which is removed once the response has been sent or when the download
    fails.

    Returns the file as an attachment, JSON ``{"error": "URL required"}``
    with status 400 when no URL is given, or JSON ``{"error": ...}`` with
    status 500 on any download failure.
    """
    # Function-scope imports: only this handler needs these stdlib modules.
    import mimetypes
    import shutil
    import tempfile

    # silent=True yields None rather than raising on a missing/invalid body.
    data = request.get_json(silent=True)
    url = data.get('url') if isinstance(data, dict) else None
    if not url:
        return jsonify({"error": "URL required"}), 400

    # A private directory avoids collisions between concurrent downloads of
    # the same video id and makes cleanup a single rmtree.
    workdir = tempfile.mkdtemp(prefix='ytdl_')
    try:
        ydl_opts = {
            'format': 'b[ext=mp4]/best',
            'quiet': True,
            'noplaylist': True,
            'max_filesize': 50000000,  # 50 MB
            'outtmpl': os.path.join(workdir, '%(id)s.%(ext)s'),
            'http_headers': {'User-Agent': 'Mozilla/5.0'},
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            filename = ydl.prepare_filename(info)

        # The file on disk may carry a different extension than the one
        # prepare_filename() predicts, so probe the common containers.
        if not os.path.exists(filename):
            stem = os.path.splitext(filename)[0]
            for ext in ('mp4', 'webm', 'mkv'):
                candidate = f"{stem}.{ext}"
                if os.path.exists(candidate):
                    filename = candidate
                    break
            else:
                raise FileNotFoundError(
                    "Download produced no file (it may exceed the 50 MB limit)"
                )

        # Label the response by the actual container; keep video/mp4 as the
        # fallback for extensions the mimetypes table does not know.
        mimetype = mimetypes.guess_type(filename)[0] or 'video/mp4'
        response = send_file(filename, mimetype=mimetype, as_attachment=True)
    except Exception as e:
        shutil.rmtree(workdir, ignore_errors=True)
        return jsonify({"error": str(e)}), 500

    # Delete the download only after the response body has been sent.
    response.call_on_close(lambda: shutil.rmtree(workdir, ignore_errors=True))
    return response


if __name__ == '__main__':
    # Listen on all interfaces, port 7860.
    app.run(host='0.0.0.0', port=7860)