Daimond-Batch / app.py
Singhp08's picture
Update app.py
0ab065c verified
import os
os.environ["QT_QPA_PLATFORM"] = "offscreen"
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from flask import Flask, request, send_file, jsonify
#from rembg import remove
import io
import asyncio
import edge_tts
import torch
import requests
from huggingface_hub import login
from pydub import AudioSegment
import yt_dlp
# Add this to the HF Space's app.py
from flask import Flask, request, jsonify
# Do not add the imports below (they are not in requirements)
# import face_recognition
# import text2emotion as te
# import pytesseract
# --- HUGGING FACE SECRET LOGIN ---
# The token is read once at import time; every endpoint below reuses it for the
# "Authorization: Bearer ..." header of its Inference API call.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    print("⚠️ Warning: HF_TOKEN missing!")
else:
    login(token=HF_TOKEN)
    print("✅ Logged in successfully!")

# The single Flask application object that all routes register on.
app = Flask(__name__)
# --- Helper: TTS BGM mixing (reused from original) ---
@app.route('/')
def home():
    """Landing route: return a plain-text banner showing the service is up."""
    banner = "🔥 Daimond Batch Heavy Engine (API MODE) is RUNNING! 🔥"
    return banner
# ---------- IMAGE PROCESSING ENDPOINTS ----------
@app.route('/scan', methods=['POST'])
def scan_image():
    """Count faces and extract printed text from an uploaded image.

    Expects a multipart upload under the form field ``image``. The raw bytes
    are posted to two Hugging Face Inference API models (a face-detection
    model and TrOCR printed-text OCR). A non-200 reply from either model is
    treated as "nothing found" rather than as an error.

    Returns:
        JSON ``{"faces": <int>, "text": <str>}`` on success;
        ``400`` with ``{"error": ...}`` when no image was uploaded;
        ``500`` with ``{"error": ...}`` on any other failure.
    """
    upload = request.files.get('image')
    if upload is None:
        # A missing upload is a client error, not a server failure.
        return jsonify({"error": "Image file is required"}), 400
    try:
        file = upload.read()
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}

        # Face Detection API
        FACE_API = "https://api-inference.huggingface.co/models/arnabdhar/face-detection"
        face_res = requests.post(FACE_API, headers=headers, data=file, timeout=30)
        face_count = 0
        if face_res.status_code == 200:
            result = face_res.json()
            # Only a list reply is counted; anything else means zero faces.
            face_count = len(result) if isinstance(result, list) else 0

        # OCR API
        OCR_API = "https://api-inference.huggingface.co/models/microsoft/trocr-base-printed"
        ocr_res = requests.post(OCR_API, headers=headers, data=file, timeout=30)
        extracted_text = ""
        if ocr_res.status_code == 200:
            ocr_data = ocr_res.json()
            if isinstance(ocr_data, list):
                # An empty list used to raise IndexError; treat it as no text.
                first = ocr_data[0] if ocr_data else {}
            else:
                first = ocr_data
            if isinstance(first, dict):
                extracted_text = first.get('generated_text', '') or ''

        return jsonify({"faces": face_count, "text": extracted_text.strip()})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/emotion', methods=['POST'])
def detect_emotion():
    """Classify the emotion of a piece of text via the HF Inference API.

    Expects a JSON object with a ``text`` field.

    Returns:
        JSON ``{"emotion": <label>, "score": <float>}`` for the
        highest-scoring label;
        ``400`` with ``{"error": ...}`` when ``text`` is missing or empty;
        ``500`` with ``{"error": ...}`` when the upstream call fails or its
        reply cannot be interpreted.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid body,
        # so a bad request is reported as 400 rather than as an AttributeError.
        data = request.get_json(silent=True)
        text = data.get('text', '') if isinstance(data, dict) else ''
        if not text:
            return jsonify({"error": "Text is required"}), 400

        API_URL = "https://api-inference.huggingface.co/models/bhadresh-savani/bert-base-uncased-emotion"
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        response = requests.post(API_URL, headers=headers, json={"inputs": text}, timeout=30)
        if response.status_code == 200:
            result = response.json()
            # result is a list whose entries carry a label and a score. The
            # original code reads them from result[0]; a flat list of
            # label/score dicts is accepted as well.
            if isinstance(result, list) and result:
                candidates = result[0] if isinstance(result[0], list) else result
                if candidates:
                    top = max(candidates, key=lambda x: x['score'])
                    return jsonify({"emotion": top['label'], "score": top['score']})
        return jsonify({"error": "Emotion detection failed"}), 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/bg-remove', methods=['POST'])
def api_bg_remove():
    """Remove the background of an uploaded image via the HF Inference API.

    Reads the multipart field ``image``, posts its bytes to the
    ``not-lain/rembg`` model and returns the reply body as a PNG. A non-200
    upstream reply is returned as plain text with status 500; any exception is
    returned as JSON ``{"error": ...}`` with status 500.
    """
    try:
        image_bytes = request.files['image'].read()
        reply = requests.post(
            "https://api-inference.huggingface.co/models/not-lain/rembg",
            headers={"Authorization": f"Bearer {HF_TOKEN}"},
            data=image_bytes,
            timeout=60,
        )
        # Guard clause: bail out early on an upstream failure.
        if reply.status_code != 200:
            return f"BG Remove Error: {reply.text}", 500
        return send_file(io.BytesIO(reply.content), mimetype='image/png')
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/sketch', methods=['POST'])
def api_sketch():
    """Turn an uploaded image into a sketch via the HF Inference API.

    Reads the multipart field ``image``, posts its bytes to the
    ``AP123/sketch-simplify`` model and returns the reply body as a JPEG.
    A non-200 upstream reply is returned as plain text with status 500; any
    exception is returned as JSON ``{"error": ...}`` with status 500.
    """
    try:
        payload = request.files['image'].read()
        endpoint = "https://api-inference.huggingface.co/models/AP123/sketch-simplify"
        auth = {"Authorization": f"Bearer {HF_TOKEN}"}
        upstream = requests.post(endpoint, headers=auth, data=payload, timeout=60)
        succeeded = upstream.status_code == 200
        if not succeeded:
            return f"Sketch Error: {upstream.text}", 500
        image_stream = io.BytesIO(upstream.content)
        return send_file(image_stream, mimetype='image/jpeg')
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/ocr', methods=['POST'])
def api_ocr():
    """Extract handwritten text from an uploaded image via the HF Inference API.

    Expects a multipart upload under the form field ``image``; the bytes are
    posted to the ``microsoft/trocr-base-handwritten`` model.

    Returns:
        JSON ``{"text": <str>}`` on success;
        ``400`` with ``{"error": ...}`` when no image was uploaded;
        ``500`` with ``{"error": ...}`` when the upstream call fails.
    """
    upload = request.files.get('image')
    if upload is None:
        # A missing upload is a client error, not a server failure.
        return jsonify({"error": "Image file is required"}), 400
    try:
        file = upload.read()
        API_URL = "https://api-inference.huggingface.co/models/microsoft/trocr-base-handwritten"
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        response = requests.post(API_URL, headers=headers, data=file, timeout=60)
        if response.status_code != 200:
            return jsonify({"error": response.text}), 500

        result = response.json()
        if isinstance(result, list):
            # An empty list used to raise IndexError; treat it as no text.
            first = result[0] if result else {}
        else:
            first = result
        text = first.get('generated_text', '') if isinstance(first, dict) else ''
        return jsonify({"text": (text or '').strip()})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# Add this to the HF Space's app.py
# ... all of your remaining endpoints (such as /tts, /imagine, /bg-remove) ...
import time
import yt_dlp
from flask import Flask, request, jsonify
import os
# ... all of your other imports and endpoints ...
from flask import Flask, request, jsonify
from piped_api import PipedClient
import time
# NOTE: `app` is deliberately NOT re-created here. It is already instantiated
# near the top of this file; calling Flask(__name__) a second time would rebind
# `app` to a fresh application, so the routes registered before this point
# (/, /scan, /emotion, /bg-remove, /sketch, /ocr) would not be served by the
# app that app.run() starts.

# Shared Piped client used by the /get-stream endpoint.
PIPED_CLIENT = PipedClient()
@app.route('/get-stream', methods=['POST'])
def get_stream():
    """Resolve a search query to an audio stream URL through the Piped client.

    Expects a JSON object with a ``query`` field and an optional ``requester``
    field that is echoed back (default ``"Unknown"``).

    Returns:
        JSON with ``title``, ``webpage_url``, ``duration``, ``stream_url`` and
        ``requester`` on success;
        ``400`` when ``query`` is missing or the body is not a JSON object;
        ``404`` when the search yields nothing;
        ``500`` when the video has no audio stream or any call raises.
    """
    from urllib.parse import parse_qs, urlparse

    start_time = time.time()
    # silent=True yields None instead of raising on a missing/invalid body;
    # previously `request.json` could be None and `.get` crashed outside the try.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    query = data.get('query')
    if not query:
        return jsonify({"error": "Query is required"}), 400
    try:
        # Ask the Piped API for a single result only (for speed).
        search_results = PIPED_CLIENT.search(query, limit=1)
        if not search_results:
            return jsonify({"error": "No video found"}), 404
        first_result = search_results[0]

        # Prefer the `v` query parameter so that extra parameters in the URL do
        # not corrupt the id; fall back to the original "text after the last
        # '='" rule when there is no `v` parameter.
        result_url = first_result['url']
        v_values = parse_qs(urlparse(result_url).query).get('v')
        video_id = v_values[0] if v_values else result_url.split('=')[-1]

        # Fetch the video details.
        video_detail = PIPED_CLIENT.get_video(video_id)
        if not video_detail.audio_streams:
            return jsonify({"error": "No audio stream found"}), 500

        # Takes the first audio stream. NOTE(review): this assumes the client
        # lists the best-quality stream first -- confirm against piped_api.
        audio_stream = video_detail.audio_streams[0]
        processing_time = time.time() - start_time
        print(f"✅ Stream URL generated in {processing_time:.2f} seconds")
        return jsonify({
            "title": video_detail.title,
            "webpage_url": f"https://youtube.com/watch?v={video_id}",
            "duration": video_detail.duration,
            "stream_url": audio_stream.url,
            "requester": data.get('requester', 'Unknown')
        })
    except Exception as e:
        print(f"❌ Error in /get-stream: {e}")
        return jsonify({"error": str(e)}), 500
# ---------- TTS ----------
@app.route('/tts', methods=['POST'])
def tts_api():
    """Synthesize speech with edge-tts, optionally mixed over background music.

    Form fields (all optional):
        text:  text to speak (default ``'Hello'``).
        rate:  edge-tts rate string (default ``'+0%'``).
        voice: edge-tts voice name (default ``'hi-IN-SwaraNeural'``).
        bgm:   ``'news'`` or ``'roast'`` to overlay ``<bgm>.mp3`` from the
               working directory; any other value means no music.

    Returns:
        The MP3 audio (``audio/mpeg``). If mixing fails or the music file is
        absent, the plain voice track is returned. Any other failure returns
        the exception text with status 500.
    """
    import tempfile

    try:
        text = request.form.get('text', 'Hello')
        rate = request.form.get('rate', '+0%')
        voice = request.form.get('voice', 'hi-IN-SwaraNeural')
        bgm_type = request.form.get('bgm', 'none')

        # Work in a per-request temporary directory. The previous fixed names
        # ("temp_raw.mp3"/"temp_final.mp3") were shared by concurrent requests,
        # which could overwrite each other's audio, and were never deleted.
        with tempfile.TemporaryDirectory() as workdir:
            raw_audio = os.path.join(workdir, "temp_raw.mp3")
            final_audio = os.path.join(workdir, "temp_final.mp3")

            async def generate_audio():
                communicate = edge_tts.Communicate(text, voice, rate=rate)
                await communicate.save(raw_audio)

            asyncio.run(generate_audio())

            result_path = raw_audio
            if bgm_type in ('news', 'roast'):
                try:
                    voice_track = AudioSegment.from_file(raw_audio)
                    bgm_file = f"{bgm_type}.mp3"
                    if os.path.exists(bgm_file):
                        music_track = AudioSegment.from_file(bgm_file)
                        # Loop the music until it covers the whole voice track.
                        if len(music_track) < len(voice_track):
                            music_track = music_track * (len(voice_track) // len(music_track) + 1)
                        # Trim to the voice length and lower the music by 15 dB.
                        music_track = music_track[:len(voice_track)] - 15
                        final_mix = voice_track.overlay(music_track)
                        # export() returns the open output file; close it so
                        # the handle is not leaked.
                        final_mix.export(final_audio, format="mp3").close()
                        result_path = final_audio
                except Exception as e:
                    # Best effort: fall back to the voice-only track.
                    print(f"Mixer Error: {e}")

            # Load the result into memory before the directory is removed.
            with open(result_path, 'rb') as audio_file:
                payload = io.BytesIO(audio_file.read())

        return send_file(payload, mimetype="audio/mpeg")
    except Exception as e:
        return str(e), 500
# ---------- AI IMAGE GENERATION (FLUX) ----------
@app.route('/imagine', methods=['POST'])
def imagine():
    """Generate an image from a text prompt with FLUX.1-schnell (HF Inference API).

    Expects a JSON object with a ``prompt`` field.

    Returns:
        The generated image (``image/png``) on success;
        ``400`` with a plain-text message when the prompt is missing or the
        body is not a JSON object;
        ``500`` with ``"Imagine Error: ..."`` when the upstream call fails.
    """
    # silent=True yields None instead of raising on a missing/invalid body;
    # previously `request.json.get` crashed when no JSON was sent.
    data = request.get_json(silent=True)
    prompt = data.get('prompt') if isinstance(data, dict) else None
    if not prompt:
        return "Prompt is required", 400

    # Direct HF Inference API call – no local fallback
    API_URL = "https://api-inference.huggingface.co/models/black-forest-labs/FLUX.1-schnell"
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    try:
        response = requests.post(API_URL, headers=headers, json={"inputs": prompt}, timeout=60)
    except requests.RequestException as e:
        # Timeouts and connection errors were previously unhandled here.
        return f"Imagine Error: {e}", 500

    if response.status_code == 200:
        return send_file(io.BytesIO(response.content), mimetype='image/png')
    return f"Imagine Error: {response.text}", 500
@app.route('/generate', methods=['POST'])
def generate_alias():
    """Serve ``/generate`` by delegating to the ``/imagine`` handler."""
    outcome = imagine()
    return outcome
@app.route('/enhance', methods=['POST'])
def enhance_image():
    """Upscale an uploaded image via the HF Inference API.

    Reads the multipart field ``image``, posts its bytes to the
    ``explore-it/realesrgan-x4plus`` model and returns the reply body as a
    JPEG. Both upstream failures and exceptions are reported as plain text
    prefixed with ``"Enhance Error: "`` and status 500.
    """
    try:
        source_bytes = request.files['image'].read()
        model_url = "https://api-inference.huggingface.co/models/explore-it/realesrgan-x4plus"
        bearer = {"Authorization": f"Bearer {HF_TOKEN}"}
        upstream = requests.post(model_url, headers=bearer, data=source_bytes, timeout=60)
        if upstream.status_code != 200:
            return "Enhance Error: " + upstream.text, 500
        return send_file(io.BytesIO(upstream.content), mimetype='image/jpeg')
    except Exception as e:
        return "Enhance Error: " + str(e), 500
# ---------- MULTI-FONT PAPER GENERATOR ----------
@app.route('/paper', methods=['POST'])
def generate_paper():
    """Render text segments onto a ruled "notebook paper" image.

    Expects a JSON object with a ``segments`` list, e.g.
    ``[{"font": 1, "text": "Hello", "color": [0, 0, 180]}, ...]``. For each
    segment, ``font`` selects ``font<N>.ttf`` (the default PIL font is used if
    it cannot be loaded), ``color`` is the text colour, and ``text`` is
    word-wrapped inside the margins. Every non-blank paragraph ends with a
    line break.

    Returns:
        The page as a JPEG (``image/jpeg``);
        ``400`` with ``{"error": ...}`` when the body is not a JSON object;
        ``500`` with ``{"error": ...}`` on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "JSON body with 'segments' is required"}), 400
        segments = data.get('segments', [])

        img_width = 1240
        left_margin = 150
        top_margin = 180
        line_spacing = 65

        # Estimate the page height. `.get` matches the lookup used in the
        # drawing loop, so a segment without "text" no longer raises KeyError.
        total_text = " ".join([seg.get('text', '') for seg in segments])
        # Each segment ends with at least one line break, so count one line per
        # segment as well; otherwise many short segments run off the page.
        est_lines = total_text.count('\n') + (len(total_text) // 40) + len(segments) + 5
        img_height = max(1754, top_margin + (est_lines * line_spacing) + 200)

        img = Image.new('RGB', (img_width, img_height), color=(255, 255, 255))
        draw = ImageDraw.Draw(img)

        # Red margin lines
        draw.line([(left_margin, 0), (left_margin, img_height)], fill=(255, 0, 0), width=3)
        draw.line([(0, top_margin), (img_width, top_margin)], fill=(255, 0, 0), width=3)

        # Blue ruled lines
        y = top_margin + line_spacing
        while y < img_height:
            draw.line([(0, y), (img_width, y)], fill=(0, 0, 255, 80), width=2)
            y += line_spacing

        line_start_x = left_margin + 20
        x_text = line_start_x
        # Start 45 px above the first ruled line.
        y_text = top_margin + line_spacing - 45

        for seg in segments:
            font_num = seg.get('font', 1)
            color = tuple(seg.get('color', [0, 0, 180]))
            text = seg.get('text', '')
            try:
                font = ImageFont.truetype(f"font{font_num}.ttf", 35)
            except Exception:
                # Best-effort fallback, but no longer a bare `except:` that
                # would also swallow KeyboardInterrupt/SystemExit.
                font = ImageFont.load_default()

            for para in text.split('\n'):
                if not para.strip():
                    continue
                for word in para.split(' '):
                    bbox = draw.textbbox((0, 0), word + " ", font=font)
                    word_width = bbox[2] - bbox[0]
                    # Wrap when the word would cross the 50 px right margin.
                    if x_text + word_width > img_width - 50:
                        x_text = line_start_x
                        y_text += line_spacing
                    draw.text((x_text, y_text), word + " ", font=font, fill=color)
                    x_text += word_width
                # End of paragraph: move to the start of the next line.
                x_text = line_start_x
                y_text += line_spacing

        img_io = io.BytesIO()
        img.save(img_io, 'JPEG', quality=90)
        img_io.seek(0)
        return send_file(img_io, mimetype='image/jpeg')
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# ---------- VIDEO DOWNLOADER (yt-dlp) ----------
@app.route('/download', methods=['POST'])
def download_video():
    """Download a video with yt-dlp and return it as an attachment.

    Expects a JSON object with a ``url`` field. The download goes to ``/tmp``
    with a 50 MB ``max_filesize`` option and playlists disabled.

    Returns:
        The downloaded file as an attachment;
        ``400`` with ``{"error": ...}`` when ``url`` is missing or the body is
        not a JSON object;
        ``500`` with ``{"error": ...}`` when the download fails or produces no
        file.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        url = data.get('url') if isinstance(data, dict) else None
        if not url:
            return jsonify({"error": "URL required"}), 400

        ydl_opts = {
            'format': 'b[ext=mp4]/best',
            'quiet': True,
            'noplaylist': True,
            'max_filesize': 50000000,  # 50 MB
            'outtmpl': '/tmp/%(id)s.%(ext)s',
            'http_headers': {'User-Agent': 'Mozilla/5.0'}
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            filename = ydl.prepare_filename(info)

        if not os.path.exists(filename):
            # The file on disk may carry a different extension than predicted.
            stem = filename.rsplit('.', 1)[0]
            candidates = (f"{stem}.{ext}" for ext in ('mp4', 'webm', 'mkv'))
            filename = next((c for c in candidates if os.path.exists(c)), filename)

        if not os.path.exists(filename):
            # Report a missing file explicitly instead of letting send_file
            # raise. NOTE(review): presumably this happens when yt-dlp skips a
            # file over max_filesize -- confirm against yt-dlp behaviour.
            return jsonify({"error": "Download failed or file exceeds the 50 MB limit"}), 500

        # Label the response with the real container type, not always mp4.
        mime_by_ext = {
            'mp4': 'video/mp4',
            'webm': 'video/webm',
            'mkv': 'video/x-matroska',
        }
        extension = filename.rsplit('.', 1)[-1].lower()
        mimetype = mime_by_ext.get(extension, 'video/mp4')

        # NOTE(review): the downloaded file is left in /tmp after the response;
        # consider periodic cleanup if disk space matters.
        return send_file(filename, mimetype=mimetype, as_attachment=True)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860)