# NOTE: removed web-page scrape artifacts ("Spaces:" / "Runtime error") that were not part of the source.
# Audio_Files.py
#########################################
# Audio Processing Library
# This library is used to download or load audio files from a local directory.
#
####
#
# Functions:
#
# download_audio_file(url, current_whisper_model="", use_cookies=False, cookies=None)
# process_audio(audio_file_path, num_speakers=2, whisper_model="small.en", custom_prompt_input=None, ...)
# process_single_audio(audio_file_path, whisper_model, api_name, api_key, keep_original, ...)
# process_audio_files(audio_urls, audio_file, whisper_model, api_name, api_key, ...)
# download_youtube_audio(url)
# process_podcast(url, title, author, keywords, custom_prompt, api_name, api_key, whisper_model, ...)
#
#########################################
| # Imports | |
| import json | |
| import logging | |
| import os | |
| import subprocess | |
| import tempfile | |
| import uuid | |
| from datetime import datetime | |
| from pathlib import Path | |
| import requests | |
| import yt_dlp | |
| from App_Function_Libraries.Audio_Transcription_Lib import speech_to_text | |
| from App_Function_Libraries.Chunk_Lib import improved_chunking_process | |
| # | |
| # Local Imports | |
| from App_Function_Libraries.DB.DB_Manager import add_media_to_database, add_media_with_keywords, \ | |
| check_media_and_whisper_model | |
| from App_Function_Libraries.Summarization_General_Lib import save_transcription_and_summary, perform_transcription, \ | |
| perform_summarization | |
| from App_Function_Libraries.Utils.Utils import create_download_directory, save_segments_to_json, downloaded_files, \ | |
| sanitize_filename | |
| from App_Function_Libraries.Video_DL_Ingestion_Lib import extract_metadata | |
| # | |
| ####################################################################################################################### | |
| # Function Definitions | |
| # | |
MAX_FILE_SIZE = 500 * 1024 * 1024  # 500 MB cap for any single audio download or upload


def download_audio_file(url, current_whisper_model="", use_cookies=False, cookies=None):
    """Download an audio file over HTTP and save it under ./downloads.

    The download is skipped entirely when the media already exists in the
    database for the same whisper model (per check_media_and_whisper_model).

    Args:
        url: Direct URL of the audio file.
        current_whisper_model: Whisper model name used for the duplicate check.
        use_cookies: When True, attach cookies parsed from the `cookies` string.
        cookies: JSON object string mapping cookie names to values.

    Returns:
        Path of the downloaded file, or None when the download was skipped.

    Raises:
        requests.RequestException: On any HTTP failure.
        ValueError: When the reported content-length exceeds MAX_FILE_SIZE.
    """
    try:
        # Check if media already exists in the database and compare whisper models
        should_download, reason = check_media_and_whisper_model(
            url=url,
            current_whisper_model=current_whisper_model
        )
        if not should_download:
            logging.info(f"Skipping audio download: {reason}")
            return None
        logging.info(f"Proceeding with audio download: {reason}")

        # Set up the request headers, optionally carrying cookies
        headers = {}
        if use_cookies and cookies:
            try:
                cookie_dict = json.loads(cookies)
                headers['Cookie'] = '; '.join([f'{k}={v}' for k, v in cookie_dict.items()])
            except json.JSONDecodeError:
                logging.warning("Invalid cookie format. Proceeding without cookies.")

        # Stream the body so large files are never held fully in memory; the
        # `with` block guarantees the connection is released (the original
        # leaked the streamed response).
        with requests.get(url, headers=headers, stream=True) as response:
            # Raise an exception for bad status codes
            response.raise_for_status()

            # Enforce the shared size cap (0 when the server omits content-length,
            # so such responses are not rejected here).
            file_size = int(response.headers.get('content-length', 0))
            if file_size > MAX_FILE_SIZE:  # was a duplicated 500 MB literal
                raise ValueError("File size exceeds the 500MB limit.")

            # Generate a unique filename and make sure the target dir exists
            file_name = f"audio_{uuid.uuid4().hex[:8]}.mp3"
            save_path = os.path.join('downloads', file_name)
            os.makedirs('downloads', exist_ok=True)

            # Download the file in 8 KiB chunks
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        logging.info(f"Audio file downloaded successfully: {save_path}")
        return save_path

    except requests.RequestException as e:
        logging.error(f"Error downloading audio file: {str(e)}")
        raise
    except ValueError as e:
        logging.error(str(e))
        raise
    except Exception as e:
        logging.error(f"Unexpected error downloading audio file: {str(e)}")
        raise
def process_audio(
        audio_file_path,
        num_speakers=2,
        whisper_model="small.en",
        custom_prompt_input=None,
        offset=0,
        api_name=None,
        api_key=None,
        vad_filter=False,
        rolling_summarization=False,
        detail_level=0.01,
        keywords="default,no_keyword_set",
        chunk_text_by_words=False,
        max_words=0,
        chunk_text_by_sentences=False,
        max_sentences=0,
        chunk_text_by_paragraphs=False,
        max_paragraphs=0,
        chunk_text_by_tokens=False,
        max_tokens=0
):
    """Transcribe one audio file, optionally summarize it, and store it in the DB.

    Args:
        audio_file_path: Path of the audio file to transcribe.
        whisper_model: Whisper model name passed to the transcriber.
        custom_prompt_input: Prompt forwarded to the summarizer.
        offset: Transcription start offset forwarded to perform_transcription.
        api_name / api_key: Summarization backend; no summary when api_name is falsy.
        vad_filter: Voice-activity-detection flag for transcription.
        rolling_summarization: Reserved; rolling summarization is not implemented yet.
        keywords: Comma-separated keywords stored with the media entry.
        (The chunking parameters are currently only consumed by the
        unimplemented rolling-summarization path.)

    Returns:
        (transcription_text, summary_text, json_file_path, summary_file_path,
        None, None); on failure the first element is the error string and the
        remaining five are None.
    """
    try:
        # Perform transcription
        audio_file_path, segments = perform_transcription(audio_file_path, offset, whisper_model, vad_filter)

        if audio_file_path is None or segments is None:
            logging.error("Process_Audio: Transcription failed or segments not available.")
            return "Process_Audio: Transcription failed.", None, None, None, None, None

        logging.debug(f"Process_Audio: Transcription audio_file: {audio_file_path}")
        logging.debug(f"Process_Audio: Transcription segments: {segments}")

        transcription_text = {'audio_file': audio_file_path, 'transcription': segments}
        logging.debug(f"Process_Audio: Transcription text: {transcription_text}")

        # Save segments to JSON
        segments_json_path = save_segments_to_json(segments)

        # Perform summarization
        summary_text = None
        if api_name:
            # BUGFIX: this previously tested `rolling_summarization is not None`,
            # which is always True for the default value False, so the regular
            # summarization branch below was unreachable.
            if rolling_summarization:
                # FIXME: rolling summarization is not implemented yet.
                pass
            else:
                summary_text = perform_summarization(api_name, segments_json_path, custom_prompt_input, api_key)
                if summary_text is None:
                    logging.error("Summary text is None. Check summarization function.")
        else:
            summary_text = 'Summary not available'

        # Save transcription and summary
        download_path = create_download_directory("Audio_Processing")
        json_file_path, summary_file_path = save_transcription_and_summary(transcription_text, summary_text,
                                                                           download_path)

        # TODO: update add_media_to_database so it properly applies the real
        # title, author and file type instead of these placeholders.
        add_media_to_database(None, {'title': 'Audio File', 'author': 'Unknown'}, segments, summary_text, keywords,
                              custom_prompt_input, whisper_model)

        return transcription_text, summary_text, json_file_path, summary_file_path, None, None

    except Exception as e:
        logging.error(f"Error in process_audio: {str(e)}")
        return str(e), None, None, None, None, None
def process_single_audio(audio_file_path, whisper_model, api_name, api_key, keep_original,custom_keywords, source,
                         custom_prompt_input, chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
                         use_multi_level_chunking, chunk_language):
    """Transcribe, optionally summarize, and persist a single local audio file.

    Returns a 3-tuple (progress_log, transcription, summary); on error the
    transcription carries the error text and the summary a placeholder.
    """
    progress_log = []
    transcription = ""
    summary = ""

    def note(message):
        # Append one status line and hand back the whole log so far.
        progress_log.append(message)
        return "\n".join(progress_log)

    try:
        # Refuse files over the size cap before doing any work.
        size_bytes = os.path.getsize(audio_file_path)
        if size_bytes > MAX_FILE_SIZE:
            note(f"File size ({size_bytes / (1024 * 1024):.2f} MB) exceeds the maximum limit of {MAX_FILE_SIZE / (1024 * 1024):.2f} MB. Skipping this file.")
            return "\n".join(progress_log), "", ""

        # Transcribe and flatten the segments into one text blob.
        note("Starting transcription...")
        segments = speech_to_text(audio_file_path, whisper_model=whisper_model)
        transcription = " ".join(segment['Text'] for segment in segments)
        note("Audio transcribed successfully.")

        # Summarize only when both an API name and key were supplied.
        if api_name and api_key:
            note("Starting summarization...")
            summary = perform_summarization(api_name, transcription, "Summarize the following audio transcript",
                                            api_key)
            note("Audio summarized successfully.")
        else:
            summary = "No summary available"

        # Base keywords plus any caller-supplied extras.
        keywords = "audio,transcription"
        if custom_keywords:
            keywords += f",{custom_keywords}"

        # Persist the result.
        add_media_with_keywords(
            url=source,
            title=os.path.basename(audio_file_path),
            media_type='audio',
            content=transcription,
            keywords=keywords,
            prompt="Summarize the following audio transcript",
            summary=summary,
            transcription_model=whisper_model,
            author="Unknown",
            ingestion_date=None  # This will use the current date
        )
        note("Audio file added to database successfully.")

        # Downloaded temp files may be deleted; uploads are never touched.
        if source != "Uploaded File":
            if keep_original:
                note(f"Original audio file kept at: {audio_file_path}")
            else:
                os.remove(audio_file_path)
                note(f"Temporary file {audio_file_path} removed.")

    except Exception as e:
        note(f"Error processing {source}: {str(e)}")
        transcription = f"Error: {str(e)}"
        summary = "No summary due to error"

    return "\n".join(progress_log), transcription, summary
def process_audio_files(audio_urls, audio_file, whisper_model, api_name, api_key, use_cookies, cookies, keep_original,
                        custom_keywords, custom_prompt_input, chunk_method, max_chunk_size, chunk_overlap,
                        use_adaptive_chunking, use_multi_level_chunking, chunk_language, diarize):
    """Batch-process audio from newline-separated URLs and/or one uploaded file.

    Pipeline per input: download (URLs only) -> re-encode MP3 -> convert to
    WAV -> transcribe (optionally diarized) -> chunk -> summarize (when an API
    is configured) -> store in the media database.

    Returns:
        (progress_log, transcriptions, summaries) — newline-joined strings.
        On a fatal error, a failure message and two empty strings.
    """
    progress = []
    temp_files = []
    all_transcriptions = []
    all_summaries = []

    def update_progress(message):
        # Accumulate a human-readable progress log; returns the full log so the
        # last call's result can be used as the final status string.
        progress.append(message)
        return "\n".join(progress)

    def cleanup_files():
        # Best-effort removal of every intermediate file created in this run.
        for file in temp_files:
            try:
                if os.path.exists(file):
                    os.remove(file)
                    update_progress(f"Temporary file {file} removed.")
            except Exception as e:
                update_progress(f"Failed to remove temporary file {file}: {str(e)}")

    def reencode_mp3(mp3_file_path):
        # Re-encode through libmp3lame to repair broken/odd MP3 containers.
        try:
            reencoded_mp3_path = mp3_file_path.replace(".mp3", "_reencoded.mp3")
            subprocess.run([ffmpeg_cmd, '-i', mp3_file_path, '-codec:a', 'libmp3lame', reencoded_mp3_path], check=True)
            update_progress(f"Re-encoded {mp3_file_path} to {reencoded_mp3_path}.")
            return reencoded_mp3_path
        except subprocess.CalledProcessError as e:
            update_progress(f"Error re-encoding {mp3_file_path}: {str(e)}")
            raise

    def convert_mp3_to_wav(mp3_file_path):
        # WAV gives the transcriber a predictable, uncompressed input format.
        try:
            wav_file_path = mp3_file_path.replace(".mp3", ".wav")
            subprocess.run([ffmpeg_cmd, '-i', mp3_file_path, wav_file_path], check=True)
            update_progress(f"Converted {mp3_file_path} to {wav_file_path}.")
            return wav_file_path
        except subprocess.CalledProcessError as e:
            update_progress(f"Error converting {mp3_file_path} to WAV: {str(e)}")
            raise

    try:
        # Check and set the ffmpeg command. Module-global so the nested
        # helpers above resolve it at call time.
        global ffmpeg_cmd
        if os.name == "nt":
            logging.debug("Running on Windows")
            ffmpeg_cmd = os.path.join(os.getcwd(), "Bin", "ffmpeg.exe")
        else:
            ffmpeg_cmd = 'ffmpeg'  # Assume 'ffmpeg' is in PATH for non-Windows systems

        # Ensure ffmpeg is accessible (only verifiable on Windows where we
        # point at a concrete file; elsewhere we rely on PATH).
        if not os.path.exists(ffmpeg_cmd) and os.name == "nt":
            raise FileNotFoundError(f"ffmpeg executable not found at path: {ffmpeg_cmd}")

        # Define chunk options early to avoid undefined errors
        chunk_options = {
            'method': chunk_method,
            'max_size': max_chunk_size,
            'overlap': chunk_overlap,
            'adaptive': use_adaptive_chunking,
            'multi_level': use_multi_level_chunking,
            'language': chunk_language
        }

        # Process multiple URLs, one per non-blank line.
        urls = [url.strip() for url in audio_urls.split('\n') if url.strip()]
        for i, url in enumerate(urls):
            update_progress(f"Processing URL {i + 1}/{len(urls)}: {url}")

            # BUGFIX: the cookie arguments were previously passed positionally,
            # so `use_cookies` landed in `current_whisper_model` and `cookies`
            # in `use_cookies`. Pass by keyword and give the duplicate check
            # the whisper model actually in use.
            audio_file_path = download_audio_file(url, current_whisper_model=whisper_model,
                                                  use_cookies=use_cookies, cookies=cookies)
            # BUGFIX: download_audio_file returns None when the media was
            # already processed with this model; os.path.exists(None) would
            # raise TypeError and abort the whole batch.
            if audio_file_path is None:
                update_progress(f"Skipping already-processed URL: {url}")
                continue
            if not os.path.exists(audio_file_path):
                update_progress(f"Downloaded file not found: {audio_file_path}")
                continue

            temp_files.append(audio_file_path)
            update_progress("Audio file downloaded successfully.")

            # Re-encode MP3 to fix potential issues
            reencoded_mp3_path = reencode_mp3(audio_file_path)
            if not os.path.exists(reencoded_mp3_path):
                update_progress(f"Re-encoded file not found: {reencoded_mp3_path}")
                continue

            temp_files.append(reencoded_mp3_path)

            # Convert re-encoded MP3 to WAV
            wav_file_path = convert_mp3_to_wav(reencoded_mp3_path)
            if not os.path.exists(wav_file_path):
                update_progress(f"Converted WAV file not found: {wav_file_path}")
                continue

            temp_files.append(wav_file_path)

            # Initialize transcription
            transcription = ""

            # Transcribe audio
            if diarize:
                segments = speech_to_text(wav_file_path, whisper_model=whisper_model, diarize=True)
            else:
                segments = speech_to_text(wav_file_path, whisper_model=whisper_model)

            # Handle segments nested under 'segments' key
            if isinstance(segments, dict) and 'segments' in segments:
                segments = segments['segments']

            if isinstance(segments, list):
                transcription = " ".join([segment.get('Text', '') for segment in segments])
                update_progress("Audio transcribed successfully.")
            else:
                update_progress("Unexpected segments format received from speech_to_text.")
                logging.error(f"Unexpected segments format: {segments}")
                continue

            if not transcription.strip():
                update_progress("Transcription is empty.")
            else:
                # Apply chunking
                chunked_text = improved_chunking_process(transcription, chunk_options)

                # Summarize
                if api_name:
                    try:
                        summary = perform_summarization(api_name, chunked_text, custom_prompt_input, api_key)
                        update_progress("Audio summarized successfully.")
                    except Exception as e:
                        logging.error(f"Error during summarization: {str(e)}")
                        summary = "Summary generation failed"
                else:
                    summary = "No summary available (API not provided)"

                all_transcriptions.append(transcription)
                all_summaries.append(summary)

                # Add to database
                add_media_with_keywords(
                    url=url,
                    title=os.path.basename(wav_file_path),
                    media_type='audio',
                    content=transcription,
                    keywords=custom_keywords,
                    prompt=custom_prompt_input,
                    summary=summary,
                    transcription_model=whisper_model,
                    author="Unknown",
                    ingestion_date=datetime.now().strftime('%Y-%m-%d')
                )
                update_progress("Audio file processed and added to database.")

        # Process uploaded file if provided
        if audio_file:
            if os.path.getsize(audio_file.name) > MAX_FILE_SIZE:
                update_progress(
                    f"Uploaded file size exceeds the maximum limit of {MAX_FILE_SIZE / (1024 * 1024):.2f}MB. Skipping this file.")
            else:
                # Re-encode MP3 to fix potential issues
                reencoded_mp3_path = reencode_mp3(audio_file.name)
                if not os.path.exists(reencoded_mp3_path):
                    update_progress(f"Re-encoded file not found: {reencoded_mp3_path}")
                    return update_progress("Processing failed: Re-encoded file not found"), "", ""

                temp_files.append(reencoded_mp3_path)

                # Convert re-encoded MP3 to WAV
                wav_file_path = convert_mp3_to_wav(reencoded_mp3_path)
                if not os.path.exists(wav_file_path):
                    update_progress(f"Converted WAV file not found: {wav_file_path}")
                    return update_progress("Processing failed: Converted WAV file not found"), "", ""

                temp_files.append(wav_file_path)

                # Initialize transcription
                transcription = ""

                if diarize:
                    segments = speech_to_text(wav_file_path, whisper_model=whisper_model, diarize=True)
                else:
                    segments = speech_to_text(wav_file_path, whisper_model=whisper_model)

                # Handle segments nested under 'segments' key
                if isinstance(segments, dict) and 'segments' in segments:
                    segments = segments['segments']

                if isinstance(segments, list):
                    transcription = " ".join([segment.get('Text', '') for segment in segments])
                else:
                    update_progress("Unexpected segments format received from speech_to_text.")
                    logging.error(f"Unexpected segments format: {segments}")

                chunked_text = improved_chunking_process(transcription, chunk_options)

                if api_name and api_key:
                    try:
                        summary = perform_summarization(api_name, chunked_text, custom_prompt_input, api_key)
                        update_progress("Audio summarized successfully.")
                    except Exception as e:
                        logging.error(f"Error during summarization: {str(e)}")
                        summary = "Summary generation failed"
                else:
                    summary = "No summary available (API not provided)"

                all_transcriptions.append(transcription)
                all_summaries.append(summary)

                add_media_with_keywords(
                    url="Uploaded File",
                    title=os.path.basename(wav_file_path),
                    media_type='audio',
                    content=transcription,
                    keywords=custom_keywords,
                    prompt=custom_prompt_input,
                    summary=summary,
                    transcription_model=whisper_model,
                    author="Unknown",
                    ingestion_date=datetime.now().strftime('%Y-%m-%d')
                )
                update_progress("Uploaded file processed and added to database.")

        # Final cleanup
        if not keep_original:
            cleanup_files()

        final_progress = update_progress("All processing complete.")
        final_transcriptions = "\n\n".join(all_transcriptions)
        final_summaries = "\n\n".join(all_summaries)

        return final_progress, final_transcriptions, final_summaries

    except Exception as e:
        logging.error(f"Error processing audio files: {str(e)}")
        cleanup_files()
        return update_progress(f"Processing failed: {str(e)}"), "", ""
def download_youtube_audio(url):
    """Download a YouTube video's audio track as an MP3 under ./downloads.

    Downloads the best available audio (or <=480p video fallback) with yt-dlp
    into a temporary directory, extracts a 192 kbps MP3 via ffmpeg, then moves
    the MP3 into the persistent "downloads" directory.

    Returns:
        (file_path, status_message) on success, or (None, error_message) on
        any failure — this function never raises.
    """
    try:
        # Determine ffmpeg path based on the operating system.
        ffmpeg_path = './Bin/ffmpeg.exe' if os.name == 'nt' else 'ffmpeg'

        # Create a temporary directory; everything inside is discarded on exit.
        with tempfile.TemporaryDirectory() as temp_dir:
            # Extract information about the video without downloading it, so a
            # filesystem-safe title is known before choosing filenames.
            with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
                info_dict = ydl.extract_info(url, download=False)
                sanitized_title = sanitize_filename(info_dict['title'])

            # Setup the temporary filenames
            temp_video_path = Path(temp_dir) / f"{sanitized_title}_temp.mp4"
            temp_audio_path = Path(temp_dir) / f"{sanitized_title}.mp3"

            # Initialize yt-dlp with options for downloading
            ydl_opts = {
                'format': 'bestaudio[ext=m4a]/best[height<=480]',  # Prefer best audio, or video up to 480p
                'ffmpeg_location': ffmpeg_path,
                'outtmpl': str(temp_video_path),  # NOTE: .mp4 name even for m4a-only downloads; ffmpeg reads it fine
                'noplaylist': True,
                'quiet': True
            }

            # Execute yt-dlp to download the video/audio
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            # Check if the file exists
            if not temp_video_path.exists():
                raise FileNotFoundError(f"Expected file was not found: {temp_video_path}")

            # Use ffmpeg to extract audio as a 192 kbps MP3
            ffmpeg_command = [
                ffmpeg_path,
                '-i', str(temp_video_path),
                '-vn',  # No video
                '-acodec', 'libmp3lame',
                '-b:a', '192k',
                str(temp_audio_path)
            ]
            subprocess.run(ffmpeg_command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            # Check if the audio file was created
            if not temp_audio_path.exists():
                raise FileNotFoundError(f"Expected audio file was not found: {temp_audio_path}")

            # Create a persistent directory for the download if it doesn't exist
            persistent_dir = Path("downloads")
            persistent_dir.mkdir(exist_ok=True)

            # Move the file from the temporary directory to the persistent directory
            # (os.replace also overwrites any previous download of the same title)
            persistent_file_path = persistent_dir / f"{sanitized_title}.mp3"
            os.replace(str(temp_audio_path), str(persistent_file_path))

            # Add the file to the module-level list of downloaded files
            downloaded_files.append(str(persistent_file_path))

            return str(persistent_file_path), f"Audio downloaded successfully: {sanitized_title}.mp3"
    except Exception as e:
        return None, f"Error downloading audio: {str(e)}"
def process_podcast(url, title, author, keywords, custom_prompt, api_name, api_key, whisper_model,
                    keep_original=False, enable_diarization=False, use_cookies=False, cookies=None,
                    chunk_method=None, max_chunk_size=300, chunk_overlap=0, use_adaptive_chunking=False,
                    use_multi_level_chunking=False, chunk_language='english'):
    """Download, transcribe, summarize, and store a podcast episode.

    Metadata (title/author/series/episode/...) is pulled via extract_metadata
    and merged into both the stored content and the keyword list.

    Returns:
        (progress_log, full_content, summary, title, author, keywords,
        error_message); on failure a 7-tuple of the failure log, five empty
        strings, and the error text.
    """
    progress = []
    error_message = ""
    temp_files = []

    def update_progress(message):
        # Accumulate a human-readable progress log and return it joined.
        progress.append(message)
        return "\n".join(progress)

    def cleanup_files():
        # Remove intermediates unless the caller asked to keep them.
        if not keep_original:
            for file in temp_files:
                try:
                    if os.path.exists(file):
                        os.remove(file)
                        update_progress(f"Temporary file {file} removed.")
                except Exception as e:
                    update_progress(f"Failed to remove temporary file {file}: {str(e)}")

    try:
        # Download podcast.
        # BUGFIX: cookies were previously passed positionally, so `use_cookies`
        # landed in `current_whisper_model` and `cookies` in `use_cookies`.
        # Pass by keyword and supply the actual whisper model for the
        # duplicate check.
        audio_file = download_audio_file(url, current_whisper_model=whisper_model,
                                         use_cookies=use_cookies, cookies=cookies)
        if audio_file is None:
            # download_audio_file skips media already processed with this model;
            # previously this fell through and crashed on os.path operations.
            raise ValueError("Podcast download skipped: media already processed with this whisper model.")
        temp_files.append(audio_file)
        update_progress("Podcast downloaded successfully.")

        # Extract metadata; explicit arguments win over extracted values.
        metadata = extract_metadata(url)
        title = title or metadata.get('title', 'Unknown Podcast')
        author = author or metadata.get('uploader', 'Unknown Author')

        # Format metadata for storage
        metadata_text = f"""
Metadata:
Title: {title}
Author: {author}
Series: {metadata.get('series', 'N/A')}
Episode: {metadata.get('episode', 'N/A')}
Season: {metadata.get('season', 'N/A')}
Upload Date: {metadata.get('upload_date', 'N/A')}
Duration: {metadata.get('duration', 'N/A')} seconds
Description: {metadata.get('description', 'N/A')}
"""

        # Fold series/episode/season metadata into the keyword list.
        new_keywords = []
        if metadata.get('series'):
            new_keywords.append(f"series:{metadata['series']}")
        if metadata.get('episode'):
            new_keywords.append(f"episode:{metadata['episode']}")
        if metadata.get('season'):
            new_keywords.append(f"season:{metadata['season']}")

        keywords = f"{keywords},{','.join(new_keywords)}" if keywords else ','.join(new_keywords)
        update_progress(f"Metadata extracted - Title: {title}, Author: {author}, Keywords: {keywords}")

        # Transcribe the podcast
        try:
            if enable_diarization:
                segments = speech_to_text(audio_file, whisper_model=whisper_model, diarize=True)
            else:
                segments = speech_to_text(audio_file, whisper_model=whisper_model)
            transcription = " ".join([segment['Text'] for segment in segments])
            update_progress("Podcast transcribed successfully.")
        except Exception as e:
            error_message = f"Transcription failed: {str(e)}"
            raise

        # Apply chunking
        chunk_options = {
            'method': chunk_method,
            'max_size': max_chunk_size,
            'overlap': chunk_overlap,
            'adaptive': use_adaptive_chunking,
            'multi_level': use_multi_level_chunking,
            'language': chunk_language
        }
        chunked_text = improved_chunking_process(transcription, chunk_options)

        # Combine metadata and transcription
        full_content = metadata_text + "\n\nTranscription:\n" + transcription

        # Summarize if API is provided
        summary = None
        if api_name and api_key:
            try:
                summary = perform_summarization(api_name, chunked_text, custom_prompt, api_key)
                update_progress("Podcast summarized successfully.")
            except Exception as e:
                error_message = f"Summarization failed: {str(e)}"
                raise

        # Add to database
        try:
            add_media_with_keywords(
                url=url,
                title=title,
                media_type='podcast',
                content=full_content,
                keywords=keywords,
                prompt=custom_prompt,
                summary=summary or "No summary available",
                transcription_model=whisper_model,
                author=author,
                ingestion_date=datetime.now().strftime('%Y-%m-%d')
            )
            update_progress("Podcast added to database successfully.")
        except Exception as e:
            error_message = f"Error adding podcast to database: {str(e)}"
            raise

        # Cleanup
        cleanup_files()

        return (update_progress("Processing complete."), full_content, summary or "No summary generated.",
                title, author, keywords, error_message)

    except Exception as e:
        logging.error(f"Error processing podcast: {str(e)}")
        cleanup_files()
        return update_progress(f"Processing failed: {str(e)}"), "", "", "", "", "", str(e)
| # | |
| # | |
| ####################################################################################################################### |