import gradio as gr import librosa import soundfile import tempfile import os import uuid import json import re from nemo.collections.asr.models import ASRModel from nemo.utils import logging from align import main, AlignmentConfig, ASSFileConfig SAMPLE_RATE = 16000 logging.setLevel(logging.INFO) # --- PARSING FUNCTIONS --- def parse_srt(content): pattern = re.compile(r'\d+\n(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\n((?:(?!\n\n).)*)', re.DOTALL) matches = pattern.findall(content + "\n\n") segments = [] def time_to_sec(t_str): h, m, s_ms = t_str.split(':') s, ms = s_ms.split(',') return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000.0 for match in matches: start = time_to_sec(match[0]) end = time_to_sec(match[1]) text = match[2].replace('\n', ' ').strip() segments.append({"start": start, "end": end, "text": text}) return segments def parse_lrc(content, audio_duration): lines = content.split('\n') pattern = re.compile(r'\[(\d{2}):(\d{2}\.\d{2,3})\](.*)') segments = [] for line in lines: match = pattern.match(line.strip()) if match: m, s, text = match.groups() start = int(m) * 60 + float(s) if text.strip(): segments.append({"start": start, "text": text.strip()}) for i in range(len(segments) - 1): segments[i]["end"] = segments[i+1]["start"] if segments: segments[-1]["end"] = audio_duration return segments def parse_ass_time(t_str): h, m, s = t_str.strip().split(':') return float(h) * 3600 + float(m) * 60 + float(s) def format_ass_time(sec): sec = max(0.0, sec) nh = int(sec // 3600) nm = int((sec % 3600) // 60) ns = sec % 60 return f"{nh:d}:{nm:02d}:{ns:05.2f}" # ----------------------------- def get_audio_data_and_duration(file): data, sr = librosa.load(file) if sr != SAMPLE_RATE: data = librosa.resample(data, orig_sr=sr, target_sr=SAMPLE_RATE) # monochannel data = librosa.to_mono(data) duration = librosa.get_duration(y=data, sr=SAMPLE_RATE) return data, duration def get_char_tokens(text, model): tokens = [] for character in text: if character in model.decoder.vocabulary: tokens.append(model.decoder.vocabulary.index(character)) else: tokens.append(len(model.decoder.vocabulary)) # return unk token (same as blank token) return tokens def get_S_prime_and_T(text, model_name, model, audio_duration): if "citrinet" in model_name or "_fastconformer_" in model_name: output_timestep_duration = 0.08 elif "_conformer_" in model_name: output_timestep_duration = 0.04 elif "quartznet" in model_name: output_timestep_duration = 0.02 else: raise RuntimeError("unexpected model name") T = int(audio_duration / output_timestep_duration) + 1 if hasattr(model, 'tokenizer'): all_tokens = model.tokenizer.text_to_ids(text) elif hasattr(model.decoder, "vocabulary"): all_tokens = get_char_tokens(text, model) else: raise RuntimeError("cannot obtain tokens from this model") n_token_repetitions = 0 for i_tok in range(1, len(all_tokens)): if all_tokens[i_tok] == all_tokens[i_tok - 1]: n_token_repetitions += 1 S_prime = len(all_tokens) + n_token_repetitions return S_prime, T def delete_mp4s_except_given_filepath(filepath): files_in_dir = os.listdir() mp4_files_in_dir = [x for x in files_in_dir if x.endswith(".mp4")] for mp4_file in mp4_files_in_dir: if mp4_file != filepath: os.remove(mp4_file) def align(Microphone, File_Upload, subs_file, text, split_on_newline, progress=gr.Progress()): utt_id = uuid.uuid4() output_video_filepath = f"{utt_id}.mp4" delete_mp4s_except_given_filepath(output_video_filepath) output_info = "" ass_text = "" progress(0, desc="Validating input") if (Microphone is not None) and (File_Upload is not None): raise gr.Error("Please use either the microphone or file upload input - not both") elif (Microphone is None) and (File_Upload is None): raise gr.Error("You have to either use the microphone or upload an audio file") elif Microphone is not None: file = Microphone else: file = File_Upload audio_data, duration = get_audio_data_and_duration(file) progress(0.1, desc="Loading speech recognition model") model_name = "ayymen/stt_zgh_fastconformer_ctc_small" model = ASRModel.from_pretrained(model_name) segments = [] if subs_file is not None: with open(subs_file.name, 'r', encoding='utf-8') as f: subs_content = f.read() if subs_file.name.lower().endswith('.srt'): segments = parse_srt(subs_content) elif subs_file.name.lower().endswith('.lrc'): segments = parse_lrc(subs_content, duration) else: raise gr.Error("Subtitle file must be an .srt or .lrc file.") with tempfile.TemporaryDirectory() as tmpdir: manifest_path = os.path.join(tmpdir, f"{utt_id}_manifest.json") if segments: progress(0.2, desc="Chunking audio and generating manifest") with open(manifest_path, 'w', encoding='utf-8') as fout: for i, seg in enumerate(segments): S_prime, T = get_S_prime_and_T(seg['text'], model_name, model, seg['end'] - seg['start']) if S_prime > T: raise gr.Error(f"Segment {i} text is too long for its audio duration.") start_sample = int(seg['start'] * SAMPLE_RATE) end_sample = int(seg['end'] * SAMPLE_RATE) chunk_data = audio_data[start_sample:end_sample] chunk_path = os.path.join(tmpdir, f"{utt_id}_{i:04d}.wav") soundfile.write(chunk_path, chunk_data, SAMPLE_RATE) seg_text = seg['text'].replace('\n', '|') if split_on_newline else seg['text'].replace('\n', ' ') data = { "audio_filepath": chunk_path, "text": seg_text, } fout.write(f"{json.dumps(data)}\n") resegment_text_to_fill_space = False else: audio_path = os.path.join(tmpdir, f'{utt_id}.wav') soundfile.write(audio_path, audio_data, SAMPLE_RATE) if not text: progress(0.2, desc="Transcribing audio") text = model.transcribe([audio_path])[0] if 'hybrid' in model_name: text = text[0] if text == "": raise gr.Error("ERROR: the ASR model did not detect any speech. Please upload audio with speech.") output_info += ( "You did not enter any input text, so the ASR model's transcription will be used:\n" "--------------------------\n" f"{text}\n" "--------------------------\n" f"You could try pasting the transcription into the text input box, correcting any" " transcription errors, and clicking 'Submit' again." ) if split_on_newline: text = "|".join(list(filter(None, text.split("\n")))) S_prime, T = get_S_prime_and_T(text, model_name, model, duration) if S_prime > T: raise gr.Error("The number of tokens in the input text is too long compared to the duration of the audio.") with open(manifest_path, 'w', encoding='utf-8') as fout: data = { "audio_filepath": audio_path, "text": text, } fout.write(f"{json.dumps(data)}\n") resegment_text_to_fill_space = "|" not in text alignment_config = AlignmentConfig( pretrained_name=model_name, manifest_filepath=manifest_path, output_dir=f"{tmpdir}/nfa_output/", audio_filepath_parts_in_utt_id=1, batch_size=1, use_local_attention=True, additional_segment_grouping_separator="|", save_output_file_formats=["ass", "ctm"], ass_file_config=ASSFileConfig( fontsize=45, resegment_text_to_fill_space=resegment_text_to_fill_space, max_lines_per_segment=4, ), ) progress(0.5, desc="Aligning audio") main(alignment_config) progress(0.95, desc="Saving generated alignments") ass_path = "word_level.ass" word_ctm_path = "word_level.ctm" segment_ctm_path = "segment_level.ctm" if segments: merged_ass = "" header_written = False for i, seg in enumerate(segments): chunk_ass_path = f"{tmpdir}/nfa_output/ass/words/{utt_id}_{i:04d}.ass" if os.path.exists(chunk_ass_path): chunk_lines = [] with open(chunk_ass_path, "r", encoding='utf-8') as f: for line in f: if line.startswith("Dialogue:"): parts = line.split(",", 9) if len(parts) >= 10: chunk_lines.append(parts) elif not header_written: merged_ass += line header_written = True if chunk_lines: for j, parts in enumerate(chunk_lines): local_start = parse_ass_time(parts[1]) local_end = parse_ass_time(parts[2]) # Shift NFA's internal timing by the LRC/SRT start time global_start = local_start + seg['start'] global_end = local_end + seg['start'] # Only stretch the FIRST word backwards to fill the leading silence if j == 0: global_start = seg['start'] # Only stretch the LAST word forwards to fill the trailing silence if j == len(chunk_lines) - 1: global_end = seg['end'] # Prevent vertical stacking with the next segment (50ms gap) if i < len(segments) - 1: next_start = segments[i+1]['start'] if global_end >= next_start: global_end = next_start - 0.05 if global_start >= global_end: global_start = global_end - 0.01 parts[1] = format_ass_time(global_start) parts[2] = format_ass_time(global_end) merged_ass += ",".join(parts) with open(ass_path, "w", encoding="utf-8") as f: f.write(merged_ass) ass_text = merged_ass # Merge CTMs for ctm_type, out_path in [("words", word_ctm_path), ("segments", segment_ctm_path)]: merged_ctm = "" for i, seg in enumerate(segments): chunk_ctm_path = f"{tmpdir}/nfa_output/ctm/{ctm_type}/{utt_id}_{i:04d}.ctm" if os.path.exists(chunk_ctm_path): with open(chunk_ctm_path, "r", encoding='utf-8') as f: for line in f: parts = line.strip().split() if len(parts) >= 5: parts[0] = str(utt_id) parts[2] = f"{float(parts[2]) + seg['start']:.2f}" merged_ctm += " ".join(parts) + "\n" with open(out_path, "w", encoding="utf-8") as f: f.write(merged_ctm) else: ass_file_for_video = f"{tmpdir}/nfa_output/ass/words/{utt_id}.ass" with open(ass_file_for_video, "r", encoding="utf-8") as f: ass_text = f.read() with open(ass_path, "w", encoding="utf-8") as f: f.write(ass_text) with open(f"{tmpdir}/nfa_output/ctm/words/{utt_id}.ctm", "r", encoding="utf-8") as f: with open(word_ctm_path, "w", encoding="utf-8") as out_f: out_f.write(f.read()) with open(f"{tmpdir}/nfa_output/ctm/segments/{utt_id}.ctm", "r", encoding="utf-8") as f: with open(segment_ctm_path, "w", encoding="utf-8") as out_f: out_f.write(f.read()) full_audio_path = os.path.join(tmpdir, "full_audio.wav") soundfile.write(full_audio_path, audio_data, SAMPLE_RATE) ffmpeg_command = ( f"ffmpeg -y -i {full_audio_path} " "-f lavfi -i color=c=white:s=1280x720:r=50 " "-crf 1 -shortest -vcodec libx264 -pix_fmt yuv420p " f"-vf \"ass='{ass_path}'\" " f"{output_video_filepath}" ) os.system(ffmpeg_command) return ( output_video_filepath, gr.update(value=output_info, visible=True if output_info else False), output_video_filepath, gr.update(value=ass_path, visible=True), gr.update(value=word_ctm_path, visible=True), gr.update(value=segment_ctm_path, visible=True) ) def delete_non_tmp_video(video_path): if video_path: if os.path.exists(video_path): os.remove(video_path) return None with gr.Blocks(title="NeMo Forced Aligner", theme="huggingface") as demo: non_tmp_output_video_filepath = gr.State([]) with gr.Row(): with gr.Column(): gr.Markdown("# NeMo Forced Aligner") gr.Markdown( "Demo for [NeMo Forced Aligner](https://github.com/NVIDIA/NeMo/tree/main/tools/nemo_forced_aligner) (NFA). " "Upload audio in Tamazight and (optionally) the text spoken in the audio to generate a video where each part of the text will be highlighted as it is spoken. " "**Now supports syncing with pre-timed SRT or LRC files!**", ) gr.Markdown("You can also download CTM and ASS files to add subtitles to your videos. ") with gr.Row(): with gr.Column(scale=1): gr.Markdown("## Input") mic_in = gr.Audio(sources=["microphone"], type='filepath', label="Microphone input") audio_file_in = gr.Audio(sources=["upload"], type='filepath', label="File upload") subs_file_in = gr.File(label="[Optional] Upload an SRT or LRC file to constrain alignment to predefined timestamps", file_types=[".srt", ".lrc"]) ref_text = gr.Textbox( label="[Optional] The reference text. Use '|' separators to specify which text will appear together. " "Leave this field blank to use an ASR model's transcription as the reference text instead. (Ignored if SRT/LRC is uploaded)" ) split_on_newline = gr.Checkbox( True, label="Separate text on new lines", ) submit_button = gr.Button("Submit") with gr.Column(scale=1): gr.Markdown("## Output") video_out = gr.Video(label="Output Video") text_out = gr.Textbox(label="Output Info", visible=False) ass_file = gr.File(label="ASS File", visible=False) word_ctm_file = gr.File(label="Word-level CTM File", visible=False) segment_ctm_file = gr.File(label="Segment-level CTM File", visible=False) gr.Markdown("You can use this [space](https://huggingface.co/spaces/Tamazight-NLP/ASS_Word-Level_Converter) to convert ASS file to SRT and LRC/ELRC formats. This [space](https://huggingface.co/spaces/Tamazight-NLP/CTM-to-SRT) can be used to convert CTM file to SRT format.") with gr.Row(): gr.HTML( "
" "Tutorial: \"How to use NFA?\" ๐ | " "Blog post: \"How does forced alignment work?\" ๐ | " "NFA Github page ๐ฉโ๐ป" "
" ) submit_button.click( fn=align, inputs=[mic_in, audio_file_in, subs_file_in, ref_text, split_on_newline], outputs=[video_out, text_out, non_tmp_output_video_filepath, ass_file, word_ctm_file, segment_ctm_file], ).then( fn=delete_non_tmp_video, inputs=[non_tmp_output_video_filepath], outputs=None, ) example_2 = """โตโดฐโดฝโตโตโตโตโต โต โตโตโดฐโดทโตโดผโต. โต โตโตโต โต โตโดฑโดฑโต โดฐโตโดฐโตโตโดฐโตข โดฐโตโตโตโตโตโตโต. โดฐโตโตโตข โต โตโดฑโดฑโต โตโตโต โตโต โตโดณโดฐ โตโตโตโตโตโต โตโตโตโตโต, โตโดฑโดฑโต โต โตโตโตฅโตกโดฐโตโต, โดฝโตโดฐ โดณโดฐโต. โดฐโตโดฐโตโตโดฐโตข โดฐโตโตโตโตโตโตโต, โต โตโตโตฃโตกโดฐโตโตโต โตโตโดฐ โต โตโตโดณโดณโดฐโตโตโต. โดฐโดณโตโตโตโดท โต โตกโดฐโตโต โต โตโดผโตโดฐ, โดฐโตโต โต โตโตโตโตโตโต, โดฝโตโดฐโตโดณโดฐโต โตขโดฐโต โดท โตโดฐโดท โตโตโดฝโต. โตโดฐ โตโต โดฝโตขโตขโต โดฝโดฐ โต โตโตโตโตโตโดท, โดท โดฝโตขโตขโต โดฝโดฐ โดฐโดท โตโตโตโตโต. โตโตโตโต โดฐโต, โตโตโตโต โดฐโต, โดฐโตโดฐโตโดฐโต โตขโตโตโดทโต. โดฐโตโดฐโตโดฐโต โต โตโตกโตโตโตโต โตโตโตโตโตโดผโดฐโต, โตโต โดท โดฐโตขโต โตโตโตขโตโตโต, โตโตโดฐ โตโตโตโดนโดนโดฐโต.""" example_3 = "โดทโดฐโดณ โตโตขโตโต โต โตโดณโตโตโดฐ|โตโตโดฐโต โตโตขโต|โดณโดณโตฏโตฃ โดท!|โตโตโตโต โดฐโตโต|โตโตโตขโดฐโตโต โดฐโตฃโดทโดทโตโต|โตโตโตขโดฐโตโต โตโดฐโตโตโดฐ โดท โตโตโตโตโดฐโตกโต" examples = gr.Examples( examples=[ ["common_voice_zgh_37837257.mp3", None, "โตโต โตโตขโต โตโดฐโดท โดท โตโดปโตโตโตโตโตโดท โดฐโดท โดฐโดฝ โตโตโต โตโดฐโดท โตโดณโตโดท"], ["Voice1410.wav", None, example_2], ["Tamazight_For_All.mp3", None, example_3] ], inputs=[audio_file_in, subs_file_in, ref_text] ) demo.queue() demo.launch()