| from flask import Flask, request, jsonify, Response |
| from faster_whisper import WhisperModel |
| import torch |
| import time |
| import datetime |
| from threading import Semaphore |
| import os |
| from werkzeug.utils import secure_filename |
| import tempfile |
| from moviepy.editor import VideoFileClip |
| import logging |
| import torchaudio |
| import ffmpeg |
|
|
| |
| |
| |
# Root logger: INFO and above, one line per record with the timestamp,
# the severity and the message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)


# Flask application object that the route decorators below attach to.
app = Flask(__name__)
|
|
| |
| |
| |
# Size of the semaphore that gates the transcription endpoints.
MAX_CONCURRENT_REQUESTS = 1
# Longest accepted media duration, in seconds (30 minutes).
MAX_FILE_DURATION = 30 * 60
# Directory that receives uploaded files and extracted audio.
TEMPORARY_FOLDER = tempfile.gettempdir()
# Accepted upload extensions (lower-case, without the leading dot).
ALLOWED_AUDIO_EXTENSIONS = {
    'mp3', 'wav', 'ogg', 'm4a', 'flac', 'aac', 'wma', 'opus', 'aiff',
}
ALLOWED_VIDEO_EXTENSIONS = {
    'mp4', 'avi', 'mov', 'mkv', 'webm', 'flv', 'wmv', 'mpeg', 'mpg', '3gp',
}
ALLOWED_EXTENSIONS = ALLOWED_AUDIO_EXTENSIONS | ALLOWED_VIDEO_EXTENSIONS


# Shared secret expected in the X-API-Key header; None when the variable is
# not set.
API_KEY = os.getenv("API_KEY")
# Model identifier handed to WhisperModel; overridable via WHISPER_MODEL.
MODEL_NAME = os.getenv("WHISPER_MODEL", "guillaumekln/faster-whisper-large-v2")


# Prompt used when the request carries no ?prompt= parameter
# ("please output in Traditional Chinese").
DEFAULT_INITIAL_PROMPT = "請使用繁體中文輸出"
|
|
| |
| |
| |
# Run on the GPU when CUDA is available; float16 there, int8 on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = "float16" if device == "cuda" else "int8"
logging.info(f"使用設備: {device},計算類型: {compute_type}")


# Beam width passed to every wmodel.transcribe() call.
beamsize = 2


# Load the model once at import time. A failure does not abort start-up:
# wmodel stays None and the transcription endpoints answer 500 instead.
try:
    wmodel = WhisperModel(
        MODEL_NAME,
        device=device,
        compute_type=compute_type,
        download_root="./model_cache",
    )
    logging.info(f"模型 {MODEL_NAME} 載入成功.")
except Exception as e:
    # logging.exception keeps the traceback, which is what is needed to tell
    # a download failure from a CUDA / compute-type problem.
    logging.exception(f"載入模型 {MODEL_NAME} 失敗: {e}")
    wmodel = None


# Gate for the transcription endpoints plus a counter that the status
# endpoints report.
request_semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)
active_requests = 0
|
|
| |
| |
| |
def validate_api_key(req):
    """Check the ``X-API-Key`` header of *req* against the configured key.

    Args:
        req: Request-like object exposing a ``headers`` mapping.

    Returns:
        True when no API key is configured (authentication disabled) or when
        the supplied header equals the configured key; False otherwise.
    """
    import hmac

    if not API_KEY:
        return True

    supplied = req.headers.get('X-API-Key') or ''
    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Compare bytes because compare_digest rejects
    # non-ASCII str arguments.
    return hmac.compare_digest(supplied.encode('utf-8'), API_KEY.encode('utf-8'))
|
|
def allowed_file(filename):
    """Return True when *filename* ends in one of ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
|
|
def cleanup_temp_files(*file_paths):
    """Delete each existing path in *file_paths*, best effort.

    Falsy entries and paths that do not exist are skipped. A failure to
    delete one path is logged and does not stop the remaining deletions.
    """
    for candidate in file_paths:
        try:
            if not candidate or not os.path.exists(candidate):
                continue
            os.remove(candidate)
            logging.info(f"刪除暫存檔案: {candidate}")
        except Exception as exc:
            logging.error(f"刪除暫存檔案 {candidate} 出錯: {str(exc)}")
|
|
def extract_audio_from_video(video_path, output_audio_path):
    """Extract the audio track of a video into a 16-bit PCM WAV file.

    The video length is read with moviepy first, so an over-long video is
    rejected before any extraction work is done; ffmpeg then writes the audio.

    Args:
        video_path: Path of the source video.
        output_audio_path: Path of the WAV file to write; overwritten if it
            already exists.

    Returns:
        ``output_audio_path``.

    Raises:
        Exception: If the video is longer than MAX_FILE_DURATION seconds, or
            if probing or extraction fails. The original error is chained as
            ``__cause__``.
    """
    try:
        video = VideoFileClip(video_path)
        try:
            duration = video.duration
        finally:
            # Release the reader even when reading the duration fails.
            video.close()
        if duration > MAX_FILE_DURATION:
            raise ValueError(f"視頻時長超過 {MAX_FILE_DURATION} 秒")

        # overwrite_output keeps ffmpeg from failing or prompting when the
        # destination file already exists.
        ffmpeg.input(video_path).output(
            output_audio_path,
            acodec='pcm_s16le',
        ).run(capture_stdout=True, capture_stderr=True, overwrite_output=True)

        return output_audio_path
    except Exception as e:
        logging.exception("提取視頻中的音訊出錯")
        # ffmpeg.Error carries the real diagnostics in .stderr, not in str(e).
        ffmpeg_stderr = getattr(e, "stderr", None)
        if isinstance(ffmpeg_stderr, bytes) and ffmpeg_stderr:
            logging.error("ffmpeg stderr: %s", ffmpeg_stderr.decode("utf-8", errors="replace"))
        raise Exception(f"提取視頻中的音訊出錯: {str(e)}") from e
|
|
def fmt_mmss_mmm(seconds: float) -> str:
    """Format a time offset in seconds as ``MM:SS.mmm`` (e.g. ``00:01.000``).

    Minutes are not wrapped into hours, so one hour renders as ``60:00.000``.
    ``None``, negative values, NaN and infinities are all rendered as
    ``00:00.000`` instead of producing a malformed string or raising.

    Args:
        seconds: Offset in seconds, or None.

    Returns:
        The offset rounded to the nearest millisecond, as ``MM:SS.mmm``.
    """
    # The chained comparison is False for NaN, negatives and +inf alike.
    if seconds is None or not (0.0 <= seconds < float("inf")):
        seconds = 0.0
    total_ms = int(round(seconds * 1000))
    minutes, remainder_ms = divmod(total_ms, 60_000)
    sec, ms = divmod(remainder_ms, 1000)
    return f"{minutes:02d}:{sec:02d}.{ms:03d}"
|
|
def read_lang_param_with_default_zh():
    """Read the ``?lang=`` query parameter of the current request.

    The value is trimmed and lower-cased, so ``EN`` and ``en`` are treated
    the same way ``AUTO`` and ``auto`` already were.

    Returns:
        ``"zh"`` when the parameter is missing, empty or ``auto``; otherwise
        the normalized parameter value.
    """
    lang_param = request.args.get("lang", "").strip().lower()
    if not lang_param or lang_param == "auto":
        return "zh"
    return lang_param
|
|
def read_initial_prompt():
    """Read the ``?prompt=`` query parameter of the current request.

    Returns:
        The trimmed parameter value, or DEFAULT_INITIAL_PROMPT when the
        parameter is missing or blank.
    """
    supplied = request.args.get("prompt", "").strip()
    if supplied:
        return supplied
    return DEFAULT_INITIAL_PROMPT
|
|
def _load_audio_duration(audio_path: str, format_hint: str) -> float:
    """Return the duration in seconds of *audio_path* as decoded by torchaudio.

    The file is first loaded with *format_hint*; if that fails it is loaded
    again with the "soundfile" backend selected.

    Raises:
        Exception: If both load attempts fail (original error chained).
    """
    try:
        waveform, sample_rate = torchaudio.load(audio_path, format=format_hint)
    except Exception:
        logging.exception(f"使用 torchaudio.load 載入音訊檔出錯: {audio_path}")
        try:
            # Remember the active backend so it can be put back afterwards.
            previous_backend = torchaudio.get_audio_backend()
            torchaudio.set_audio_backend("soundfile")
            try:
                waveform, sample_rate = torchaudio.load(audio_path)
            finally:
                torchaudio.set_audio_backend(previous_backend)
        except Exception as soundfile_err:
            logging.exception(f"使用 soundfile 後端載入音訊檔出錯: {audio_path}")
            raise Exception(f'使用兩個後端載入音訊檔都出錯: {str(soundfile_err)}') from soundfile_err
    return waveform.size(1) / sample_rate


def run_transcribe_pipeline(uploaded_file_path: str, file_extension: str):
    """Shared transcription flow for the upload endpoints.

    Video uploads have their audio extracted to a temporary WAV file (their
    length is checked during extraction). Audio uploads have their length
    checked here. The language and initial prompt are read from the current
    request's query string and the file is handed to Faster-Whisper.

    Args:
        uploaded_file_path: Path of the saved upload.
        file_extension: Lower-case extension of the upload, without the dot.

    Returns:
        ``(segments, is_video, temp_audio_path)`` where ``segments`` is the
        iterable returned by ``wmodel.transcribe`` and ``temp_audio_path`` is
        the extracted WAV (None for audio uploads). The caller deletes
        ``temp_audio_path`` once the segments have been consumed.

    Raises:
        ValueError: If an audio upload is longer than MAX_FILE_DURATION.
        Exception: If extraction, loading or starting the transcription fails.
            In every failure case the extracted WAV is removed before the
            error propagates, because the caller never learns its path.
    """
    import uuid

    is_video = file_extension in ALLOWED_VIDEO_EXTENSIONS
    temp_audio_path = None

    try:
        if is_video:
            # A UUID avoids the name collisions a whole-second timestamp allows.
            temp_audio_path = os.path.join(TEMPORARY_FOLDER, f"temp_audio_{uuid.uuid4().hex}.wav")
            extract_audio_from_video(uploaded_file_path, temp_audio_path)
            transcription_file = temp_audio_path
        else:
            transcription_file = uploaded_file_path
            duration = _load_audio_duration(transcription_file, file_extension)
            # Checked outside the loader so an over-long file is reported as
            # such instead of being mistaken for a decode failure.
            if duration > MAX_FILE_DURATION:
                raise ValueError(f"音訊時長超過 {MAX_FILE_DURATION} 秒")

        language = read_lang_param_with_default_zh()
        initial_prompt = read_initial_prompt()

        segments, _info = wmodel.transcribe(
            transcription_file,
            beam_size=beamsize,
            vad_filter=True,
            without_timestamps=False,
            compression_ratio_threshold=2.4,
            word_timestamps=False,
            language=language,
            initial_prompt=initial_prompt,
        )
    except Exception:
        cleanup_temp_files(temp_audio_path)
        raise

    return segments, is_video, temp_audio_path
|
|
| |
| |
| |
@app.route("/health", methods=["GET"])
def health_check():
    """Report service status and the active configuration as JSON."""
    now_iso = datetime.datetime.now().isoformat()
    payload = {
        'status': 'API 正在運行',
        'timestamp': now_iso,
        'device': device,
        'compute_type': compute_type,
        'active_requests': active_requests,
        'max_duration_supported': MAX_FILE_DURATION,
        'supported_formats': list(ALLOWED_EXTENSIONS),
        'model': MODEL_NAME,
        'default_language': 'zh',
        'default_initial_prompt': DEFAULT_INITIAL_PROMPT,
    }
    return jsonify(payload)
|
|
@app.route("/status/busy", methods=["GET"])
def server_busy():
    """Report as JSON whether the active request count has reached capacity."""
    return jsonify(
        is_busy=active_requests >= MAX_CONCURRENT_REQUESTS,
        active_requests=active_requests,
        max_capacity=MAX_CONCURRENT_REQUESTS,
    )
|
|
| |
| |
| |
def _handle_transcription_request(endpoint: str, build_response):
    """Run the request flow shared by both transcription endpoints.

    Validates the API key, takes the concurrency semaphore without blocking,
    saves the upload to a uniquely named temporary file, runs the pipeline
    and delegates the success response to *build_response*.

    Args:
        endpoint: Route path, used only in the timing log line.
        build_response: Callable ``(segments_iter, is_video)`` returning the
            Flask response for a successful transcription. It is called
            before the temporary files are removed, so it must consume
            ``segments_iter`` itself.

    Returns:
        A Flask response: 401 for a bad key, 503 when busy, 500 when the
        model is not loaded or an unexpected error occurs, 400 for a missing
        or unsupported file or a pipeline failure, otherwise whatever
        *build_response* returns.
    """
    global active_requests

    if not validate_api_key(request):
        return jsonify({'error': '無效的 API 金鑰'}), 401

    if not request_semaphore.acquire(blocking=False):
        return jsonify({'error': '伺服器繁忙'}), 503

    active_requests += 1
    t0 = time.time()
    temp_file_path = None
    temp_audio_path = None

    try:
        if wmodel is None:
            return jsonify({'error': '模型載入失敗。請檢查伺服器日誌。'}), 500

        if 'file' not in request.files:
            return jsonify({'error': '未提供檔'}), 400

        file = request.files['file']
        if not (file and allowed_file(file.filename)):
            supported = ", ".join(sorted(ALLOWED_EXTENSIONS))
            return jsonify({'error': f'無效的檔案格式。支持:{supported}'}), 400

        file_extension = file.filename.rsplit('.', 1)[1].lower()

        # Save under a unique generated name that keeps only the validated
        # extension. The client-supplied name is not used: non-ASCII names
        # sanitize down to almost nothing, and fixed names in the shared
        # temp directory can collide with (and then delete) other files.
        fd, temp_file_path = tempfile.mkstemp(
            prefix="whisper_upload_",
            suffix=f".{file_extension}",
            dir=TEMPORARY_FOLDER,
        )
        os.close(fd)
        file.save(temp_file_path)

        try:
            segments_iter, is_video, temp_audio_path = run_transcribe_pipeline(temp_file_path, file_extension)
        except Exception as e:
            logging.exception("轉錄前置處理失敗")
            return jsonify({'error': str(e)}), 400

        return build_response(segments_iter, is_video)

    except Exception as e:
        logging.exception("轉錄過程中發生異常")
        return jsonify({'error': str(e)}), 500

    finally:
        cleanup_temp_files(temp_file_path, temp_audio_path)
        active_requests -= 1
        request_semaphore.release()
        logging.info(f"{endpoint} 用時:{time.time() - t0:.2f}s (活動請求:{active_requests})")


def _build_segments_json_response(segments_iter, is_video):
    """Build the JSON response listing every segment with start/end/text."""
    results = [
        {
            "start": fmt_mmss_mmm(seg.start or 0.0),
            "end": fmt_mmss_mmm(seg.end or 0.0),
            "text": (seg.text or "").strip(),
        }
        for seg in segments_iter
    ]
    return jsonify({
        'file_type': 'video' if is_video else 'audio',
        'segments': results,
    }), 200


def _build_plain_text_response(segments_iter, is_video):
    """Build the text/plain response: non-empty segment texts joined by spaces."""
    stripped = ((seg.text or "").strip() for seg in segments_iter)
    full_text = " ".join(text for text in stripped if text)
    # Pass the bare mimetype: Werkzeug appends "; charset=utf-8" to text/*
    # types itself, so spelling it out here would duplicate the parameter.
    return Response(full_text, mimetype="text/plain"), 200


@app.route("/whisper_transcribe", methods=["POST"])
def transcribe_json():
    """Transcribe the uploaded ``file`` and return timed segments as JSON."""
    return _handle_transcription_request("/whisper_transcribe", _build_segments_json_response)


@app.route("/whisper_transcribe_text", methods=["POST"])
def transcribe_text_only():
    """Transcribe the uploaded ``file`` and return the text as text/plain."""
    return _handle_transcription_request("/whisper_transcribe_text", _build_plain_text_response)
|
|
|
|
if __name__ == "__main__":
    # Create the scratch directory when it is missing.
    temp_dir_exists = os.path.exists(TEMPORARY_FOLDER)
    if not temp_dir_exists:
        os.makedirs(TEMPORARY_FOLDER)
        logging.info(f"新建暫存檔案夾: {TEMPORARY_FOLDER}")

    # Serve on every interface, port 7860, with threaded mode enabled.
    app.run(threaded=True, port=7860, host="0.0.0.0")