Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import gradio as gr | |
| import requests | |
| import torch | |
| import os | |
| from transformers import MarianMTModel, MarianTokenizer, AutoTokenizer, AutoModelForSeq2SeqLM | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from youtube_transcript_api.proxies import WebshareProxyConfig | |
| from gtts import gTTS | |
# Build the module-wide YouTubeTranscriptApi client used by the transcript helpers.
# Webshare proxy credentials are read from the environment; when either one is
# missing the client is created without a proxy.
proxy_username = os.environ.get('WEBSHARE_PROXY_UN')
proxy_password = os.environ.get('WEBSHARE_PROXY_PW')
ytt_api = None
try:
    if not (proxy_username and proxy_password):
        ytt_api = YouTubeTranscriptApi()
        print("Successfully connected to the Youtube API without proxy.")
    else:
        ytt_api = YouTubeTranscriptApi(
            proxy_config=WebshareProxyConfig(
                proxy_username=proxy_username,
                proxy_password=proxy_password,
                filter_ip_locations=["us"],
            )
        )
        print("Successfully connected to the Youtube API with proxy.")
except Exception as e:
    # Any failure while building the client falls back to a plain, proxy-less one.
    print(f"A proxy error occurred in connecting to the Youtube API: {e}")
    ytt_api = YouTubeTranscriptApi()
def getEnglishTranscript(video_id):
    """Return the English transcript text of a YouTube video.

    Looks through the transcripts listed for ``video_id`` and fetches the
    first one whose language code is exactly ``'en'``. The snippet texts are
    joined with single spaces.

    Returns an empty string when the API client is unavailable, when no
    English transcript exists, or when any error occurs (the error is
    printed, not raised).
    """
    if not ytt_api:
        print("YouTubeTranscriptApi not initialized.")
        return ""
    try:
        # First transcript whose language code is 'en', or None.
        english_track = next(
            (track for track in ytt_api.list(video_id) if track.language_code == 'en'),
            None,
        )
        fetched = english_track.fetch() if english_track is not None else None
        if not fetched:
            print(f"No English transcript found for video ID: {video_id}")
            return ""
        return " ".join(snippet.text for snippet in fetched).strip()
    except Exception as e:
        print(f"Error retrieving English transcript for video ID {video_id}: {e}")
        return ""
def _get_translated_transcript(video_id, language_name, language_code):
    """Return the transcript of ``video_id`` in the requested language.

    Shared implementation behind getArabicTranscript / getFrenchTranscript.

    Strategy:
      1. If a listed transcript already has ``language_code``, fetch it as is.
      2. Otherwise take the first translatable transcript that offers the
         target language (matched by display name or by code) and fetch its
         translation.

    Args:
        video_id: YouTube video ID.
        language_name: Display name of the target language (e.g. 'Arabic');
            used for matching translation targets and in log messages.
        language_code: Language code of the target language (e.g. 'ar').

    Returns:
        The snippet texts joined with single spaces, or "" when the API
        client is unavailable, nothing suitable is found, or an error occurs
        (errors are printed, not raised).
    """
    if not ytt_api:
        print("YouTubeTranscriptApi not initialized.")
        return ""
    try:
        # Materialise once: the listing is scanned twice below.
        transcripts = list(ytt_api.list(video_id))
        fetched = None

        # 1) A transcript that is already in the target language needs no translation.
        for transcript in transcripts:
            if transcript.language_code == language_code:
                fetched = transcript.fetch()
                break

        # 2) Otherwise translate the first transcript that offers the target language.
        if not fetched:
            for transcript in transcripts:
                if not transcript.is_translatable:
                    continue
                target_code = next(
                    (
                        lang.language_code
                        for lang in transcript.translation_languages
                        if lang.language == language_name
                        or lang.language_code == language_code
                    ),
                    None,
                )
                if target_code:
                    print(f"\nTranslating to {language_name} ({target_code})...")
                    fetched = transcript.translate(target_code).fetch()
                    print(f"{language_name} Translation Found and Stored.")
                    break  # Stop at the first usable translation

        if not fetched:
            print(f"No translatable transcript found for {language_name} for video ID: {video_id}")
            return ""
        return " ".join(snippet.text for snippet in fetched).strip()
    except Exception as e:
        print(f"Error retrieving or translating {language_name} transcript for video ID {video_id}: {e}")
        return ""


def getArabicTranscript(video_id):
    """Retrieves the Arabic transcript for a given YouTube video ID, translating if necessary.

    Returns "" when no Arabic text could be obtained.
    """
    return _get_translated_transcript(video_id, "Arabic", "ar")


def getFrenchTranscript(video_id):
    """Retrieves the French transcript for a given YouTube video ID, translating if necessary.

    Returns "" when no French text could be obtained.
    """
    return _get_translated_transcript(video_id, "French", "fr")
# Currently loaded translation model state, shared with translate_me().
model, tokenizer, device = None, None, None
# NLLB target-language token for the last NLLB language requested.
formatted_language_code = ""

# language code -> (checkpoint name, readable name, NLLB target token or None)
_TRANSLATION_MODELS = {
    'ar': ("Helsinki-NLP/opus-mt-tc-big-en-ar", "English to Arabic", None),
    'fr': ("Helsinki-NLP/opus-mt-tc-big-en-fr", "English to French", None),
    'ha': ("facebook/nllb-200-distilled-600M", "English to Hausa", "hau_Latn"),
    'fa': ("facebook/nllb-200-distilled-600M", "English to Dari/Afghan Persian", "pes_Arab"),
    'ps': ("facebook/nllb-200-distilled-600M", "English to Pashto", "pbt_Arab"),
}


def setModelAndTokenizer(language_code):
    """Load (or reuse) the translation model and tokenizer for ``language_code``.

    Updates the module globals ``model``, ``tokenizer``, ``device`` and, for
    NLLB languages, ``formatted_language_code``.

    Args:
        language_code: One of 'ar', 'fr', 'ha', 'fa', 'ps'.

    Returns:
        A status string. Callers detect failure by looking for "Error" or
        "not supported" in it.

    Notes:
        The new model/tokenizer pair is only published to the globals after
        BOTH loaded successfully. If loading fails the globals are left as
        ``None`` so a later call retries instead of pairing a fresh tokenizer
        with a stale model from another language.
    """
    global model, tokenizer, device, formatted_language_code

    spec = _TRANSLATION_MODELS.get(language_code)
    if spec is None:
        return f"Language code '{language_code}' not supported for translation model."
    _MODEL_NAME, _readable_name, nllb_target = spec

    # The NLLB languages share one checkpoint, so the target token must be
    # refreshed even when the model itself is already loaded.
    if nllb_target is not None:
        formatted_language_code = nllb_target

    already_loaded = (
        model is not None
        and tokenizer is not None
        and getattr(tokenizer, 'name_or_path', None) == _MODEL_NAME
    )
    if already_loaded:
        print(f"Model and tokenizer for {_readable_name} already loaded.")
        return f"Model and tokenizer for {_readable_name} already loaded."

    print(f"Loading model and tokenizer for {_readable_name}...")
    is_marian = "Helsinki-NLP" in _MODEL_NAME
    family = "Helsinki-NLP" if is_marian else "Facebook NLLB"

    # Drop the previous pair first: it is for another checkpoint, and releasing
    # it before loading avoids holding two models in memory at once.
    model, tokenizer = None, None
    try:
        if is_marian:
            new_tokenizer = MarianTokenizer.from_pretrained(_MODEL_NAME)
            new_model = MarianMTModel.from_pretrained(_MODEL_NAME)
        else:
            new_tokenizer = AutoTokenizer.from_pretrained(_MODEL_NAME)
            # No device_map="auto" here: placement is done explicitly by the
            # .to(device) call below, and mixing both is unnecessary.
            new_model = AutoModelForSeq2SeqLM.from_pretrained(_MODEL_NAME)
        new_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        new_model.to(new_device)
    except Exception as e:
        print(f"Error loading {family} model or tokenizer: {e}")
        return "Error loading translation model."

    model, tokenizer, device = new_model, new_tokenizer, new_device
    print(f"Successfully loaded {family} model: {_MODEL_NAME}")
    return f"Model and tokenizer set for {_readable_name}."
def chunk_text_by_tokens(text, tokenizer, max_tokens):
    """Split ``text`` into whitespace-joined chunks of at most ``max_tokens`` tokens.

    Words are added to the current chunk one at a time; the candidate chunk is
    re-tokenized (without special tokens) after each addition, and a new chunk
    is started as soon as the count would exceed ``max_tokens``. A single word
    that is itself longer than ``max_tokens`` still becomes its own chunk.

    Args:
        text: Input text; split on whitespace.
        tokenizer: Callable returning an object with an ``input_ids`` sequence.
        max_tokens: Token budget per chunk.

    Returns:
        List of chunk strings (empty list for empty/whitespace-only text).
    """
    pieces = []
    pending = []
    for token in text.split():
        candidate = " ".join(pending + [token])
        # add_special_tokens=False: count only the tokens of the words themselves.
        token_count = len(tokenizer(candidate, add_special_tokens=False).input_ids)
        if token_count <= max_tokens:
            pending.append(token)
            continue
        # Budget exceeded: flush what we have and start over with this word.
        if pending:
            pieces.append(" ".join(pending))
        pending = [token]
    if pending:
        pieces.append(" ".join(pending))
    return pieces
def translate_me(text, language_code):
    """Translate English ``text`` into ``language_code`` with the local models.

    Args:
        text: English source text.
        language_code: Target language: 'ar'/'fr' (Marian) or 'ha'/'fa'/'ps' (NLLB).

    Returns:
        The translated text, or one of these sentinel strings on failure:
        "No text to translate.", "Translation failed: <status>",
        "Translation not implemented for language code: <code>",
        "Error during translation.".
    """
    global model, tokenizer, device, formatted_language_code

    # Reject empty input before doing anything expensive.
    if text is None or text.strip() == "":
        return "No text to translate."

    # Always ask for the right model: a model for a *different* language may
    # already be loaded. setModelAndTokenizer is a no-op when it already matches.
    status = setModelAndTokenizer(language_code)
    if "Error" in status or "not supported" in status:
        print(status)
        return f"Translation failed: {status}"

    # Input chunks are kept smaller than the output budget so a translated
    # chunk is not cut off by max_length.
    # NOTE(review): confirm these limits against the checkpoints in use.
    max_input_tokens = 400
    max_output_tokens = 512

    try:
        generate_kwargs = {"max_length": max_output_tokens}
        if language_code in ('ar', 'fr'):
            separator = " "
        elif language_code in ('ha', 'fa', 'ps'):
            tokenizer.src_lang = "eng_Latn"  # English
            # NLLB selects the output language through the first generated token.
            generate_kwargs["forced_bos_token_id"] = tokenizer.convert_tokens_to_ids(
                [formatted_language_code]
            )[0]
            separator = "\n"
        else:
            return f"Translation not implemented for language code: {language_code}"

        # Whole transcripts are far longer than one generate() call should
        # take, so every language path translates chunk by chunk.
        translations = []
        for chunk in chunk_text_by_tokens(text, tokenizer, max_input_tokens):
            inputs = tokenizer(chunk, return_tensors="pt", truncation=True).to(device)
            translated_tokens = model.generate(**inputs, **generate_kwargs)
            translations.append(
                tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
            )
        return separator.join(translations)
    except Exception as e:
        print(f"Error during translation: {e}")
        return "Error during translation."
def say_it_api(text, _out_lang):
    """
    Converts text to speech using gTTS and saves it to a temporary file.

    Each call writes to its own uniquely named .mp3 in the system temp
    directory, so overlapping requests cannot overwrite each other's audio.

    Args:
        text: Text to speak.
        _out_lang: Language code passed to gTTS as ``lang``.

    Returns:
        The path of the generated file, or None when ``text`` is empty or
        speech generation fails (the error is printed, not raised).
    """
    import tempfile

    if text is None or text.strip() == "":
        print("No text provided for gTTS speech generation.")
        return None

    filename = None
    try:
        tts = gTTS(text=text, lang=_out_lang)
        # Reserve a unique path; the handle is closed before gTTS writes to it.
        with tempfile.NamedTemporaryFile(
            prefix="gtts_audio_", suffix=".mp3", delete=False
        ) as handle:
            filename = handle.name
        tts.save(filename)
        return filename
    except Exception as e:
        print(f"Error during gTTS speech generation: {e}")
        # Best-effort cleanup of a partially written file.
        if filename is not None and os.path.exists(filename):
            try:
                os.remove(filename)
            except OSError:
                pass
        return None
def speak_with_elevenlabs_api(text, language_code):
    """
    Converts text to speech using ElevenLabs API and saves it to a temporary file.

    The API key is read from the ELEVENLABS_API_KEY environment variable.
    Each call writes to its own uniquely named .mp3 in the system temp
    directory, so overlapping requests cannot overwrite each other's audio.

    Args:
        text: Text to speak.
        language_code: Target language code. It is not sent to the API; the
            request relies on the multilingual model to handle the text.

    Returns:
        The path of the generated file, or None when the API key or text is
        missing, the API answers with a non-200 status, or the request fails
        (errors are printed, not raised).
    """
    import tempfile

    ELEVENLABS_API_KEY = os.environ.get('ELEVENLABS_API_KEY')
    # Labelled "Rachel" in the original code.
    # NOTE(review): verify this ID against the ElevenLabs voice list.
    VOICE_ID = "EXAVITQu4vr4xnSDxMaL"
    if not ELEVENLABS_API_KEY:
        print("ElevenLabs API key not found in environment variables.")
        return None
    if text is None or text.strip() == "":
        print("No text provided for ElevenLabs speech generation.")
        return None

    url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}"
    headers = {
        "xi-api-key": ELEVENLABS_API_KEY,
        "Content-Type": "application/json",
    }
    data = {
        "text": text,
        "model_id": "eleven_multilingual_v2",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.5,
        },
    }
    try:
        # (connect, read) timeouts: without them a stalled connection would
        # block the request handler indefinitely.
        response = requests.post(url, headers=headers, json=data, timeout=(10, 120))
        if response.status_code != 200:
            print(f"Error from ElevenLabs API: Status Code {response.status_code}, Response: {response.text}")
            return None
        with tempfile.NamedTemporaryFile(
            prefix="elevenlabs_audio_", suffix=".mp3", delete=False
        ) as f:
            f.write(response.content)
            return f.name
    except Exception as e:
        print(f"Error calling ElevenLabs API: {e}")
        return None
def speechRouter_api(text,language_code):
    """
    Pick the text-to-speech backend for ``language_code`` and return the audio path.

    'ar' and 'fr' go through gTTS (say_it_api); 'ha', 'fa' and 'ps' go through
    ElevenLabs (speak_with_elevenlabs_api). Returns None for empty text or an
    unsupported language code.
    """
    nothing_to_say = text is None or text.strip() == ""
    if nothing_to_say:
        return None

    if language_code in ('ar', 'fr'):
        return say_it_api(text, language_code)
    if language_code in ('ha', 'fa', 'ps'):
        return speak_with_elevenlabs_api(text, language_code)

    print(f"Language code '{language_code}' not supported for speech generation.")
    return None
def translate_and_speak_api_wrapper(video_id, out_lang):
    """
    Produce a translated transcript of a YouTube video and speech for it.

    For 'ar' and 'fr' the transcript helpers are tried first. Only when they
    yield nothing (and always for 'ha', 'fa', 'ps') is the English transcript
    fetched, the translation model loaded, and translate_me() used.

    Args:
        video_id: The Youtube video ID to translate and speak.
        out_lang: The language to translate to ('ar', 'fr', 'ha', 'fa', 'ps').

    Returns:
        A tuple containing:
        - translated_text (str): The translated text, or a human-readable
          failure message.
        - audio_file_path (str or None): The path to the generated audio file,
          or None if translation or speech generation failed.
    """
    if out_lang not in ("ar", "fr", "ha", "fa", "ps"):
        return (
            f"Translation failed: Language code '{out_lang}' not supported for translation model.",
            None,
        )

    # Languages with a transcript helper: code -> (name for logs, fetcher).
    direct_fetchers = {
        "ar": ("Arabic", getArabicTranscript),
        "fr": ("French", getFrenchTranscript),
    }

    translated_text = ""
    if out_lang in direct_fetchers:
        language_name, fetch_direct = direct_fetchers[out_lang]
        translated_text = fetch_direct(video_id) or ""
        if translated_text.strip() == "":
            print(f"No direct {language_name} transcript found, translating from English.")

    if translated_text.strip() == "":
        # Fallback path: the English transcript and the model are only needed here.
        english_text = getEnglishTranscript(video_id)
        if english_text == "":
            return "No English transcript available to translate.", None

        model_status = setModelAndTokenizer(out_lang)
        if "Error" in model_status or "not supported" in model_status:
            return f"Translation failed: {model_status}", None

        translated_text = translate_me(english_text, out_lang)

        # translate_me reports problems as sentinel strings; none of them may
        # be returned, or spoken, as if it were the translation.
        translation_failed = (
            translated_text is None
            or translated_text.strip() == ""
            or translated_text.startswith(("Translation failed", "Translation not implemented"))
            or translated_text in ("No text to translate.", "Error during translation.")
        )
        if translation_failed:
            return f"Translation to {out_lang} failed.", None

    # Generate speech using the API wrapper
    audio_file_path = speechRouter_api(translated_text, out_lang)
    return translated_text, audio_file_path
# Gradio-facing entry point: this is the function wired into the interface.
def translate_and_speak_api(video_id: str, language_code: str):
    """
    Translate a YouTube video's transcript and generate speech for it.

    Delegates to translate_and_speak_api_wrapper and returns
    ``(translated_text, audio_path)``. A missing audio path is returned as an
    empty string rather than None.
    """
    print(f"Received request for video ID: {video_id}, language: {language_code}")
    result = translate_and_speak_api_wrapper(video_id, language_code)
    translated_text, audio_file_path = result
    # An empty string is handed to the audio output instead of None; the
    # original author noted this might avoid a TypeError when autoplay is True.
    if audio_file_path is None:
        audio_file_path = ""
    return translated_text, audio_file_path
# Inputs: the video ID as free text, the target language as a fixed choice list.
video_id_input = gr.Textbox(label="YouTube Video ID")
language_dropdown = gr.Dropdown(
    choices=['ar', 'fr', 'ha', 'fa', 'ps'],  # Language codes the backend supports
    value='ar',  # Preselected language
    label="Target Language",
)

# Outputs: the translated transcript and its spoken version.
translated_text_output = gr.Textbox(label="Translated Text")
audio_output = gr.Audio(label="Translated Speech", autoplay=True)

# Wire inputs and outputs to translate_and_speak_api; their order matches the
# function's arguments and return values.
demo = gr.Interface(
    title="YouTube Translator and Speaker",
    description="Enter a YouTube video ID and select a language to get the translated transcript and speech.",
    fn=translate_and_speak_api,
    inputs=[video_id_input, language_dropdown],
    outputs=[translated_text_output, audio_output],
)

# ---- Launch Gradio ----
if __name__ == "__main__":
    demo.launch()