Spaces:
Runtime error
Runtime error
| import os | |
| import pysrt | |
| from moviepy import VideoFileClip | |
| import shutil | |
| from PIL import Image, ImageOps | |
| import numpy as np | |
| import speech_recognition as sr | |
def get_images_from_video(video_path, fps=0.2):
    """Extract frames from a video file at the given sampling rate.

    Args:
        video_path (str): Path to the video file.
        fps (float, optional): Frames per second to extract. Defaults to 0.2.

    Returns:
        list: Extracted frames as numpy arrays.
    """
    clip = VideoFileClip(video_path)
    try:
        # Materialize the frames before closing the clip: returning the lazy
        # iterator (as the original did) lets the clip be closed/garbage
        # collected before the caller consumes it, and the docstring already
        # promises a list.
        return list(clip.iter_frames(fps=fps))
    finally:
        # Release the underlying ffmpeg reader deterministically.
        clip.close()
def image_with_most_non_black_space(images, output_path, return_type="path"):
    """Find and save the image with the most non-black space from a list of images.

    Args:
        images (list): List of image file paths, PIL Image objects, or numpy arrays.
        output_path (str): Path where the output image should be saved.
        return_type (str, optional): Type of return value - "path" or "image".
            Defaults to "path".

    Returns:
        Union[str, PIL.Image, None]: Path to saved image, PIL Image object,
        or None if no valid image found.
    """
    max_non_black_area = 0
    best_image = None
    for img in images:
        try:
            if isinstance(img, str):
                # Copy the pixel data inside the context manager so the file
                # handle is released immediately (Image.open is lazy and the
                # original code leaked the handle until GC).
                with Image.open(img) as opened:
                    image = opened.copy()
            elif isinstance(img, Image.Image):
                image = img
            elif isinstance(img, np.ndarray):
                image = Image.fromarray(img)
            else:
                print(f"Unsupported type: {type(img)}. Skipping.")
                continue
            # Grayscale, then count pixels brighter than a small threshold;
            # threshold 10 treats near-black compression noise as black.
            gray_array = np.array(ImageOps.grayscale(image))
            non_black_pixels = np.sum(gray_array > 10)
            if non_black_pixels > max_non_black_area:
                max_non_black_area = non_black_pixels
                best_image = image
        except Exception as e:
            # Best-effort: skip unreadable/corrupt entries, keep scanning.
            print(f"Warning: Unable to process image {img}: {e}")
    if best_image is not None:
        best_image.save(output_path)
        print(f"Saved image with most non-black space to {output_path}")
        if return_type == "path":
            return output_path
    # Either the winning PIL image (return_type != "path") or None when no
    # input could be processed.
    return best_image
def parse_srt_to_text(output_dir, topic_name):
    """Convert an SRT subtitle file to a single plain-text file.

    Reads ``<topic>_combined.srt`` and writes ``<topic>_combined.txt`` with
    all cues joined by spaces and ellipses collapsed to periods.

    Args:
        output_dir (str): Directory containing the topic folders.
        topic_name (str): Name of the topic/video.
    """
    topic_name = topic_name.replace(" ", "_").lower()
    srt_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.srt")
    txt_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.txt")
    subs = pysrt.open(srt_path)
    # Collapse "..." to "." in each cue, then join with single spaces.
    parts = [sub.text.replace("...", ".") for sub in subs]
    # Explicit utf-8 avoids UnicodeEncodeError on platforms whose default
    # encoding cannot represent the subtitle text (e.g. cp1252 on Windows).
    with open(txt_path, 'w', encoding='utf-8') as f:
        f.write(" ".join(parts).strip())
def parse_srt_and_extract_frames(output_dir, topic_name):
    """Extract frames from video at subtitle timestamps and save with corresponding text.

    Consecutive subtitle cues are concatenated until a sentence-ending '.'
    is reached; one frame is captured at the end of each sentence and paired
    with its text.  All pairs are indexed in ``extract_images/pairs.json``.

    Args:
        output_dir (str): Directory containing the topic folders.
        topic_name (str): Name of the topic/video.
    """
    topic_name = topic_name.replace(" ", "_").lower()
    video_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.mp4")
    srt_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.srt")
    subs = pysrt.open(srt_path)
    # Recreate extract_images from scratch so stale frames from earlier runs
    # cannot linger.
    images_dir = os.path.join(output_dir, topic_name, "extract_images")
    if os.path.exists(images_dir):
        shutil.rmtree(images_dir)
    os.makedirs(images_dir)
    video = VideoFileClip(video_path)
    try:
        # Dictionary to store image-text pairs
        pairs = {}
        i = 0
        while i < len(subs):
            sub = subs[i]
            text = sub.text
            sub_indexes = [sub.index]
            # Concatenate following cues until the sentence ends with '.'.
            while i < len(subs) - 1 and not text.strip().endswith('.'):
                i += 1
                next_sub = subs[i]
                text += " " + next_sub.text
                sub_indexes.append(next_sub.index)
            # BUG FIX: use the end time of the LAST concatenated cue
            # (subs[i]), not the first one (sub) — the original grabbed the
            # frame at the start cue's end, before the sentence finished.
            end_time = subs[i].end.to_time()
            # Convert end time to seconds for moviepy's frame lookup.
            end_time_seconds = (end_time.hour * 3600 + end_time.minute * 60
                                + end_time.second + end_time.microsecond / 1e6)
            # Save the frame, keyed by the first cue's index.
            frame_path = os.path.join(images_dir, f"{sub.index}.jpg")
            video.save_frame(frame_path, t=end_time_seconds)
            # Save the concatenated sentence next to the frame.
            text_path = os.path.join(images_dir, f"{sub.index}.txt")
            with open(text_path, 'w', encoding='utf-8') as f:
                f.write(text)
            pairs[str(sub.index)] = {
                "image_path": f"{sub.index}.jpg",
                "text": text,
                "text_path": f"{sub.index}.txt",
                "srt_index": sub_indexes,
            }
            i += 1
        # Persist the pair index for downstream consumers.
        import json
        json_path = os.path.join(images_dir, "pairs.json")
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(pairs, f, indent=4)
    finally:
        # Release the video reader even if frame extraction fails midway
        # (the original leaked it on any exception).
        video.close()
def extract_trasnscript(video_path):
    """Extract a transcript from the video's audio track using Google Speech Recognition.

    NOTE(review): the function name contains a typo ("trasnscript"); it is
    kept as-is for backward compatibility with existing callers.

    Args:
        video_path (str): Path to the video file.

    Returns:
        str: Transcribed text from the video audio.

    Raises:
        FileNotFoundError: If video file does not exist.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")
    clip = VideoFileClip(video_path)
    # Dump the audio track to a temporary WAV file next to the video.
    audio_path = os.path.join(os.path.dirname(video_path), "audio.wav")
    try:
        clip.audio.write_audiofile(audio_path)
        # Transcribe the temporary audio file.
        recognizer = sr.Recognizer()
        with sr.AudioFile(audio_path) as source:
            audio = recognizer.record(source)
        return recognizer.recognize_google(audio)
    finally:
        # Release the clip (the original leaked the ffmpeg reader) and
        # remove the temporary audio file.
        clip.close()
        if os.path.exists(audio_path):
            os.remove(audio_path)
if __name__ == "__main__":
    import argparse

    def process_all_topics(output_folder):
        """Run subtitle parsing and frame extraction for every topic folder.

        Args:
            output_folder (str): Directory containing the topic folders.
        """
        # Only immediate subdirectories count as topics.
        entries = os.listdir(output_folder)
        topic_dirs = [name for name in entries
                      if os.path.isdir(os.path.join(output_folder, name))]
        for name in topic_dirs:
            print(f"\nProcessing topic: {name}")
            try:
                parse_srt_to_text(output_folder, name)
                parse_srt_and_extract_frames(output_folder, name)
            except Exception as exc:
                # One broken topic must not stop the whole batch.
                print(f"Error processing {name}: {str(exc)}")

    # Command-line entry point.
    parser = argparse.ArgumentParser(description='Process video files and extract frames with subtitles')
    parser.add_argument('--output_dir', type=str, default="output",
                        help='Directory containing the topic folders')
    cli_args = parser.parse_args()
    process_all_topics(cli_args.output_dir)