""" Rebel Face Swap — AI Face Swap for Images & Video Token-gated, login-based face swap app using open-source AI. """ import os, json, uuid, hashlib, time, threading, shutil, tempfile from datetime import datetime from flask import Flask, request, jsonify, render_template, send_from_directory, session, redirect, url_for from gradio_client import Client, handle_file from functools import wraps app = Flask(__name__) app.secret_key = os.environ.get("SECRET_KEY", "rebel-face-swap-secret-2026") # ── Config ────────────────────────────────────────────────────────── UPLOAD_FOLDER = "/tmp/rebel_uploads" OUTPUT_FOLDER = "/tmp/rebel_outputs" DB_FILE = "/tmp/rebel_faceswap_db.json" os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(OUTPUT_FOLDER, exist_ok=True) FREE_CREDITS = 5 IMAGE_SWAP_COST = 1 VIDEO_SWAP_COST = 3 IMAGE_SPACE = "tonyassi/face-swap" VIDEO_SPACE = "tonyassi/video-face-swap" # ── Database (JSON file) ──────────────────────────────────────────── db_lock = threading.Lock() def load_db(): if os.path.exists(DB_FILE): with open(DB_FILE, "r") as f: return json.load(f) return {"users": {}, "jobs": {}, "purchases": []} def save_db(db): with open(DB_FILE, "w") as f: json.dump(db, f, indent=2) def hash_pw(pw): return hashlib.sha256(pw.encode()).hexdigest() # ── Auth decorator ────────────────────────────────────────────────── def login_required(f): @wraps(f) def decorated(*args, **kwargs): if "user_id" not in session: if request.is_json or request.path.startswith("/api/"): return jsonify({"error": "Not logged in"}), 401 return redirect(url_for("login_page")) return f(*args, **kwargs) return decorated # ── Job queue ─────────────────────────────────────────────────────── jobs = {} jobs_lock = threading.Lock() def process_image_swap(job_id, source_path, target_path): """Background thread: image face swap via HF Space.""" try: with jobs_lock: jobs[job_id]["status"] = "processing" client = Client(IMAGE_SPACE) result = client.predict( src_img=handle_file(source_path), dest_img=handle_file(target_path), api_name="/swap_faces" ) # result is the output image path out_name = f"{job_id}.png" out_path = os.path.join(OUTPUT_FOLDER, out_name) shutil.copy(result, out_path) with jobs_lock: jobs[job_id]["status"] = "done" jobs[job_id]["result"] = out_name jobs[job_id]["result_type"] = "image" except Exception as e: with jobs_lock: jobs[job_id]["status"] = "error" jobs[job_id]["error"] = str(e) def process_video_swap(job_id, source_path, video_path, gender="all"): """Background thread: video face swap via HF Space.""" try: with jobs_lock: jobs[job_id]["status"] = "processing" client = Client(VIDEO_SPACE) result = client.predict( input_image=handle_file(source_path), input_video=handle_file(video_path), gender=gender, api_name="/generate" ) out_name = f"{job_id}.mp4" out_path = os.path.join(OUTPUT_FOLDER, out_name) shutil.copy(result, out_path) with jobs_lock: jobs[job_id]["status"] = "done" jobs[job_id]["result"] = out_name jobs[job_id]["result_type"] = "video" except Exception as e: with jobs_lock: jobs[job_id]["status"] = "error" jobs[job_id]["error"] = str(e) # ── Pages ─────────────────────────────────────────────────────────── @app.route("/") def index(): if "user_id" in session: return redirect(url_for("app_page")) return redirect(url_for("login_page")) @app.route("/login") def login_page(): return render_template("login.html") @app.route("/signup") def signup_page(): return render_template("signup.html") @app.route("/app") @login_required def app_page(): return render_template("app.html") @app.route("/pricing") def pricing_page(): return render_template("pricing.html", logged_in="user_id" in session) @app.route("/admin") def admin_page(): key = request.args.get("key", "") if key != "rebel-admin-2026": return "Unauthorized", 403 return render_template("admin.html") # ── Auth API ──────────────────────────────────────────────────────── @app.route("/api/signup", methods=["POST"]) def api_signup(): data = request.json email = data.get("email", "").strip().lower() password = data.get("password", "") username = data.get("username", "").strip() if not email or not password or not username: return jsonify({"error": "All fields required"}), 400 with db_lock: db = load_db() if email in db["users"]: return jsonify({"error": "Email already registered"}), 400 user_id = str(uuid.uuid4())[:8] db["users"][email] = { "id": user_id, "username": username, "email": email, "password": hash_pw(password), "credits": FREE_CREDITS, "created": datetime.utcnow().isoformat(), "swaps_done": 0 } save_db(db) session["user_id"] = user_id session["email"] = email return jsonify({"success": True, "credits": FREE_CREDITS}) @app.route("/api/login", methods=["POST"]) def api_login(): data = request.json email = data.get("email", "").strip().lower() password = data.get("password", "") with db_lock: db = load_db() user = db["users"].get(email) if not user or user["password"] != hash_pw(password): return jsonify({"error": "Invalid credentials"}), 401 session["user_id"] = user["id"] session["email"] = email return jsonify({"success": True, "credits": user["credits"]}) @app.route("/api/logout", methods=["POST"]) def api_logout(): session.clear() return jsonify({"success": True}) @app.route("/api/me") @login_required def api_me(): with db_lock: db = load_db() user = db["users"].get(session["email"], {}) return jsonify({ "username": user.get("username", ""), "email": user.get("email", ""), "credits": user.get("credits", 0), "swaps_done": user.get("swaps_done", 0) }) # ── Swap API ──────────────────────────────────────────────────────── @app.route("/api/swap", methods=["POST"]) @login_required def api_swap(): swap_type = request.form.get("type", "image") # image or video gender = request.form.get("gender", "all") cost = IMAGE_SWAP_COST if swap_type == "image" else VIDEO_SWAP_COST # Check credits with db_lock: db = load_db() user = db["users"].get(session["email"]) if not user: return jsonify({"error": "User not found"}), 404 if user["credits"] < cost: return jsonify({"error": f"Not enough credits. Need {cost}, have {user['credits']}"}), 402 # Get files source = request.files.get("source") target = request.files.get("target") if not source or not target: return jsonify({"error": "Source face image and target file required"}), 400 # Save uploads job_id = str(uuid.uuid4())[:12] source_ext = os.path.splitext(source.filename)[1] or ".png" target_ext = os.path.splitext(target.filename)[1] or (".png" if swap_type == "image" else ".mp4") source_path = os.path.join(UPLOAD_FOLDER, f"{job_id}_source{source_ext}") target_path = os.path.join(UPLOAD_FOLDER, f"{job_id}_target{target_ext}") source.save(source_path) target.save(target_path) # Deduct credits with db_lock: db = load_db() db["users"][session["email"]]["credits"] -= cost db["users"][session["email"]]["swaps_done"] += 1 save_db(db) # Start job with jobs_lock: jobs[job_id] = { "status": "queued", "type": swap_type, "user": session["email"], "created": time.time() } if swap_type == "image": t = threading.Thread(target=process_image_swap, args=(job_id, source_path, target_path)) else: t = threading.Thread(target=process_video_swap, args=(job_id, source_path, target_path, gender)) t.daemon = True t.start() return jsonify({"job_id": job_id, "credits_remaining": db["users"][session["email"]]["credits"]}) @app.route("/api/job/") @login_required def api_job_status(job_id): with jobs_lock: job = jobs.get(job_id) if not job: return jsonify({"error": "Job not found"}), 404 resp = {"status": job["status"], "type": job.get("type", "image")} if job["status"] == "done": resp["result_url"] = f"/output/{job['result']}" resp["result_type"] = job.get("result_type", "image") elif job["status"] == "error": resp["error"] = job.get("error", "Unknown error") return jsonify(resp) @app.route("/output/") @login_required def serve_output(filename): return send_from_directory(OUTPUT_FOLDER, filename) # ── Credits / Payment API ────────────────────────────────────────── @app.route("/api/add_credits", methods=["POST"]) @login_required def api_add_credits(): """Manual credit add (for PayPal verification or admin).""" data = request.json amount = data.get("amount", 0) with db_lock: db = load_db() if session["email"] in db["users"]: db["users"][session["email"]]["credits"] += amount db["purchases"].append({ "email": session["email"], "amount": amount, "timestamp": datetime.utcnow().isoformat() }) save_db(db) return jsonify({"success": True, "credits": db["users"][session["email"]]["credits"]}) # ── Admin API ─────────────────────────────────────────────────────── @app.route("/api/admin/stats") def api_admin_stats(): key = request.args.get("key", "") if key != "rebel-admin-2026": return jsonify({"error": "Unauthorized"}), 403 with db_lock: db = load_db() total_users = len(db["users"]) total_swaps = sum(u.get("swaps_done", 0) for u in db["users"].values()) total_purchases = len(db["purchases"]) users_list = [ {"username": u["username"], "email": u["email"], "credits": u["credits"], "swaps_done": u.get("swaps_done", 0), "created": u.get("created", "")} for u in db["users"].values() ] return jsonify({ "total_users": total_users, "total_swaps": total_swaps, "total_purchases": total_purchases, "users": users_list, "purchases": db["purchases"] }) # ── Static ────────────────────────────────────────────────────────── @app.route("/static/") def serve_static(filename): return send_from_directory("static", filename) if __name__ == "__main__": app.run(host="0.0.0.0", port=7860, debug=False)