| from typing import Iterator, TextIO, List, Dict, Any, Optional, Sequence, Union |
| from utils import getSubs, str2time, maintain_aspect_ratio_resize |
| from moviepy import VideoFileClip |
| import whisper |
| import os |
| import cv2 |
| import webvtt |
| from PIL import Image |
| from tqdm import tqdm |
| import json |
| from langchain_yt_dlp.youtube_loader import YoutubeLoaderDL |
| from transformers import BlipProcessor, BlipForConditionalGeneration |
|
|
|
|
| |
def get_video_metdata(video_url:str):
    """Fetch the metadata attached to a YouTube video.

    The video is loaded through ``YoutubeLoaderDL`` with
    ``add_video_info=True`` and the ``metadata`` attribute of the first
    loaded document is returned.

    Args:
        video_url: URL of the YouTube video.

    Returns:
        The ``metadata`` of the first document produced by the loader.

    Raises:
        IndexError: If the loader returns no documents.
    """
    loader = YoutubeLoaderDL.from_youtube_url(video_url, add_video_info=True)
    first_document = loader.load()[0]
    return first_document.metadata
|
|
| |
def extract_audio(path_to_video:str, output_folder:str):
    """Extract the audio track of a video into ``audio.mp3``.

    Args:
        path_to_video: Path to the source video file.
        output_folder: Directory that receives the audio file. It is not
            created here.

    Returns:
        Path of the written audio file, ``<output_folder>/audio.mp3``.

    Raises:
        ValueError: If the video has no audio track.
    """
    path_to_extracted_audio_file = os.path.join(output_folder, 'audio.mp3')

    clip = VideoFileClip(path_to_video)
    try:
        # ``clip.audio`` is None for silent videos; fail with a clear message
        # instead of an AttributeError on None.
        if clip.audio is None:
            raise ValueError(f"No audio track found in video: {path_to_video}")
        clip.audio.write_audiofile(path_to_extracted_audio_file)
    finally:
        # Always release the resources held by the clip, even on failure.
        clip.close()
    return path_to_extracted_audio_file
|
|
|
|
| |
def transcribe_video(path_to_extracted_audio_file, output_folder, whisper_model=None, path_to_video=None):
    """Transcribe an audio file with Whisper and save the result as a .vtt file.

    Whisper is run with ``task="translate"``, ``best_of=1`` and
    ``language='en'``. The resulting segments are passed to ``getSubs`` with
    the ``"vtt"`` format and the returned text is written to disk.

    Args:
        path_to_extracted_audio_file: Path of the audio file to transcribe.
        output_folder: Directory that receives the transcript. It is not
            created here.
        whisper_model: Optional preloaded Whisper model. When ``None`` the
            ``"small"`` model is loaded.
        path_to_video: Optional path of the originating video. When given,
            the transcript is named after the video; otherwise it is named
            after the audio file.

    Returns:
        Path of the written transcript, ``<output_folder>/<name>.vtt``.
    """
    if whisper_model is None:
        whisper_model = whisper.load_model("small")
    options = dict(task="translate", best_of=1, language='en')
    results = whisper_model.transcribe(path_to_extracted_audio_file, **options)

    vtt = getSubs(results["segments"], "vtt")

    # The original code read an undefined ``path_to_video`` here (NameError).
    # Name the transcript after the video when it is supplied, else after the
    # audio file. ``splitext`` strips any extension, not only ``.mp4``.
    name_source = path_to_video if path_to_video is not None else path_to_extracted_audio_file
    transcript_name = os.path.splitext(os.path.basename(name_source))[0]
    path_to_generated_transcript = os.path.join(output_folder, f'{transcript_name}.vtt')

    # Explicit UTF-8 so non-ASCII transcript text does not depend on the
    # platform's default encoding.
    with open(path_to_generated_transcript, 'w', encoding='utf-8') as f:
        f.write(vtt)
    return path_to_generated_transcript
|
|
|
|
| |
def extract_and_save_frames_and_metadata(
        path_to_video,
        path_to_transcript,
        path_to_save_extracted_frames,
        path_to_save_metadatas):
    """Save one frame per transcript segment together with its metadata.

    For every cue in the WebVTT transcript, the frame at the midpoint of the
    cue is read from the video, resized with ``maintain_aspect_ratio_resize``
    (``height=350``) and written as ``frame_<idx>.jpg``. The collected
    metadata is passed through ``update_transcript`` and dumped to
    ``metadatas.json``.

    Segments whose frame cannot be read or written are reported on stdout and
    skipped, so ``video_segment_id`` values may have gaps.

    Args:
        path_to_video: Path of the video file.
        path_to_transcript: Path of the WebVTT transcript.
        path_to_save_extracted_frames: Directory that receives the frames.
        path_to_save_metadatas: Directory that receives ``metadatas.json``.

    Returns:
        List of metadata dicts with the keys ``extracted_frame_path``,
        ``transcript``, ``video_segment_id``, ``video_path``, ``start_time``
        and ``end_time``.
    """
    metadatas = []

    video = cv2.VideoCapture(path_to_video)
    try:
        trans = webvtt.read(path_to_transcript)

        for idx, transcript in enumerate(trans):
            # NOTE(review): str2time is assumed to return milliseconds, since
            # the value is fed to CAP_PROP_POS_MSEC -- confirm in utils.
            start_time_ms = str2time(transcript.start)
            end_time_ms = str2time(transcript.end)
            mid_time_ms = (end_time_ms + start_time_ms) / 2

            text = transcript.text.replace("\n", ' ')

            # Seek to the middle of the segment and grab that frame.
            video.set(cv2.CAP_PROP_POS_MSEC, mid_time_ms)
            success, frame = video.read()
            if not success:
                print(f"ERROR! Cannot extract frame: idx = {idx}")
                continue

            image = maintain_aspect_ratio_resize(frame, height=350)
            img_fname = f'frame_{idx}.jpg'
            img_fpath = os.path.join(path_to_save_extracted_frames, img_fname)

            # cv2.imwrite reports failure through its return value; do not
            # record metadata for a frame that was never written.
            if not cv2.imwrite(img_fpath, image):
                print(f"ERROR! Cannot save frame: idx = {idx}")
                continue

            metadatas.append({
                'extracted_frame_path': img_fpath,
                'transcript': text,
                'video_segment_id': idx,
                'video_path': path_to_video,
                'start_time': transcript.start,
                'end_time': transcript.end,
            })
    finally:
        # Always free the capture handle, even if reading the transcript or
        # processing a frame raises.
        video.release()

    # Replace each segment's text with a window over neighbouring segments.
    metadatas = update_transcript(metadatas)

    fn = os.path.join(path_to_save_metadatas, 'metadatas.json')
    with open(fn, 'w') as outfile:
        json.dump(metadatas, outfile)
    return metadatas
|
|
|
|
def update_transcript(vid_metadata, n=7):
    """Replace each segment's transcript with a window of neighbouring text.

    With ``h = int(n / 2)``, the transcript of segment ``i`` becomes the
    space-joined original transcripts in the slice
    ``[max(i - h, 0) : i + h]``. For the default ``n=7`` that is up to three
    preceding segments, the segment itself and up to two following ones.

    Args:
        vid_metadata: List of dicts that each hold a ``'transcript'`` string.
            The dicts are modified in place.
        n: Window size parameter; only ``int(n / 2)`` is used.

    Returns:
        The same ``vid_metadata`` list with updated ``'transcript'`` values.
    """
    half = int(n / 2)
    # Snapshot the original texts so windows are built from unmodified data.
    original_texts = [entry['transcript'] for entry in vid_metadata]

    for position, entry in enumerate(vid_metadata):
        window_start = position - half
        if window_start < 0:
            window_start = 0
        entry['transcript'] = ' '.join(original_texts[window_start:position + half])
    return vid_metadata
|
|
|
|
| |
def get_video_caption(path_to_video_frames: List, metadatas, output_folder_path:str, vlm=None, vlm_processor=None):
    """Caption extracted frames and attach the captions to their metadata.

    Each frame is captioned with the given model and processor. The
    path-to-caption mapping is written to ``captions.json``, every metadata
    dict receives a ``'caption'`` key, and the metadata is written to
    ``metadatas.json``; both files go to ``output_folder_path``.

    Args:
        path_to_video_frames: Paths of the frame images to caption.
        metadatas: List of metadata dicts. Each must hold an
            ``'extracted_frame_path'`` that appears in
            ``path_to_video_frames``. The dicts are modified in place.
        output_folder_path: Directory that receives the two JSON files.
        vlm: Optional captioning model. If either ``vlm`` or
            ``vlm_processor`` is ``None``, both are replaced by the
            ``Salesforce/blip-image-captioning-base`` model and processor.
        vlm_processor: Optional processor matching ``vlm``.

    Returns:
        Path of the written ``metadatas.json``.

    Raises:
        KeyError: If a metadata entry refers to a frame path that was not
            captioned.
    """
    if vlm is None or vlm_processor is None:
        vlm_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
        vlm = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

    frame_caption = {}
    for frame_path in tqdm(path_to_video_frames, desc="Captioning frames"):
        # Open the file in a context manager so its handle is closed promptly;
        # ``convert`` loads the pixel data and normalizes the mode to RGB.
        with Image.open(frame_path) as image_file:
            frame = image_file.convert('RGB')
        inputs = vlm_processor(frame, return_tensors="pt")

        out = vlm.generate(**inputs)
        caption = vlm_processor.decode(out[0], skip_special_tokens=True)
        frame_caption[frame_path] = caption

    caption_out_path = os.path.join(output_folder_path, 'captions.json')
    with open(caption_out_path, 'w') as outfile:
        json.dump(frame_caption, outfile)

    for frame_metadata in metadatas:
        frame_metadata['caption'] = frame_caption[frame_metadata['extracted_frame_path']]

    metadatas_out_path = os.path.join(output_folder_path, 'metadatas.json')
    with open(metadatas_out_path, 'w') as outfile:
        json.dump(metadatas, outfile)
    return metadatas_out_path
|
|
|
|
|
|