Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| import uuid | |
| import subprocess | |
| import time | |
| import os | |
| import tempfile | |
| import subprocess | |
| from typing import Optional, Tuple, List | |
| import pytube | |
| import docx | |
| import PyPDF2 | |
| import re | |
| from src.quiz_processing import analyze_document | |
def extract_audio_from_video(video_path, output_format="mp3"):
    """Extract a mono, 12 kHz audio track from a video file using ffmpeg.

    The audio is written to the current working directory as
    ``audio_<6 hex chars>.<output_format>``.

    Args:
        video_path: Path of the input video. A falsy value short-circuits.
        output_format: Extension of the output file. Common container
            extensions are mapped to a matching ffmpeg encoder; any other
            value is passed to ffmpeg as the encoder name unchanged.

    Returns:
        The path of the created audio file, or None when ``video_path`` is
        falsy.

    Raises:
        Exception: If ffmpeg cannot be started, exits with a non-zero
            status, or does not produce the output file.
    """
    if not video_path:
        return None

    # A file extension is not always a valid ffmpeg encoder name ("wav",
    # "ogg" and "m4a" are containers), so translate the common ones and fall
    # back to the raw value for formats whose encoder shares the name.
    codec_by_format = {
        "mp3": "libmp3lame",
        "wav": "pcm_s16le",
        "ogg": "libvorbis",
        "m4a": "aac",
    }
    codec = codec_by_format.get(output_format, output_format)
    output_path = f"audio_{uuid.uuid4().hex[:6]}.{output_format}"
    cmd = [
        "ffmpeg",
        "-i", video_path,
        "-vn",
        "-c:a", codec,
        "-q:a", "9",
        "-ac", "1",
        "-ar", "12000",
        "-y", output_path,
    ]
    try:
        completed = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if completed.returncode != 0 or not os.path.exists(output_path):
            # Do not leave a truncated file behind for a later caller to pick up.
            if os.path.exists(output_path):
                os.remove(output_path)
            stderr_text = completed.stderr.decode("utf-8", errors="replace").strip()
            message = "Audio extraction failed"
            if stderr_text:
                # The last stderr line is normally ffmpeg's actual error.
                message = f"{message}: {stderr_text.splitlines()[-1]}"
            raise Exception(message)
        return output_path
    except Exception as e:
        raise Exception(f"Error extracting audio: {str(e)}") from e
def transcribe_audio(audio_path, elevenlabs_api_key, model_id="scribe_v1"):
    """Transcribe an audio file via the ElevenLabs speech-to-text endpoint.

    On success the transcript text is also saved to a new temporary ``.txt``
    file, which is not deleted by this function.

    Args:
        audio_path: Path of the audio file to upload.
        elevenlabs_api_key: Value sent in the ``xi-api-key`` request header.
        model_id: Value sent as the ``model_id`` form field.

    Returns:
        A ``(transcription, transcript_path, status_message)`` tuple. When
        the request does not return HTTP 200, or any exception occurs, the
        first two items are None and the message describes the failure.
    """
    url = "https://api.elevenlabs.io/v1/speech-to-text"
    headers = {"xi-api-key": elevenlabs_api_key}
    try:
        with open(audio_path, "rb") as file:
            response = requests.post(
                url,
                headers=headers,
                files={"file": file},
                data={"model_id": model_id},
                timeout=120,
            )
        if response.status_code != 200:
            return None, None, f"Transcription failed: {response.text}"

        transcription = response.json().get('text', '')
        # NamedTemporaryFile creates the file atomically, unlike the
        # deprecated mktemp() which only returns a name and is open to races.
        with tempfile.NamedTemporaryFile(
            "w", suffix=".txt", delete=False, encoding="utf-8"
        ) as f:
            f.write(transcription)
            transcript_path = f.name
        return transcription, transcript_path, "Transcription completed successfully"
    except Exception as e:
        return None, None, f"Transcription error: {str(e)}"
def process_video_file(video_path, audio_format, elevenlabs_api_key, model_id, gemini_api_key, language, content_type):
    """Extract audio from a video file, transcribe it, and analyze the text.

    Args:
        video_path: Path of the video file to process.
        audio_format: Output format passed to ``extract_audio_from_video``.
        elevenlabs_api_key: API key passed to ``transcribe_audio``.
        model_id: Transcription model id passed to ``transcribe_audio``.
        gemini_api_key: API key passed to ``analyze_document``.
        language: Passed through to ``analyze_document``.
        content_type: Passed through to ``analyze_document``.

    Returns:
        A 7-tuple ``(audio_path, status, transcript_path,
        transcription_status, formatted_output, txt_path, json_path)``.
        If transcription yields no text, only ``audio_path``, ``status`` and
        ``transcription_status`` are set. On any exception ``audio_path`` is
        None and the error message fills the ``status``,
        ``transcription_status`` and ``formatted_output`` slots.
    """
    try:
        # Without this guard extraction silently returns None and the caller
        # is told "Audio extracted, but transcription failed".
        if not video_path:
            raise ValueError("No video file provided")

        audio_path = extract_audio_from_video(video_path, audio_format)
        transcription, transcript_path, transcription_status = transcribe_audio(
            audio_path,
            elevenlabs_api_key,
            model_id,
        )
        if not transcription:
            return audio_path, "Audio extracted, but transcription failed", None, transcription_status, None, None, None

        # Generate summary or quiz from transcription
        formatted_output, json_path, txt_path = analyze_document(
            transcription,
            gemini_api_key,
            language,
            content_type,
        )
        return audio_path, "Processing completed successfully", transcript_path, transcription_status, formatted_output, txt_path, json_path
    except Exception as e:
        error_message = f"Error processing video: {str(e)}"
        return None, error_message, None, error_message, error_message, None, None
def process_youtube_video(youtube_url, audio_format, elevenlabs_api_key, model_id, gemini_api_key, language, content_type):
    """Download a YouTube video, extract its audio, transcribe and analyze it.

    Args:
        youtube_url: URL of the video to download with pytube.
        audio_format: Output format passed to ``extract_audio_from_video``.
        elevenlabs_api_key: API key passed to ``transcribe_audio``.
        model_id: Transcription model id passed to ``transcribe_audio``.
        gemini_api_key: API key passed to ``analyze_document``.
        language: Passed through to ``analyze_document``.
        content_type: Passed through to ``analyze_document``.

    Returns:
        A 7-tuple ``(audio_path, status, transcript_path,
        transcription_status, formatted_output, txt_path, json_path)``.
        If transcription yields no text, only ``audio_path``, ``status`` and
        ``transcription_status`` are set. On any exception ``audio_path`` is
        None and the error message fills the ``status``,
        ``transcription_status`` and ``formatted_output`` slots.
    """
    try:
        if not youtube_url:
            raise ValueError("No YouTube URL provided")

        yt = pytube.YouTube(youtube_url)
        stream = (
            yt.streams
            .filter(progressive=True, file_extension='mp4')
            .order_by('resolution')
            .desc()
            .first()
        )
        if not stream:
            raise Exception("No suitable video stream found")

        # The video is only needed until its audio has been extracted, so it
        # is downloaded into a private temporary directory that is removed
        # afterwards. This replaces the deprecated, race-prone mktemp() and
        # stops every call from leaking a full video file.
        with tempfile.TemporaryDirectory() as download_dir:
            video_path = stream.download(output_path=download_dir, filename="video.mp4")
            audio_path = extract_audio_from_video(video_path, audio_format)

        transcription, transcript_path, transcription_status = transcribe_audio(
            audio_path,
            elevenlabs_api_key,
            model_id,
        )
        if not transcription:
            return audio_path, "Audio extracted, but transcription failed", None, transcription_status, None, None, None

        # Generate summary or quiz from transcription
        formatted_output, json_path, txt_path = analyze_document(
            transcription,
            gemini_api_key,
            language,
            content_type,
        )
        return audio_path, "Processing completed successfully", transcript_path, transcription_status, formatted_output, txt_path, json_path
    except Exception as e:
        error_message = f"Error processing YouTube video: {str(e)}"
        return None, error_message, None, error_message, error_message, None, None
def process_audio_document(audio_path, elevenlabs_api_key, model_id, gemini_api_key, language, content_type):
    """Transcribe an audio file and analyze the resulting transcript.

    Args:
        audio_path: Path of the audio file handed to ``transcribe_audio``.
        elevenlabs_api_key: API key passed to ``transcribe_audio``.
        model_id: Transcription model id passed to ``transcribe_audio``.
        gemini_api_key: API key passed to ``analyze_document``.
        language: Passed through to ``analyze_document``.
        content_type: Passed through to ``analyze_document``.

    Returns:
        A 5-tuple ``(status, transcript_path, formatted_output, txt_path,
        json_path)``. If transcription yields no text, only ``status`` is
        set. On any exception the error message fills both ``status`` and
        ``formatted_output`` and the remaining items are None.
    """
    try:
        text, transcript_file, _status = transcribe_audio(
            audio_path, elevenlabs_api_key, model_id
        )
        if text:
            # Turn the transcript into the requested summary or quiz.
            rendered, json_file, txt_file = analyze_document(
                text, gemini_api_key, language, content_type
            )
            return (
                "Processing completed successfully",
                transcript_file,
                rendered,
                txt_file,
                json_file,
            )
        return ("Transcription failed", None, None, None, None)
    except Exception as exc:
        failure = f"Error processing audio: {exc}"
        return (failure, None, failure, None, None)